package io.vertx.core.impl;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.concurrent.FastThreadLocalThread;
import io.vertx.core.*;
import io.vertx.core.impl.launcher.VertxCommandLauncher;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.PoolMetrics;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
abstract class ContextImpl implements ContextInternal {
public static Context context() {
Thread current = Thread.currentThread();
if (current instanceof VertxThread) {
return ((VertxThread) current).getContext();
} else if (current instanceof FastThreadLocalThread) {
return holderLocal.get().ctx;
}
return null;
}
static class Holder implements BlockedThreadChecker.Task {
BlockedThreadChecker checker;
ContextInternal ctx;
long startTime = 0;
long maxExecTime = VertxOptions.DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME;
TimeUnit maxExecTimeUnit = VertxOptions.DEFAULT_MAX_EVENT_LOOP_EXECUTE_TIME_UNIT;
@Override
public long startTime() {
return startTime;
}
@Override
public long maxExecTime() {
return maxExecTime;
}
@Override
public TimeUnit maxExecTimeUnit() {
return maxExecTimeUnit;
}
}
private static FastThreadLocal<Holder> holderLocal = new FastThreadLocal<Holder>() {
@Override
protected Holder initialValue() {
return new Holder();
}
};
private static EventLoop getEventLoop(VertxInternal vertx) {
EventLoopGroup group = vertx.getEventLoopGroup();
if (group != null) {
return group.next();
} else {
return null;
}
}
private static final Logger log = LoggerFactory.getLogger(ContextImpl.class);
private static final String THREAD_CHECKS_PROP_NAME = "vertx.threadChecks";
private static final String DISABLE_TIMINGS_PROP_NAME = "vertx.disableContextTimings";
private static final String DISABLE_TCCL_PROP_NAME = "vertx.disableTCCL";
private static final boolean THREAD_CHECKS = Boolean.getBoolean(THREAD_CHECKS_PROP_NAME);
private static final boolean DISABLE_TIMINGS = Boolean.getBoolean(DISABLE_TIMINGS_PROP_NAME);
private static final boolean DISABLE_TCCL = Boolean.getBoolean(DISABLE_TCCL_PROP_NAME);
protected final VertxInternal owner;
protected final String deploymentID;
protected final JsonObject config;
private Deployment deployment;
private CloseHooks closeHooks;
private final ClassLoader tccl;
private final EventLoop eventLoop;
private ConcurrentMap<Object, Object> contextData;
private volatile Handler<Throwable> exceptionHandler;
protected final WorkerPool workerPool;
protected final WorkerPool internalBlockingPool;
final TaskQueue orderedTasks;
protected final TaskQueue internalOrderedTasks;
protected ContextImpl(VertxInternal vertx, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) {
this(vertx, getEventLoop(vertx), internalBlockingPool, workerPool, deploymentID, config, tccl);
}
protected ContextImpl(VertxInternal vertx, EventLoop eventLoop, WorkerPool internalBlockingPool, WorkerPool workerPool, String deploymentID, JsonObject config,
ClassLoader tccl) {
if (DISABLE_TCCL && tccl != ClassLoader.getSystemClassLoader()) {
log.warn("You have disabled TCCL checks but you have a custom TCCL to set.");
}
this.deploymentID = deploymentID;
this.config = config;
this.eventLoop = eventLoop;
this.tccl = tccl;
this.owner = vertx;
this.workerPool = workerPool;
this.internalBlockingPool = internalBlockingPool;
this.orderedTasks = new TaskQueue();
this.internalOrderedTasks = new TaskQueue();
this.closeHooks = new CloseHooks(log);
}
static void setContext(ContextImpl context) {
Thread current = Thread.currentThread();
if (current instanceof VertxThread) {
setContext((VertxThread) current, context);
} else {
throw new IllegalStateException("Attempt to setContext on non Vert.x thread " + Thread.currentThread());
}
}
private static void setContext(FastThreadLocalThread thread, ContextImpl context) {
if (thread instanceof VertxThread) {
((VertxThread)thread).setContext(context);
} else {
Holder holder = holderLocal.get();
holder.ctx = context;
}
if (!DISABLE_TCCL) {
if (context != null) {
context.setTCCL();
} else {
Thread.currentThread().setContextClassLoader(null);
}
}
}
public void setDeployment(Deployment deployment) {
this.deployment = deployment;
}
public Deployment getDeployment() {
return deployment;
}
public void addCloseHook(Closeable hook) {
closeHooks.add(hook);
}
public boolean removeCloseHook(Closeable hook) {
return closeHooks.remove(hook);
}
public void runCloseHooks(Handler<AsyncResult<Void>> completionHandler) {
closeHooks.run(completionHandler);
VertxThreadFactory.unsetContext(this);
}
abstract void executeAsync(Handler<Void> task);
abstract <T> void execute(T value, Handler<T> task);
@Override
public abstract boolean isEventLoopContext();
@Override
public abstract boolean isMultiThreadedWorkerContext();
@Override
@SuppressWarnings("unchecked")
public <T> T get(String key) {
return (T) contextData().get(key);
}
@Override
public void put(String key, Object value) {
contextData().put(key, value);
}
@Override
public boolean remove(String key) {
return contextData().remove(key) != null;
}
@Override
public boolean isWorkerContext() {
return !isEventLoopContext();
}
static boolean isOnVertxThread(boolean worker) {
Thread t = Thread.currentThread();
if (t instanceof VertxThread) {
VertxThread vt = (VertxThread) t;
return vt.isWorker() == worker;
}
return false;
}
@Override
public final void executeFromIO(Handler<Void> task) {
executeFromIO(null, task);
}
@Override
public final <T> void executeFromIO(T value, Handler<T> task) {
if (THREAD_CHECKS) {
checkEventLoopThread();
}
execute(value, task);
}
private void checkEventLoopThread() {
Thread current = Thread.currentThread();
if (!(current instanceof FastThreadLocalThread)) {
throw new IllegalStateException("Expected to be on Vert.x thread, but actually on: " + current);
} else if ((current instanceof VertxThread) && ((VertxThread) current).isWorker()) {
throw new IllegalStateException("Event delivered on unexpected worker thread " + current);
}
}
@Override
public void runOnContext(Handler<Void> task) {
try {
executeAsync(task);
} catch (RejectedExecutionException ignore) {
}
}
@Override
public String deploymentID() {
return deploymentID;
}
@Override
public JsonObject config() {
return config;
}
@Override
public List<String> processArgs() {
List<String> processArgument = VertxCommandLauncher.getProcessArguments();
return processArgument != null ? processArgument : Starter.PROCESS_ARGS;
}
public EventLoop nettyEventLoop() {
return eventLoop;
}
public VertxInternal owner() {
return owner;
}
@Override
public <T> void executeBlockingInternal(Handler<Promise<T>> action, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(action, resultHandler, internalBlockingPool.executor(), internalOrderedTasks, internalBlockingPool.metrics());
}
@Override
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, boolean ordered, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, resultHandler, workerPool.executor(), ordered ? orderedTasks : null, workerPool.metrics());
}
@Override
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, true, resultHandler);
}
@Override
public <T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler, TaskQueue queue, Handler<AsyncResult<T>> resultHandler) {
executeBlocking(blockingCodeHandler, resultHandler, workerPool.executor(), queue, workerPool.metrics());
}
<T> void executeBlocking(Handler<Promise<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
Executor exec, TaskQueue queue, PoolMetrics metrics) {
Object queueMetric = metrics != null ? metrics.submitted() : null;
try {
Runnable command = () -> {
VertxThread current = (VertxThread) Thread.currentThread();
Object execMetric = null;
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
}
if (!DISABLE_TIMINGS) {
current.executeStart();
}
Promise<T> promise = Promise.promise();
try {
ContextImpl.setContext(this);
blockingCodeHandler.handle(promise);
} catch (Throwable e) {
promise.tryFail(e);
} finally {
if (!DISABLE_TIMINGS) {
current.executeEnd();
}
}
Future<T> res = promise.future();
if (metrics != null) {
metrics.end(execMetric, res.succeeded());
}
res.setHandler(ar -> {
if (resultHandler != null) {
runOnContext(v -> resultHandler.handle(ar));
} else if (ar.failed()) {
reportException(ar.cause());
}
});
};
if (queue != null) {
queue.execute(command, exec);
} else {
exec.execute(command);
}
} catch (RejectedExecutionException e) {
if (metrics != null) {
metrics.rejected(queueMetric);
}
throw e;
}
}
@Override
public synchronized ConcurrentMap<Object, Object> contextData() {
if (contextData == null) {
contextData = new ConcurrentHashMap<>();
}
return contextData;
}
<T> boolean executeTask(T arg, Handler<T> hTask) {
Thread th = Thread.currentThread();
if (!(th instanceof FastThreadLocalThread)) {
throw new IllegalStateException("Uh oh! context executing with wrong thread! " + th);
}
FastThreadLocalThread current = (FastThreadLocalThread) th;
if (!DISABLE_TIMINGS) {
executeStart(current);
}
try {
setContext(current, this);
hTask.handle(arg);
return true;
} catch (Throwable t) {
reportException(t);
return false;
} finally {
if (!DISABLE_TIMINGS) {
executeEnd(current);
}
}
}
private void executeStart(FastThreadLocalThread thread) {
if (thread instanceof VertxThread) {
((VertxThread)thread).executeStart();
} else {
Holder holder = holderLocal.get();
if (holder.checker == null) {
BlockedThreadChecker checker = owner().blockedThreadChecker();
holder.checker = checker;
holder.maxExecTime = owner.maxEventLoopExecTime();
holder.maxExecTimeUnit = owner.maxEventLoopExecTimeUnit();
checker.registerThread(thread, holder);
}
holder.startTime = System.nanoTime();
}
}
private void executeEnd(FastThreadLocalThread thread) {
if (thread instanceof VertxThread) {
((VertxThread)thread).executeEnd();
} else {
Holder holder = holderLocal.get();
holder.startTime = 0L;
}
}
public void reportException(Throwable t) {
Handler<Throwable> handler = this.exceptionHandler;
if (handler == null) {
handler = owner.exceptionHandler();
}
if (handler != null) {
handler.handle(t);
} else {
log.error("Unhandled exception", t);
}
}
private void setTCCL() {
Thread.currentThread().setContextClassLoader(tccl);
}
@Override
public Context exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
@Override
public Handler<Throwable> exceptionHandler() {
return exceptionHandler;
}
public int getInstanceCount() {
if (deployment == null) {
return 0;
}
if (deployment.deploymentOptions() == null) {
return 1;
}
return deployment.deploymentOptions().getInstances();
}
}