package io.vertx.core.impl;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolverGroup;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.Future;
import io.vertx.core.*;
import io.vertx.core.datagram.DatagramSocket;
import io.vertx.core.datagram.DatagramSocketOptions;
import io.vertx.core.datagram.impl.DatagramSocketImpl;
import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.core.dns.DnsClient;
import io.vertx.core.dns.DnsClientOptions;
import io.vertx.core.dns.impl.DnsClientImpl;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.EventBusInternal;
import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.impl.FileResolver;
import io.vertx.core.file.impl.FileSystemImpl;
import io.vertx.core.file.impl.WindowsFileSystem;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.impl.HttpClientImpl;
import io.vertx.core.http.impl.HttpServerImpl;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.impl.resolver.DnsResolverProvider;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.impl.NetClientImpl;
import io.vertx.core.net.impl.NetServerImpl;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.net.impl.TCPServerBase;
import io.vertx.core.net.impl.transport.Transport;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.shareddata.impl.SharedDataImpl;
import io.vertx.core.spi.VerticleFactory;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeSelector;
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import io.vertx.core.spi.tracing.VertxTracer;
import java.io.File;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
public class VertxImpl implements VertxInternal, MetricsProvider {
/** Logger shared by this class and its inner classes. */
private static final Logger log = LoggerFactory.getLogger(VertxImpl.class);
/** Name of the cluster-wide sync map handed to the {@link HAManager} (see {@code createHaManager}). */
private static final String CLUSTER_MAP_NAME = "__vertx.haInfo";
/** System property that overrides the I/O ratio used for the main event loop group. */
private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio";
/** I/O ratio passed to the transport for the main event loop group; 50 unless the property above is set. */
private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50);
static {
  // Turn Netty's resource leak detection off by default, but only when the user has NOT
  // configured a level through either of the two system properties checked here. An explicit
  // user choice must never be overridden, hence "== null && == null" (the previous
  // "!= null ||" test disabled detection precisely when the user had asked for it).
  if (System.getProperty("io.netty.leakDetection.level") == null &&
      System.getProperty("io.netty.leakDetectionLevel") == null) {
    ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED);
  }
  // Unconditionally set this Netty property to "false" for the whole JVM.
  System.setProperty("io.netty.noJdkZlibDecoder", "false");
}
// File system facade; chosen by the overridable getFileSystem() while the instance is being constructed.
private final FileSystem fileSystem = getFileSystem();
private final SharedData sharedData;
// Metrics SPI; null when metrics are disabled (see isMetricsEnabled()).
private final VertxMetrics metrics;
// Live timers keyed by timer id; entries are added by scheduleTimeout and removed on cancel/fire/close.
private final ConcurrentMap<Long, InternalTimerHandler> timeouts = new ConcurrentHashMap<>();
// Source of timer ids.
private final AtomicLong timeoutCounter = new AtomicLong(0);
// Null when this instance is not clustered (see isClustered()).
private final ClusterManager clusterManager;
private final NodeSelector nodeSelector;
private final DeploymentManager deploymentManager;
private final VerticleManager verticleManager;
private final FileResolver fileResolver;
// Plain HashMaps exposed as-is through sharedHttpServers()/sharedNetServers()/sharedTCPServers().
private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>();
private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>();
// Default worker pool, used whenever a context is created without an explicit pool.
final WorkerPool workerPool;
// Pool handed to every context as its internal blocking pool.
final WorkerPool internalBlockingPool;
private final ThreadFactory eventLoopThreadFactory;
private final EventLoopGroup eventLoopGroup;
private final EventLoopGroup acceptorEventLoopGroup;
private final BlockedThreadChecker checker;
private final AddressResolver addressResolver;
private final AddressResolverOptions addressResolverOptions;
private final EventBusInternal eventBus;
// Set once by createHaManager during clustered start-up; stays null otherwise.
private volatile HAManager haManager;
// Read and written while holding this instance's monitor (close(Handler), deployVerticle(Callable, ...)).
private boolean closed;
private volatile Handler<Throwable> exceptionHandler;
// Named shared worker pools; accessed from methods synchronized on this instance.
private final Map<String, SharedWorkerPool> namedWorkerPools;
// Defaults applied by the shorter createSharedWorkerExecutor overloads.
private final int defaultWorkerPoolSize;
private final long maxWorkerExecTime;
private final TimeUnit maxWorkerExecTimeUnit;
private final long maxEventLoopExecTime;
private final TimeUnit maxEventLoopExecTimeUnit;
// Instance-level close hooks, run first when this Vert.x instance is closed.
private final CloseHooks closeHooks;
private final Transport transport;
private final VertxTracer tracer;
// Per-thread weak reference to the context created by getOrCreateContext() when the calling
// thread had no current context owned by this instance.
private final ThreadLocal<WeakReference<AbstractContext>> stickyContext = new ThreadLocal<>();
/**
 * Builds the instance: thread checker, event loop groups, worker pools, address resolver,
 * event bus, shared data and the deployment/verticle managers. Nothing is started here;
 * callers follow up with {@link #init()} or {@code initClustered}.
 *
 * @param options        source of pool sizes, execution-time limits and resolver options
 * @param clusterManager cluster manager, or {@code null} for a non-clustered instance
 * @param nodeSelector   node selector handed to the clustered event bus
 * @param metrics        metrics SPI, or {@code null} when metrics are disabled
 * @param tracer         tracer passed to every context created by this instance
 * @param transport      transport used to create the event loop groups
 * @param fileResolver   resolver backing {@link #resolveFile(String)}
 */
VertxImpl(VertxOptions options, ClusterManager clusterManager, NodeSelector nodeSelector, VertxMetrics metrics, VertxTracer<?, ?> tracer, Transport transport, FileResolver fileResolver) {
// Creating a Vertx from inside an existing context is allowed but only warned about.
if (Vertx.currentContext() != null) {
log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?");
}
closeHooks = new CloseHooks(log);
maxEventLoopExecTime = options.getMaxEventLoopExecuteTime();
maxEventLoopExecTimeUnit = options.getMaxEventLoopExecuteTimeUnit();
// The checker is shared by every thread factory created below.
checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit());
eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, maxEventLoopExecTime, maxEventLoopExecTimeUnit);
eventLoopGroup = transport.eventLoopGroup(Transport.IO_EVENT_LOOP_GROUP, options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO);
ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit());
// The acceptor group is requested with a single thread and an I/O ratio of 100.
acceptorEventLoopGroup = transport.eventLoopGroup(Transport.ACCEPTOR_EVENT_LOOP_GROUP, 1, acceptorEventLoopThreadFactory, 100);
int workerPoolSize = options.getWorkerPoolSize();
// Fixed-size worker executor (core == max) backed by a LinkedTransferQueue.
ExecutorService workerExec = new ThreadPoolExecutor(workerPoolSize, workerPoolSize,
0L, TimeUnit.MILLISECONDS, new LinkedTransferQueue<>(),
new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()));
PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null;
ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(),
new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit()));
PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null;
internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics);
namedWorkerPools = new HashMap<>();
workerPool = new WorkerPool(workerExec, workerPoolMetrics);
defaultWorkerPoolSize = options.getWorkerPoolSize();
maxWorkerExecTime = options.getMaxWorkerExecuteTime();
maxWorkerExecTimeUnit = options.getMaxWorkerExecuteTimeUnit();
this.metrics = metrics;
this.transport = transport;
this.fileResolver = fileResolver;
this.addressResolverOptions = options.getAddressResolverOptions();
// NOTE: from here on `this` is handed to collaborators before construction has finished.
this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions());
this.tracer = tracer;
this.clusterManager = clusterManager;
this.nodeSelector = nodeSelector;
// A cluster manager selects the clustered event bus; otherwise the local implementation is used.
this.eventBus = clusterManager != null ? new ClusteredEventBus(this, options, clusterManager, nodeSelector) : new EventBusImpl(this);
this.sharedData = new SharedDataImpl(this, clusterManager);
this.deploymentManager = new DeploymentManager(this);
this.verticleManager = new VerticleManager(this, deploymentManager);
}
/**
 * Start-up path that does not involve the cluster manager: starts the event bus (the start
 * result is not observed) and, when metrics are enabled, reports this instance as created.
 */
void init() {
  Promise<Void> unobservedStart = Promise.promise();
  eventBus.start(unobservedStart);
  if (metrics == null) {
    return;
  }
  metrics.vertxCreated(this);
}
/**
 * Clustered start-up: initialises the node selector and cluster manager, joins the cluster,
 * then continues with {@code createHaManager}. On overall success the metrics (if any) are
 * notified and {@code resultHandler} receives this instance; on failure the error is logged,
 * this instance is closed and only then is {@code resultHandler} failed with the cause.
 *
 * @param options       options forwarded to the HA manager creation
 * @param resultHandler receives the outcome of the whole start-up sequence
 */
void initClustered(VertxOptions options, Handler<AsyncResult<Vertx>> resultHandler) {
  nodeSelector.init(this, clusterManager);
  clusterManager.init(this, nodeSelector);

  Promise<Void> initialized = getOrCreateContext().promise();
  initialized.future().onComplete(outcome -> {
    if (!outcome.succeeded()) {
      log.error("Failed to initialize clustered Vert.x", outcome.cause());
      close().onComplete(done -> resultHandler.handle(Future.failedFuture(outcome.cause())));
      return;
    }
    if (metrics != null) {
      metrics.vertxCreated(this);
    }
    resultHandler.handle(Future.succeededFuture(this));
  });

  Promise<Void> joined = Promise.promise();
  joined.future().onComplete(outcome -> {
    if (!outcome.succeeded()) {
      initialized.fail(outcome.cause());
      return;
    }
    createHaManager(options, initialized);
  });
  clusterManager.join(joined);
}
/**
 * Builds the {@link HAManager} on a blocking thread (unordered), stores it in the
 * {@code haManager} field and proceeds to start the event bus. A failure fails
 * {@code initPromise}.
 */
private void createHaManager(VertxOptions options, Promise<Void> initPromise) {
  Handler<Promise<HAManager>> factory = blocking -> {
    Map<String, String> haInfo = clusterManager.getSyncMap(CLUSTER_MAP_NAME);
    HAManager created = new HAManager(this, deploymentManager, verticleManager, clusterManager, haInfo, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled());
    blocking.complete(created);
  };
  Handler<AsyncResult<HAManager>> onCreated = outcome -> {
    if (!outcome.succeeded()) {
      initPromise.fail(outcome.cause());
      return;
    }
    haManager = outcome.result();
    startEventBus(initPromise);
  };
  this.executeBlocking(factory, false, onCreated);
}
/**
 * Starts the event bus; once started, continues with {@code initializeHaManager},
 * otherwise fails {@code initPromise} with the start failure.
 */
private void startEventBus(Promise<Void> initPromise) {
  Promise<Void> started = Promise.promise();
  eventBus.start(started);
  started.future().onComplete(outcome -> {
    if (!outcome.succeeded()) {
      initPromise.fail(outcome.cause());
      return;
    }
    initializeHaManager(initPromise);
  });
}
/**
 * Runs {@code haManager.init()} on a blocking thread (unordered) and uses
 * {@code initPromise} as the result handler of that blocking task.
 */
private void initializeHaManager(Promise<Void> initPromise) {
  Handler<Promise<Object>> initTask = blocking -> {
    haManager.init();
    blocking.complete();
  };
  this.executeBlocking(initTask, false, ar -> initPromise.handle(ar.mapEmpty()));
}
/**
 * Picks the file system implementation: the Windows variant when
 * {@code Utils.isWindows()} reports true, the default implementation otherwise.
 */
protected FileSystem getFileSystem() {
  if (Utils.isWindows()) {
    return new WindowsFileSystem(this);
  }
  return new FileSystemImpl(this);
}
/** @return the maximum event loop execute time captured from the options at construction */
@Override
public long maxEventLoopExecTime() {
  return this.maxEventLoopExecTime;
}
/** @return the unit of {@link #maxEventLoopExecTime()} */
@Override
public TimeUnit maxEventLoopExecTimeUnit() {
  return this.maxEventLoopExecTimeUnit;
}
/** Creates a datagram socket bound to this instance via {@code DatagramSocketImpl.create}. */
@Override
public DatagramSocket createDatagramSocket(DatagramSocketOptions options) {
  DatagramSocket socket = DatagramSocketImpl.create(this, options);
  return socket;
}
/** Creates a datagram socket using freshly constructed default options. */
@Override
public DatagramSocket createDatagramSocket() {
  DatagramSocketOptions defaults = new DatagramSocketOptions();
  return createDatagramSocket(defaults);
}
/** Creates a new TCP server bound to this instance with the given options. */
public NetServer createNetServer(NetServerOptions options) {
  NetServer server = new NetServerImpl(this, options);
  return server;
}
/** Creates a new TCP server using freshly constructed default options. */
@Override
public NetServer createNetServer() {
  NetServerOptions defaults = new NetServerOptions();
  return createNetServer(defaults);
}
/**
 * Creates a TCP client tied to the supplied close future and initialises that close
 * future with the new client. No close hook is registered here.
 */
@Override
public NetClient createNetClient(NetClientOptions options, CloseFuture closeFuture) {
  NetClientImpl netClient = new NetClientImpl(this, options, closeFuture);
  closeFuture.init(netClient);
  return netClient;
}
/**
 * Creates a TCP client with its own close future and registers that close future with the
 * close hooks chosen by {@code resolveHooks()}.
 */
public NetClient createNetClient(NetClientOptions options) {
  CloseFuture onClose = new CloseFuture();
  NetClient netClient = createNetClient(options, onClose);
  resolveHooks().add(onClose);
  return netClient;
}
/** Creates a TCP client using freshly constructed default options. */
@Override
public NetClient createNetClient() {
  NetClientOptions defaults = new NetClientOptions();
  return createNetClient(defaults);
}
/** @return the transport supplied at construction */
@Override
public Transport transport() {
  return this.transport;
}
/** @return {@code true} unless the configured transport is the {@code Transport.JDK} instance */
@Override
public boolean isNativeTransportEnabled() {
  boolean usingJdkTransport = transport == Transport.JDK;
  return !usingJdkTransport;
}
/** @return the file system facade created for this instance */
public FileSystem fileSystem() {
  return this.fileSystem;
}
/** @return the shared data facade created for this instance */
public SharedData sharedData() {
  return this.sharedData;
}
/** Creates a new HTTP server bound to this instance with the given options. */
public HttpServer createHttpServer(HttpServerOptions serverOptions) {
  HttpServer server = new HttpServerImpl(this, serverOptions);
  return server;
}
/** Creates a new HTTP server using freshly constructed default options. */
@Override
public HttpServer createHttpServer() {
  HttpServerOptions defaults = new HttpServerOptions();
  return createHttpServer(defaults);
}
/**
 * Creates an HTTP client tied to the supplied close future and initialises that close
 * future with the new client. No close hook is registered here.
 */
@Override
public HttpClient createHttpClient(HttpClientOptions options, CloseFuture closeFuture) {
  HttpClientImpl httpClient = new HttpClientImpl(this, options, closeFuture);
  closeFuture.init(httpClient);
  return httpClient;
}
/**
 * Creates an HTTP client with its own close future and registers that close future with
 * the close hooks chosen by {@code resolveHooks()}.
 */
public HttpClient createHttpClient(HttpClientOptions options) {
  CloseFuture onClose = new CloseFuture();
  HttpClient httpClient = createHttpClient(options, onClose);
  resolveHooks().add(onClose);
  return httpClient;
}
/** Creates an HTTP client using freshly constructed default options. */
@Override
public HttpClient createHttpClient() {
  HttpClientOptions defaults = new HttpClientOptions();
  return createHttpClient(defaults);
}
/** @return the event bus created for this instance */
public EventBus eventBus() {
  return this.eventBus;
}
/**
 * Schedules a periodic timer on the current (or a newly created) context.
 *
 * @return the id of the scheduled timer
 * @throws IllegalArgumentException if {@code delay} is below 1 (raised by {@code scheduleTimeout})
 */
public long setPeriodic(long delay, Handler<Long> handler) {
  ContextInternal owner = getOrCreateContext();
  return scheduleTimeout(owner, handler, delay, true);
}
/** @return a new periodic timeout stream; nothing is scheduled until a handler is set on it */
@Override
public TimeoutStream periodicStream(long delay) {
  TimeoutStream stream = new TimeoutStreamImpl(delay, true);
  return stream;
}
/**
 * Schedules a one-shot timer on the current (or a newly created) context.
 *
 * @return the id of the scheduled timer
 * @throws IllegalArgumentException if {@code delay} is below 1 (raised by {@code scheduleTimeout})
 */
public long setTimer(long delay, Handler<Long> handler) {
  ContextInternal owner = getOrCreateContext();
  return scheduleTimeout(owner, handler, delay, false);
}
/** @return a new one-shot timeout stream; nothing is scheduled until a handler is set on it */
@Override
public TimeoutStream timerStream(long delay) {
  TimeoutStream stream = new TimeoutStreamImpl(delay, false);
  return stream;
}
/** @return a new promise created by the current (or a newly created) context */
@Override
public <T> PromiseInternal<T> promise() {
  return getOrCreateContext().promise();
}
/**
 * Adapts a handler to a promise. A handler that already is a {@code PromiseInternal} is
 * returned as-is; otherwise a new promise is created and the handler is attached to its future.
 */
@Override
public <T> PromiseInternal<T> promise(Handler<AsyncResult<T>> handler) {
  if (!(handler instanceof PromiseInternal)) {
    PromiseInternal<T> created = promise();
    created.future().onComplete(handler);
    return created;
  }
  return (PromiseInternal<T>) handler;
}
/** Hands {@code task} to the current (or a newly created) context for execution. */
public void runOnContext(Handler<Void> task) {
  getOrCreateContext().runOnContext(task);
}
/** @return the executor backing the default worker pool */
public ExecutorService getWorkerPool() {
  ExecutorService executor = workerPool.executor();
  return executor;
}
/** @return the main I/O event loop group */
public EventLoopGroup getEventLoopGroup() {
  return this.eventLoopGroup;
}
/** @return the event loop group created for acceptors */
public EventLoopGroup getAcceptorEventLoopGroup() {
  return this.acceptorEventLoopGroup;
}
/**
 * Returns the context reported by {@link #getContext()}. When there is none, a new event
 * loop context is created and remembered for the calling thread through a weak reference,
 * so later calls from the same thread can find it again while it is still reachable.
 */
public ContextInternal getOrCreateContext() {
  AbstractContext existing = getContext();
  if (existing != null) {
    return existing;
  }
  AbstractContext created = createEventLoopContext();
  stickyContext.set(new WeakReference<>(created));
  return created;
}
/** @return the live (not copied) map of shared HTTP servers */
public Map<ServerID, HttpServerImpl> sharedHttpServers() {
  return this.sharedHttpServers;
}
/** @return the live (not copied) map of shared TCP servers */
public Map<ServerID, NetServerImpl> sharedNetServers() {
  return this.sharedNetServers;
}
/**
 * Returns the shared-server map matching the requested implementation class.
 *
 * @param type exactly {@code NetServerImpl.class} or {@code HttpServerImpl.class}
 * @throws IllegalStateException for any other class
 */
@Override
public <S extends TCPServerBase> Map<ServerID, S> sharedTCPServers(Class<S> type) {
  if (type == NetServerImpl.class) {
    return (Map<ServerID, S>) sharedNetServers;
  }
  if (type == HttpServerImpl.class) {
    return (Map<ServerID, S>) sharedHttpServers;
  }
  throw new IllegalStateException();
}
/** @return {@code true} when a metrics SPI was supplied at construction */
@Override
public boolean isMetricsEnabled() {
  boolean disabled = metrics == null;
  return !disabled;
}
/** @return the metrics SPI, or {@code null} when metrics are disabled */
@Override
public Metrics getMetrics() {
  return this.metrics;
}
/**
 * Cancels the timer with the given id.
 *
 * @return {@code true} if the timer was still registered and has been cancelled,
 *         {@code false} if no timer with that id was registered
 */
public boolean cancelTimer(long id) {
  InternalTimerHandler registered = timeouts.remove(id);
  if (registered == null) {
    return false;
  }
  registered.cancel();
  return true;
}
/**
 * Creates an event loop context on the next event loop of the main group.
 *
 * @param workerPool worker pool for the context; {@code null} selects the default worker pool
 */
@Override
public EventLoopContext createEventLoopContext(Deployment deployment, CloseHooks closeHooks, WorkerPool workerPool, ClassLoader tccl) {
  WorkerPool effectivePool = workerPool;
  if (effectivePool == null) {
    effectivePool = this.workerPool;
  }
  return new EventLoopContext(this, tracer, eventLoopGroup.next(), internalBlockingPool, effectivePool, deployment, closeHooks, tccl);
}
/**
 * Creates an event loop context pinned to the given event loop, without deployment or
 * close hooks.
 *
 * @param workerPool worker pool for the context; {@code null} selects the default worker pool
 */
@Override
public EventLoopContext createEventLoopContext(EventLoop eventLoop, WorkerPool workerPool, ClassLoader tccl) {
  WorkerPool effectivePool = workerPool;
  if (effectivePool == null) {
    effectivePool = this.workerPool;
  }
  return new EventLoopContext(this, tracer, eventLoop, internalBlockingPool, effectivePool, null, null, tccl);
}
/**
 * Creates an event loop context with no deployment, no close hooks, the default worker
 * pool and the calling thread's context class loader.
 */
@Override
public EventLoopContext createEventLoopContext() {
  ClassLoader callerLoader = Thread.currentThread().getContextClassLoader();
  return createEventLoopContext(null, null, null, callerLoader);
}
/**
 * Creates a worker context.
 *
 * @param workerPool worker pool for the context; {@code null} selects the default worker pool
 */
@Override
public WorkerContext createWorkerContext(Deployment deployment, CloseHooks closeHooks, WorkerPool workerPool, ClassLoader tccl) {
  WorkerPool effectivePool = workerPool;
  if (effectivePool == null) {
    effectivePool = this.workerPool;
  }
  return new WorkerContext(this, tracer, internalBlockingPool, effectivePool, deployment, closeHooks, tccl);
}
/**
 * Creates a worker context with no deployment, no close hooks and the default worker pool.
 * <p>
 * The calling thread's context class loader is passed on, matching what
 * {@link #createEventLoopContext()} does; previously {@code null} was passed here, which
 * made the two no-arg factories disagree.
 * NOTE(review): how the context uses the loader is not visible here — confirm against
 * {@code WorkerContext}.
 */
@Override
public WorkerContext createWorkerContext() {
  return createWorkerContext(null, null, null, Thread.currentThread().getContextClassLoader());
}
/** Creates a DNS client for the given server, expressed as options with that host and port. */
@Override
public DnsClient createDnsClient(int port, String host) {
  DnsClientOptions target = new DnsClientOptions().setHost(host).setPort(port);
  return createDnsClient(target);
}
/** Creates a DNS client from freshly constructed default options. */
@Override
public DnsClient createDnsClient() {
  DnsClientOptions defaults = new DnsClientOptions();
  return createDnsClient(defaults);
}
/**
 * Creates a DNS client. When the options carry no host or a negative port, a copy of the
 * options is made whose host and port are taken from the first name server address
 * reported by a {@code DnsResolverProvider} built from this instance's resolver options.
 */
@Override
public DnsClient createDnsClient(DnsClientOptions options) {
  String requestedHost = options.getHost();
  int requestedPort = options.getPort();
  boolean serverFullySpecified = requestedHost != null && requestedPort >= 0;
  if (serverFullySpecified) {
    return new DnsClientImpl(this, options);
  }
  DnsResolverProvider resolverProvider = new DnsResolverProvider(this, addressResolverOptions);
  InetSocketAddress firstServer = resolverProvider.nameServerAddresses().get(0);
  DnsClientOptions completed = new DnsClientOptions(options)
    .setHost(firstServer.getAddress().getHostAddress())
    .setPort(firstServer.getPort());
  return new DnsClientImpl(this, completed);
}
/**
 * Schedules a timer on the given context and registers it under a fresh id. For contexts
 * that belong to a deployment the timer is also added as a close hook of that context.
 *
 * @param delay delay (and period, for periodic timers) in milliseconds; must be at least 1
 * @return the id of the new timer
 * @throws IllegalArgumentException if {@code delay} is below 1
 */
public long scheduleTimeout(ContextInternal context, Handler<Long> handler, long delay, boolean periodic) {
  if (delay < 1) {
    throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms");
  }
  final long id = timeoutCounter.getAndIncrement();
  final InternalTimerHandler timer = new InternalTimerHandler(id, handler, periodic, delay, context);
  timeouts.put(id, timer);
  boolean ownedByDeployment = context.isDeployment();
  if (ownedByDeployment) {
    context.addCloseHook(timer);
  }
  return id;
}
/**
 * Returns the current context when there is one and it is owned by this instance.
 * Otherwise falls back to the context remembered for the calling thread, which may be
 * {@code null} (never set, or already collected).
 */
public AbstractContext getContext() {
  AbstractContext current = (AbstractContext) ContextInternal.current();
  if (current != null && current.owner() == this) {
    return current;
  }
  WeakReference<AbstractContext> remembered = stickyContext.get();
  if (remembered == null) {
    return null;
  }
  return remembered.get();
}
/** @return the cluster manager, or {@code null} when this instance is not clustered */
public ClusterManager getClusterManager() {
  return this.clusterManager;
}
/** Future-returning variant of {@link #close(Handler)}. */
@Override
public Future<Void> close() {
  Promise<Void> closing = Promise.promise();
  close(closing);
  return closing.future();
}
/**
 * Leaves the cluster when a cluster manager is present. A failure to leave is only logged:
 * {@code completionHandler} (if non-null) is always completed successfully.
 */
private void closeClusterManager(Handler<AsyncResult<Void>> completionHandler) {
  Promise<Void> left = getOrCreateContext().promise();
  if (clusterManager == null) {
    left.complete();
  } else {
    clusterManager.leave(left);
  }
  left.future().onComplete(outcome -> {
    if (outcome.failed()) {
      log.error("Failed to leave cluster", outcome.cause());
    }
    if (completionHandler == null) {
      return;
    }
    completionHandler.handle(Future.succeededFuture());
  });
}
/**
 * Closes this instance. Steps run strictly one after another, each started from the
 * completion callback of the previous one:
 * close hooks, undeploy all deployments, stop the HA manager (on a blocking thread),
 * close the address resolver, close the event bus, leave the cluster, and finally
 * {@code deleteCacheDirAndShutdown}, which is the step that notifies {@code completionHandler}.
 * None of the intermediate results is inspected, so a failing step does not stop the chain.
 *
 * @param completionHandler notified when closing is done; may be {@code null}
 */
@Override
public synchronized void close(Handler<AsyncResult<Void>> completionHandler) {
// Already closed (or no event bus): report success immediately and do nothing else.
if (closed || eventBus == null) {
if (completionHandler != null) {
completionHandler.handle(Future.succeededFuture());
}
return;
}
closed = true;
closeHooks.run(ar -> {
deploymentManager.undeployAll().onComplete(ar1 -> {
HAManager haManager = haManager();
Promise<Void> haPromise = Promise.promise();
if (haManager != null) {
// haManager.stop() is run through executeBlocking rather than on the calling thread.
this.executeBlocking(fut -> {
haManager.stop();
fut.complete();
}, false, haPromise);
} else {
haPromise.complete();
}
haPromise.future().onComplete(ar2 -> {
addressResolver.close(ar3 -> {
Promise<Void> ebClose = getOrCreateContext().promise();
eventBus.close(ebClose);
ebClose.future().onComplete(ar4 -> {
closeClusterManager(ar5 -> {
deleteCacheDirAndShutdown(completionHandler);
});
});
});
});
});
});
}
/** Deploys the named verticle with freshly constructed default deployment options. */
@Override
public Future<String> deployVerticle(String name) {
  DeploymentOptions defaults = new DeploymentOptions();
  return deployVerticle(name, defaults);
}
/** Callback variant: deploys the named verticle with default deployment options. */
@Override
public void deployVerticle(String name, Handler<AsyncResult<String>> completionHandler) {
  DeploymentOptions defaults = new DeploymentOptions();
  deployVerticle(name, defaults, completionHandler);
}
/**
 * Deploys the named verticle. The deployment goes through the HA manager when the options
 * request HA and an enabled HA manager exists; otherwise it goes through the verticle
 * manager and the future is mapped to the deployment id.
 */
@Override
public Future<String> deployVerticle(String name, DeploymentOptions options) {
  boolean deployThroughHa = options.isHa() && haManager() != null && haManager().isEnabled();
  if (!deployThroughHa) {
    return verticleManager.deployVerticle(name, options).map(Deployment::deploymentID);
  }
  Promise<String> deployed = getOrCreateContext().promise();
  haManager().deployVerticle(name, options, deployed);
  return deployed.future();
}
/** Callback variant of {@link #deployVerticle(String, DeploymentOptions)}; the handler may be {@code null}. */
@Override
public void deployVerticle(String name, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
  Future<String> deployment = deployVerticle(name, options);
  if (completionHandler == null) {
    return;
  }
  deployment.onComplete(completionHandler);
}
/** Deploys an existing verticle instance with freshly constructed default deployment options. */
@Override
public Future<String> deployVerticle(Verticle verticle) {
  DeploymentOptions defaults = new DeploymentOptions();
  return deployVerticle(verticle, defaults);
}
/** Callback variant of {@link #deployVerticle(Verticle)}; the handler may be {@code null}. */
@Override
public void deployVerticle(Verticle verticle, Handler<AsyncResult<String>> completionHandler) {
  Future<String> deployment = deployVerticle(verticle);
  if (completionHandler == null) {
    return;
  }
  deployment.onComplete(completionHandler);
}
/**
 * Deploys an existing verticle instance.
 *
 * @throws IllegalArgumentException if the options ask for an instance count other than 1
 */
@Override
public Future<String> deployVerticle(Verticle verticle, DeploymentOptions options) {
  boolean singleInstance = options.getInstances() == 1;
  if (!singleInstance) {
    throw new IllegalArgumentException("Can't specify > 1 instances for already created verticle");
  }
  Callable<Verticle> sameInstance = () -> verticle;
  return deployVerticle(sameInstance, options);
}
/** Callback variant of {@link #deployVerticle(Verticle, DeploymentOptions)}; the handler may be {@code null}. */
@Override
public void deployVerticle(Verticle verticle, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
  Future<String> deployment = deployVerticle(verticle, options);
  if (completionHandler == null) {
    return;
  }
  deployment.onComplete(completionHandler);
}
/** Deploys verticles obtained by calling {@code newInstance} on the given class. */
@Override
public Future<String> deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options) {
  Callable<Verticle> instantiator = verticleClass::newInstance;
  return deployVerticle(instantiator, options);
}
/** Callback variant of {@link #deployVerticle(Class, DeploymentOptions)}; the handler may be {@code null}. */
@Override
public void deployVerticle(Class<? extends Verticle> verticleClass, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
  Future<String> deployment = deployVerticle(verticleClass, options);
  if (completionHandler == null) {
    return;
  }
  deployment.onComplete(completionHandler);
}
/** Deploys verticles obtained from the given supplier. */
@Override
public Future<String> deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options) {
  Callable<Verticle> asCallable = verticleSupplier::get;
  return deployVerticle(asCallable, options);
}
/** Callback variant of {@link #deployVerticle(Supplier, DeploymentOptions)}; the handler may be {@code null}. */
@Override
public void deployVerticle(Supplier<Verticle> verticleSupplier, DeploymentOptions options, Handler<AsyncResult<String>> completionHandler) {
  Future<String> deployment = deployVerticle(verticleSupplier, options);
  if (completionHandler == null) {
    return;
  }
  deployment.onComplete(completionHandler);
}
/**
 * Common deployment entry point for the instance/class/supplier overloads. The closed flag
 * is read under this instance's monitor; a closed instance yields a failed future with the
 * message "Vert.x closed", otherwise the deployment manager takes over.
 */
private Future<String> deployVerticle(Callable<Verticle> verticleSupplier, DeploymentOptions options) {
  final boolean alreadyClosed;
  synchronized (this) {
    alreadyClosed = this.closed;
  }
  if (!alreadyClosed) {
    return deploymentManager.deployVerticle(verticleSupplier, options);
  }
  return Future.failedFuture("Vert.x closed");
}
/**
 * Undeploys a deployment. With an enabled HA manager the deployment is first removed from
 * HA on a blocking thread (unordered); the deployment manager then performs the undeploy.
 */
@Override
public Future<Void> undeploy(String deploymentID) {
  HAManager ha = haManager();
  Future<Void> removedFromHa;
  if (ha == null || !ha.isEnabled()) {
    removedFromHa = getOrCreateContext().succeededFuture();
  } else {
    removedFromHa = this.executeBlocking(blocking -> {
      ha.removeFromHA(deploymentID);
      blocking.complete();
    }, false);
  }
  return removedFromHa.compose(ignored -> deploymentManager.undeployVerticle(deploymentID));
}
/** Callback variant of {@link #undeploy(String)}; the handler may be {@code null}. */
@Override
public void undeploy(String deploymentID, Handler<AsyncResult<Void>> completionHandler) {
  Future<Void> undeployment = undeploy(deploymentID);
  if (completionHandler == null) {
    return;
  }
  undeployment.onComplete(completionHandler);
}
/** @return the deployment ids as reported by the deployment manager */
@Override
public Set<String> deploymentIDs() {
  Set<String> ids = deploymentManager.deployments();
  return ids;
}
/** Registers a verticle factory with the verticle manager. */
@Override
public void registerVerticleFactory(VerticleFactory factory) {
  this.verticleManager.registerVerticleFactory(factory);
}
/** Unregisters a verticle factory from the verticle manager. */
@Override
public void unregisterVerticleFactory(VerticleFactory factory) {
  this.verticleManager.unregisterVerticleFactory(factory);
}
/** @return the verticle factories as reported by the verticle manager */
@Override
public Set<VerticleFactory> verticleFactories() {
  Set<VerticleFactory> factories = verticleManager.verticleFactories();
  return factories;
}
/** Delegates to {@code executeBlockingInternal} of the current (or a newly created) context. */
@Override
public <T> void executeBlockingInternal(Handler<Promise<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
  getOrCreateContext().executeBlockingInternal(blockingCodeHandler, resultHandler);
}
/** Delegates to the ordered-flag variant of {@code executeBlockingInternal} of the current (or a newly created) context. */
@Override
public <T> void executeBlockingInternal(Handler<Promise<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
  getOrCreateContext().executeBlockingInternal(blockingCodeHandler, ordered, resultHandler);
}
/** Delegates to {@code executeBlocking} of the current (or a newly created) context and returns its future. */
@Override
public <T> Future<@Nullable T> executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered) {
  return getOrCreateContext().executeBlocking(blockingCodeHandler, ordered);
}
/** Same as {@link #executeBlocking(Handler, boolean)} with {@code ordered} set to {@code true}. */
@Override
public <T> Future<T> executeBlocking(Handler<Promise<T>> blockingCodeHandler) {
  final boolean ordered = true;
  return executeBlocking(blockingCodeHandler, ordered);
}
/** Callback variant: delegates to {@code executeBlocking} of the current (or a newly created) context. */
@Override
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered,
                                Handler<AsyncResult<T>> asyncResultHandler) {
  getOrCreateContext().executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
}
/** Same as the three-argument callback variant with {@code ordered} set to {@code true}. */
@Override
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler,
                                Handler<AsyncResult<T>> asyncResultHandler) {
  final boolean ordered = true;
  executeBlocking(blockingCodeHandler, ordered, asyncResultHandler);
}
/** @return {@code true} when a cluster manager was supplied at construction */
@Override
public boolean isClustered() {
  boolean standalone = clusterManager == null;
  return !standalone;
}
/** @return the main I/O event loop group (same object as {@link #getEventLoopGroup()}) */
@Override
public EventLoopGroup nettyEventLoopGroup() {
  return this.eventLoopGroup;
}
/** Forwards to the HA manager's {@code simulateKill}; does nothing when there is no HA manager. */
public void simulateKill() {
  if (haManager() == null) {
    return;
  }
  haManager().simulateKill();
}
/** @return the deployment the deployment manager reports for the given id */
@Override
public Deployment getDeployment(String deploymentID) {
  Deployment found = deploymentManager.getDeployment(deploymentID);
  return found;
}
/** Installs the failover-complete handler on the HA manager; does nothing when there is no HA manager. */
@Override
public synchronized void failoverCompleteHandler(FailoverCompleteHandler failoverCompleteHandler) {
  if (haManager() == null) {
    return;
  }
  haManager().setFailoverCompleteHandler(failoverCompleteHandler);
}
/**
 * Reports whether the HA manager considers this instance killed.
 * <p>
 * The HA manager only exists after a clustered start-up, so its absence is treated as
 * "not killed" instead of failing with a {@link NullPointerException}; this mirrors the
 * null checks in {@code simulateKill} and {@code failDuringFailover}.
 *
 * @return {@code true} only when an HA manager exists and reports killed
 */
@Override
public boolean isKilled() {
  // Read the volatile field once so the null check and the call see the same object.
  HAManager ha = haManager();
  return ha != null && ha.isKilled();
}
/** Forwards the flag to the HA manager's {@code failDuringFailover}; does nothing when there is no HA manager. */
@Override
public void failDuringFailover(boolean fail) {
  if (haManager() == null) {
    return;
  }
  haManager().failDuringFailover(fail);
}
/** @return the metrics SPI, or {@code null} when metrics are disabled */
@Override
public VertxMetrics metricsSPI() {
  return this.metrics;
}
/** Resolves a file name through this instance's file resolver. */
@Override
public File resolveFile(String fileName) {
  File resolved = fileResolver.resolveFile(fileName);
  return resolved;
}
/** Resolves a host name through this instance's address resolver and reports to {@code resultHandler}. */
@Override
public void resolveAddress(String hostname, Handler<AsyncResult<InetAddress>> resultHandler) {
  this.addressResolver.resolveHostname(hostname, resultHandler);
}
/** @return the address resolver created for this instance */
@Override
public AddressResolver addressResolver() {
  return this.addressResolver;
}
/** @return the Netty resolver group exposed by this instance's address resolver */
@Override
public AddressResolverGroup<InetSocketAddress> nettyAddressResolverGroup() {
  AddressResolverGroup<InetSocketAddress> group = addressResolver.nettyAddressResolverGroup();
  return group;
}
/** @return the blocked thread checker shared by this instance's thread factories */
@Override
public BlockedThreadChecker blockedThreadChecker() {
  return this.checker;
}
/**
 * Last step of closing: closes the file resolver on the internal blocking path, then
 * closes the worker pools, shuts down the acceptor event loop group followed by the main
 * event loop group, closes metrics, tracer and the blocked thread checker, and finally
 * notifies {@code completionHandler} with success.
 * <p>
 * The outcome of closing the file resolver is not inspected, and a failed event loop
 * shutdown is only logged as a warning; neither prevents the remaining steps.
 *
 * @param completionHandler notified once everything is shut down; may be {@code null}
 */
@SuppressWarnings("unchecked")
private void deleteCacheDirAndShutdown(Handler<AsyncResult<Void>> completionHandler) {
executeBlockingInternal(fut -> {
try {
fileResolver.close();
fut.complete();
} catch (IOException e) {
fut.tryFail(e);
}
}, ar -> {
workerPool.close();
internalBlockingPool.close();
// Iterate over a copy of the named pools rather than the live map values.
new ArrayList<>(namedWorkerPools.values()).forEach(WorkerPool::close);
// Graceful shutdown with a quiet period of 0 and a timeout of 10 seconds.
acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() {
@Override
public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
if (!future.isSuccess()) {
log.warn("Failure in shutting down acceptor event loop group", future.cause());
}
// The main group is shut down only after the acceptor group has completed.
eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() {
@Override
public void operationComplete(io.netty.util.concurrent.Future future) throws Exception {
if (!future.isSuccess()) {
log.warn("Failure in shutting down event loop group", future.cause());
}
if (metrics != null) {
metrics.close();
}
if (tracer != null) {
tracer.close();
}
checker.close();
if (completionHandler != null) {
// The handler runs on a brand-new thread from the event loop thread factory,
// presumably because the event loops themselves are shut down by now — TODO confirm.
eventLoopThreadFactory.newThread(() -> {
completionHandler.handle(Future.succeededFuture());
}).start();
}
}
});
}
});
});
}
/** @return the HA manager, or {@code null} when none has been created */
public HAManager haManager() {
  return this.haManager;
}
/**
 * A single scheduled timer. Creating an instance immediately schedules it on the Netty
 * event loop of its context; when the event loop fires, the timer is emitted on the context
 * and the user handler is invoked only while the timer is still registered in
 * {@code timeouts}. It can also serve as a close hook that cancels the timer.
 */
private class InternalTimerHandler implements Handler<Void>, Closeable, Runnable {
  private final Handler<Long> handler;
  private final boolean periodic;
  private final long timerID;
  private final ContextInternal context;
  private final java.util.concurrent.Future<?> future;

  /**
   * @param timerID  id under which the timer is registered
   * @param handler  user handler receiving the timer id
   * @param periodic {@code true} for fixed-rate repetition, {@code false} for one shot
   * @param delay    initial delay and, when periodic, the period, in milliseconds
   * @param context  context whose event loop runs the timer
   */
  InternalTimerHandler(long timerID, Handler<Long> handler, boolean periodic, long delay, ContextInternal context) {
    this.context = context;
    this.timerID = timerID;
    this.handler = handler;
    this.periodic = periodic;
    EventLoop loop = context.nettyEventLoop();
    this.future = periodic
      ? loop.scheduleAtFixedRate(this, delay, delay, TimeUnit.MILLISECONDS)
      : loop.schedule(this, delay, TimeUnit.MILLISECONDS);
  }

  /** Invoked by the event loop: re-emits this timer on its context. */
  @Override
  public void run() {
    context.emit(this);
  }

  /**
   * Fires the user handler. A periodic timer fires while it is still registered; a one-shot
   * timer unregisters itself first and removes its close hook after the handler returns.
   */
  public void handle(Void v) {
    if (periodic) {
      if (timeouts.containsKey(timerID)) {
        handler.handle(timerID);
      }
      return;
    }
    if (timeouts.remove(timerID) == null) {
      return;
    }
    try {
      handler.handle(timerID);
    } finally {
      context.removeCloseHook(this);
    }
  }

  /** Cancels the scheduled task (without interruption) and drops the close hook of deployment contexts. */
  private void cancel() {
    future.cancel(false);
    boolean ownedByDeployment = context.isDeployment();
    if (ownedByDeployment) {
      context.removeCloseHook(this);
    }
  }

  /** Close-hook entry point: unregisters and cancels the timer if still registered, then completes. */
  public void close(Promise<Void> completion) {
    boolean wasRegistered = timeouts.remove(timerID) != null;
    if (wasRegistered) {
      future.cancel(false);
    }
    completion.complete();
  }
}
/**
 * Stream view of a timer. The underlying timer is scheduled when a non-null handler is set
 * and cancelled through {@link #cancel()} or by setting a {@code null} handler. Ticks are
 * delivered only while the demand counter is positive.
 */
private class TimeoutStreamImpl implements TimeoutStream, Handler<Long> {
  private final long delay;
  private final boolean periodic;
  private Long id;
  private Handler<Long> handler;
  private Handler<Void> endHandler;
  private long demand;

  /** Starts with unlimited demand ({@code Long.MAX_VALUE}). */
  public TimeoutStreamImpl(long delay, boolean periodic) {
    this.delay = delay;
    this.periodic = periodic;
    this.demand = Long.MAX_VALUE;
  }

  /**
   * Timer callback: delivers the tick when there is demand. For a one-shot stream the end
   * handler (if set) is always called afterwards, even if the tick handler throws.
   */
  @Override
  public synchronized void handle(Long event) {
    try {
      if (demand <= 0) {
        return;
      }
      demand--;
      handler.handle(event);
    } finally {
      boolean oneShot = !periodic;
      if (oneShot && endHandler != null) {
        endHandler.handle(null);
      }
    }
  }

  /** Adds to the demand; a sum that turns negative is clamped to {@code Long.MAX_VALUE}. */
  @Override
  public synchronized TimeoutStream fetch(long amount) {
    long updated = demand + amount;
    demand = updated < 0 ? Long.MAX_VALUE : updated;
    return this;
  }

  /** The supplied handler is not stored; this method only returns the stream. */
  @Override
  public TimeoutStream exceptionHandler(Handler<Throwable> handler) {
    return this;
  }

  /** Cancels the underlying timer if one has been scheduled. */
  @Override
  public void cancel() {
    if (id == null) {
      return;
    }
    VertxImpl.this.cancelTimer(id);
  }

  /**
   * Setting a non-null handler schedules the timer; setting {@code null} cancels it.
   *
   * @throws IllegalStateException if a timer has already been scheduled for this stream
   */
  @Override
  public synchronized TimeoutStream handler(Handler<Long> handler) {
    if (handler == null) {
      cancel();
      return this;
    }
    if (id != null) {
      throw new IllegalStateException();
    }
    this.handler = handler;
    id = scheduleTimeout(getOrCreateContext(), this, delay, periodic);
    return this;
  }

  /** Drops the demand to zero so that ticks are no longer delivered. */
  @Override
  public synchronized TimeoutStream pause() {
    this.demand = 0;
    return this;
  }

  /** Restores unlimited demand. */
  @Override
  public synchronized TimeoutStream resume() {
    this.demand = Long.MAX_VALUE;
    return this;
  }

  /** Sets the handler called after the tick of a one-shot stream. */
  @Override
  public synchronized TimeoutStream endHandler(Handler<Void> endHandler) {
    this.endHandler = endHandler;
    return this;
  }
}
/**
 * A named worker pool shared through reference counting. The count starts at 1 and is
 * raised by {@code createSharedWorkerPool} on reuse. Both {@link #close()} and
 * {@link #release()} run while holding the enclosing instance's monitor.
 */
class SharedWorkerPool extends WorkerPool {
  private final String name;
  private int refCount = 1;

  SharedWorkerPool(String name, ExecutorService workerExec, PoolMetrics workerMetrics) {
    super(workerExec, workerMetrics);
    this.name = name;
  }

  /** Closes the pool regardless of remaining references; a pool whose count is already 0 is left alone. */
  @Override
  void close() {
    synchronized (VertxImpl.this) {
      if (refCount <= 0) {
        return;
      }
      refCount = 0;
      super.close();
    }
  }

  /** Drops one reference; when the count reaches exactly 0 the pool is unregistered by name and closed. */
  void release() {
    synchronized (VertxImpl.this) {
      refCount--;
      if (refCount != 0) {
        return;
      }
      namedWorkerPools.remove(name);
      super.close();
    }
  }
}
/** Creates (or reuses) the named shared worker executor with the default worker pool size. */
@Override
public WorkerExecutorImpl createSharedWorkerExecutor(String name) {
  int poolSize = defaultWorkerPoolSize;
  return createSharedWorkerExecutor(name, poolSize);
}
/** Creates (or reuses) the named shared worker executor with the default maximum execute time. */
@Override
public WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize) {
  long maxExecuteTime = maxWorkerExecTime;
  return createSharedWorkerExecutor(name, poolSize, maxExecuteTime);
}
/** Creates (or reuses) the named shared worker executor with the default execute-time unit. */
@Override
public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) {
  TimeUnit unit = maxWorkerExecTimeUnit;
  return createSharedWorkerExecutor(name, poolSize, maxExecuteTime, unit);
}
/**
 * Creates a worker executor backed by the named shared pool (created or reused) and
 * registers it with the close hooks chosen by {@code resolveHooks()}: those of the calling
 * context when it has some, otherwise the instance-level hooks.
 * <p>
 * The executor is given the same hooks it is registered with. Previously it always received
 * the instance-level hooks, even when it had been added to a context's hooks.
 * NOTE(review): this assumes {@code WorkerExecutorImpl} uses the hooks it is given to
 * deregister itself on close — confirm against that class.
 *
 * @throws IllegalArgumentException if {@code poolSize} or {@code maxExecuteTime} is below 1
 */
@Override
public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime, TimeUnit maxExecuteTimeUnit) {
  SharedWorkerPool sharedWorkerPool = createSharedWorkerPool(name, poolSize, maxExecuteTime, maxExecuteTimeUnit);
  CloseHooks hooks = resolveHooks();
  WorkerExecutorImpl namedExec = new WorkerExecutorImpl(this, hooks, sharedWorkerPool);
  hooks.add(namedExec);
  return namedExec;
}
/**
 * Returns the shared worker pool registered under {@code name}. An existing pool gets its
 * reference count raised and is returned as-is (the size and time arguments are validated
 * but otherwise unused in that case). Otherwise a new fixed-size pool is created, with pool
 * metrics when metrics are enabled, and registered under the name.
 *
 * @throws IllegalArgumentException if {@code poolSize} or {@code maxExecuteTime} is below 1
 */
public synchronized SharedWorkerPool createSharedWorkerPool(String name, int poolSize, long maxExecuteTime, TimeUnit maxExecuteTimeUnit) {
  if (poolSize < 1) {
    throw new IllegalArgumentException("poolSize must be > 0");
  }
  if (maxExecuteTime < 1) {
    throw new IllegalArgumentException("maxExecuteTime must be > 0");
  }
  SharedWorkerPool existing = namedWorkerPools.get(name);
  if (existing != null) {
    existing.refCount++;
    return existing;
  }
  ExecutorService executor = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime, maxExecuteTimeUnit));
  PoolMetrics poolMetrics = null;
  if (metrics != null) {
    poolMetrics = metrics.createPoolMetrics("worker", name, poolSize);
  }
  SharedWorkerPool created = new SharedWorkerPool(name, executor, poolMetrics);
  namedWorkerPools.put(name, created);
  return created;
}
/** Stores the exception handler (may be {@code null}) and returns this instance for chaining. */
@Override
public Vertx exceptionHandler(Handler<Throwable> handler) {
  this.exceptionHandler = handler;
  return this;
}
/** @return the stored exception handler, or {@code null} when none has been set */
@Override
public Handler<Throwable> exceptionHandler() {
  return this.exceptionHandler;
}
/** Adds a hook to the instance-level close hooks. */
@Override
public void addCloseHook(Closeable hook) {
  this.closeHooks.add(hook);
}
/** Removes a hook from the instance-level close hooks. */
@Override
public void removeCloseHook(Closeable hook) {
  this.closeHooks.remove(hook);
}
/**
 * Chooses where a new resource should register for closing: the close hooks of the context
 * reported by {@link #getContext()} when that context exists and has hooks, otherwise the
 * instance-level close hooks.
 */
private CloseHooks resolveHooks() {
  AbstractContext current = getContext();
  if (current != null) {
    CloseHooks contextHooks = current.closeHooks();
    if (contextHooks != null) {
      return contextHooks;
    }
  }
  return closeHooks;
}
}