package io.vertx.core.eventbus.impl;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
/**
 * Single-use registration that waits for the reply to a message sent on the event bus.
 *
 * <p>The outcome is exposed through {@link #result()}: the future succeeds with the reply
 * message, or fails with a {@link ReplyException} when the timeout timer fires, when
 * {@link #fail(ReplyException)} is invoked, or when the reply body is itself a
 * {@link ReplyException}. The handler unregisters itself once an outcome has been decided.
 *
 * @param <T> the type of the reply body
 */
class ReplyHandler<T> extends HandlerRegistration<T> {

  private final EventBusImpl eventBus;
  private final ContextInternal context;
  /** Promise backing {@link #result()}; created from {@code context}. */
  private final Promise<Message<T>> result;
  /** Identifier of the timer that fails the request with {@link ReplyFailure#TIMEOUT}. */
  private final long timeoutID;
  /** Forwarded to the superclass; tracing is only reported when this flag is {@code true}. */
  private final boolean src;
  /** Address this handler is registered on by {@link #register()}; also reported to metrics. */
  private final String repliedAddress;
  /**
   * Opaque tracing state passed to {@link VertxTracer#receiveResponse}. It is never assigned
   * inside this class (presumably set by the code sending the request - confirm with callers);
   * tracing is skipped while it is {@code null}.
   */
  Object trace;

  /**
   * Creates the handler and arms the timeout timer.
   *
   * @param eventBus       the event bus providing the timer, metrics and registration
   * @param context        the context used to create the result promise and to dispatch the reply
   * @param address        the address passed to the superclass and quoted in the timeout message
   * @param repliedAddress the address on which the reply is expected
   * @param src            forwarded to the superclass; also enables tracing of the response
   * @param timeout        the delay, in milliseconds, after which the request fails with a timeout
   */
  ReplyHandler(EventBusImpl eventBus, ContextInternal context, String address, String repliedAddress, boolean src, long timeout) {
    super(context, eventBus, address, src);
    this.eventBus = eventBus;
    this.context = context;
    this.result = context.promise();
    this.src = src;
    this.repliedAddress = repliedAddress;
    this.timeoutID = eventBus.vertx.setTimer(timeout, id -> {
      fail(new ReplyException(ReplyFailure.TIMEOUT, "Timed out after waiting " + timeout + "(ms) for a reply. address: " + address + ", repliedAddress: " + repliedAddress));
    });
  }

  /**
   * Reports the end of the exchange to the tracer, when a tracer is configured, this handler
   * was created with {@code src == true} and tracing state is present.
   *
   * @param reply   the reply that was received, or {@code null} on failure
   * @param failure the failure, or {@code null} on success
   */
  private void trace(Object reply, Throwable failure) {
    VertxTracer tracer = context.tracer();
    Object trace = this.trace;
    if (tracer != null && src && trace != null) {
      tracer.receiveResponse(context, reply, trace, failure, TagExtractor.empty());
    }
  }

  /**
   * @return the future completed with the reply message or failed with a {@link ReplyException}
   */
  Future<Message<T>> result() {
    return result.future();
  }

  /**
   * Fails the pending request.
   *
   * <p>The handler is unregistered and the timeout timer is cancelled whatever the failure
   * type is, so that no timer stays pending once the outcome is known. Metrics and tracing
   * are only notified when this call is the one that actually fails the promise.
   *
   * @param failure the failure to report
   */
  void fail(ReplyException failure) {
    unregister();
    // Cancelling a timer that has already fired or been cancelled is a harmless no-op,
    // so there is no need to special-case the failure type here.
    eventBus.vertx.cancelTimer(timeoutID);
    if (result.tryFail(failure)) {
      if (eventBus.metrics != null) {
        eventBus.metrics.replyFailure(repliedAddress, failure.failureType());
      }
      trace(null, failure);
    }
  }

  /**
   * Handles an incoming reply: dispatches it and unregisters this handler even if the
   * dispatch throws.
   *
   * @param reply the received reply
   * @return always {@code true}
   */
  @Override
  protected boolean doReceive(Message<T> reply) {
    try {
      dispatch(null, reply, context);
    } finally {
      unregister();
    }
    return true;
  }

  /**
   * Registers this handler on the replied address.
   */
  void register() {
    register(repliedAddress, true, null);
  }

  /**
   * Resolves the pending request with the given reply.
   *
   * <p>The reply is only processed when the timeout timer could still be cancelled. When the
   * cancellation fails, the timer has already fired or was cancelled by an earlier outcome,
   * so the result is already decided and the late reply is dropped instead of completing the
   * promise a second time.
   *
   * @param reply   the reply message
   * @param context the context of the dispatch (unused here)
   * @param handler unused, the caller in this class passes {@code null}
   */
  @Override
  protected void dispatch(Message<T> reply, ContextInternal context, Handler<Message<T>> handler) {
    if (!eventBus.vertx.cancelTimer(timeoutID)) {
      return;
    }
    if (reply.body() instanceof ReplyException) {
      // A failure is transported as a reply whose body is the exception itself.
      fail((ReplyException) reply.body());
    } else {
      trace(reply, null);
      result.complete(reply);
    }
  }
}