package io.vertx.core.impl;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.Objects;
import java.util.concurrent.RejectedExecutionException;
/**
 * A {@link ContextImpl} that is not an event-loop context ({@link #isEventLoopContext()} returns
 * {@code false}) and that hands its tasks to a {@code TaskQueue} backed by the worker pool's
 * executor.
 *
 * <p>Every entry point resolves the queue from the context it was invoked with: a
 * {@code DuplicatedContext} contributes its own {@code orderedTasks()}, any other context falls
 * back to this context's {@code orderedTasks}.
 *
 * <p>When the worker pool exposes {@link PoolMetrics}, each queued task is reported through
 * {@code submitted()}, {@code begin(..)} and {@code end(.., true)}.
 */
public class WorkerContext extends ContextImpl {

  /**
   * Creates the context. All arguments are forwarded unchanged to the super constructor; the
   * event loop handed to it is obtained from {@code vertx.getEventLoopGroup().next()}.
   */
  WorkerContext(VertxInternal vertx,
                VertxTracer<?, ?> tracer,
                WorkerPool internalBlockingPool,
                WorkerPool workerPool,
                Deployment deployment,
                CloseHooks closeHooks,
                ClassLoader tccl) {
    super(vertx, tracer, vertx.getEventLoopGroup().next(), internalBlockingPool, workerPool, deployment, closeHooks, tccl);
  }

  /**
   * Always queues {@code action} (never runs it inline) and dispatches it through
   * {@code ctx.dispatch(null, action)} once the queue runs it.
   *
   * @param ctx the context the action is dispatched on; also selects the task queue
   * @param action the handler to run, must not be {@code null}
   * @throws NullPointerException if {@code action} is {@code null}
   */
  @Override
  void runOnContext(AbstractContext ctx, Handler<Void> action) {
    try {
      run(ctx, orderedTasksFor(ctx), null, action);
    } catch (RejectedExecutionException ignored) {
      // Deliberate best-effort: the executor refused the task (for an ExecutorService this
      // happens e.g. once it has been shut down), so the action is silently dropped.
    }
  }

  /**
   * Runs {@code task} with {@code argument}: inline when {@link Context#isOnWorkerThread()}
   * reports a worker thread, otherwise on the task queue selected by {@code ctx}. The task is
   * invoked directly via {@code task.handle(argument)}, without {@code ctx.dispatch}.
   */
  @Override
  <T> void execute(AbstractContext ctx, T argument, Handler<T> task) {
    execute(orderedTasksFor(ctx), argument, task);
  }

  /**
   * Same scheduling rule as {@link #execute(AbstractContext, Object, Handler)}, but the task is
   * invoked through {@code ctx.dispatch(argument, task)} instead of being called directly.
   */
  @Override
  <T> void emit(AbstractContext ctx, T argument, Handler<T> task) {
    execute(orderedTasksFor(ctx), argument, arg -> ctx.dispatch(arg, task));
  }

  /**
   * {@link Runnable} flavour of {@link #execute(AbstractContext, Object, Handler)}.
   *
   * <p>The caller's {@code ctx} is forwarded (rather than {@code this}) so that a duplicated
   * context's own task queue is honoured here exactly as it is by the other entry points.
   */
  @Override
  <T> void execute(AbstractContext ctx, Runnable task) {
    execute(ctx, task, Runnable::run);
  }

  /** @return {@code false}, always */
  @Override
  public boolean isEventLoopContext() {
    return false;
  }

  /**
   * Queues a task that calls {@code ctx.dispatch(value, task)}.
   *
   * @throws NullPointerException if {@code task} is {@code null}
   */
  private <T> void run(ContextInternal ctx, TaskQueue queue, T value, Handler<T> task) {
    // Validate before touching the metrics so a null handler is never counted as submitted.
    Objects.requireNonNull(task, "Task handler must not be null");
    enqueueWithMetrics(queue, () -> ctx.dispatch(value, task));
  }

  /**
   * Calls {@code task.handle(argument)} inline when already on a worker thread, otherwise queues
   * that call on {@code queue}.
   */
  private <T> void execute(TaskQueue queue, T argument, Handler<T> task) {
    // NOTE(review): Context.isOnWorkerThread() is a static check on the current thread; nothing
    // visible here ties that thread to this particular context or queue.
    if (Context.isOnWorkerThread()) {
      task.handle(argument);
    } else {
      enqueueWithMetrics(queue, () -> task.handle(argument));
    }
  }

  /**
   * Picks the task queue for {@code ctx}: the duplicate's own queue for a
   * {@code DuplicatedContext}, this context's {@code orderedTasks} otherwise.
   */
  private TaskQueue orderedTasksFor(AbstractContext ctx) {
    if (ctx instanceof DuplicatedContext) {
      return ((DuplicatedContext) ctx).orderedTasks();
    }
    return this.orderedTasks;
  }

  /**
   * Submits {@code work} to {@code queue} on the worker pool's executor, bracketing it with the
   * pool metrics callbacks when metrics are available.
   */
  private void enqueueWithMetrics(TaskQueue queue, Runnable work) {
    PoolMetrics metrics = workerPool.metrics();
    // Recorded at submission time, on the submitting thread.
    Object queueMetric = metrics != null ? metrics.submitted() : null;
    queue.execute(() -> {
      Object execMetric = null;
      if (metrics != null) {
        execMetric = metrics.begin(queueMetric);
      }
      try {
        work.run();
      } finally {
        // end(..) is reported with 'true' even when work.run() throws.
        if (metrics != null) {
          metrics.end(execMetric, true);
        }
      }
    }, workerPool.executor());
  }

  /** @return the result of {@link Context#isOnWorkerThread()} */
  @Override
  boolean inThread() {
    return Context.isOnWorkerThread();
  }
}