package io.vertx.core.eventbus.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.*;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.VertxMetrics;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
public class EventBusImpl implements EventBus, MetricsProvider {
/** Logger used to report failures raised by interceptors. */
private static final Logger log = LoggerFactory.getLogger(EventBusImpl.class);
/** Interceptors iterated by each {@link OutboundDeliveryContext} before a message is sent or published. */
private final List<Handler<DeliveryContext>> sendInterceptors = new CopyOnWriteArrayList<>();
/** Interceptors iterated by each {@link InboundDeliveryContext} before a message reaches its handler. */
private final List<Handler<DeliveryContext>> receiveInterceptors = new CopyOnWriteArrayList<>();
/** Counter whose incremented value is appended to generated reply addresses. */
private final AtomicLong replySequence = new AtomicLong(0);
/** The Vert.x instance this event bus was created with. */
protected final VertxInternal vertx;
/** Event bus metrics, or {@code null} when the Vert.x instance exposes no metrics SPI. */
protected final EventBusMetrics metrics;
/** Handler holders keyed by address; an address is dropped from the map once its sequence becomes empty. */
protected final ConcurrentMap<String, ConcurrentCyclicSequence<HandlerHolder>> handlerMap = new ConcurrentHashMap<>();
/** Codec registry used to look up the codec of each created message. */
protected final CodecManager codecManager = new CodecManager();
/** Set to {@code true} by {@code start}; checked by {@code checkStarted}. */
protected volatile boolean started;
/**
 * Creates an event bus for the given Vert.x instance.
 * <p>
 * Event bus metrics are created only when the instance exposes a metrics SPI;
 * otherwise {@link #metrics} stays {@code null}.
 *
 * @param vertx the owning Vert.x instance
 */
public EventBusImpl(VertxInternal vertx) {
  VertxMetrics metricsSpi = vertx.metricsSPI();
  this.vertx = vertx;
  if (metricsSpi == null) {
    this.metrics = null;
  } else {
    this.metrics = metricsSpi.createEventBusMetrics();
  }
}
/**
 * Appends an interceptor to the list iterated by every {@link OutboundDeliveryContext}.
 *
 * @param interceptor the interceptor to add
 * @return this event bus, for chaining
 */
@Override
public <T> EventBus addOutboundInterceptor(Handler<DeliveryContext<T>> interceptor) {
  // The backing list is declared with the raw DeliveryContext type, so the type argument is erased here.
  Handler erased = (Handler) interceptor;
  sendInterceptors.add(erased);
  return this;
}
/**
 * Appends an interceptor to the list iterated by every {@link InboundDeliveryContext}.
 *
 * @param interceptor the interceptor to add
 * @return this event bus, for chaining
 */
@Override
public <T> EventBus addInboundInterceptor(Handler<DeliveryContext<T>> interceptor) {
  // The backing list is declared with the raw DeliveryContext type, so the type argument is erased here.
  Handler erased = (Handler) interceptor;
  receiveInterceptors.add(erased);
  return this;
}
/**
 * Removes a previously added outbound interceptor, if it is present.
 *
 * @param interceptor the interceptor to remove
 * @return this event bus, for chaining
 */
@Override
public <T> EventBus removeOutboundInterceptor(Handler<DeliveryContext<T>> interceptor) {
  // Resolves to List.remove(Object), so no cast to the raw element type is needed.
  final Object candidate = interceptor;
  sendInterceptors.remove(candidate);
  return this;
}
/**
 * Removes a previously added inbound interceptor, if it is present.
 *
 * @param interceptor the interceptor to remove
 * @return this event bus, for chaining
 */
@Override
public <T> EventBus removeInboundInterceptor(Handler<DeliveryContext<T>> interceptor) {
  // Resolves to List.remove(Object), so no cast to the raw element type is needed.
  final Object candidate = interceptor;
  receiveInterceptors.remove(candidate);
  return this;
}
/**
 * Marks the event bus as started and immediately reports success to the handler.
 *
 * @param completionHandler invoked synchronously with a succeeded result
 * @throws IllegalStateException if the event bus has already been started
 */
public synchronized void start(Handler<AsyncResult<Void>> completionHandler) {
  if (!started) {
    started = true;
    completionHandler.handle(Future.succeededFuture());
    return;
  }
  throw new IllegalStateException("Already started");
}
/**
 * Sends a message using freshly created default delivery options and no reply handler.
 *
 * @param address the target address
 * @param message the message body
 * @return this event bus, for chaining
 */
@Override
public EventBus send(String address, Object message) {
  DeliveryOptions defaultOptions = new DeliveryOptions();
  return send(address, message, defaultOptions, null);
}
/**
 * Sends a message using freshly created default delivery options and the given reply handler.
 *
 * @param address      the target address
 * @param message      the message body
 * @param replyHandler handler for the reply, may be {@code null}
 * @return this event bus, for chaining
 */
@Override
public <T> EventBus send(String address, Object message, Handler<AsyncResult<Message<T>>> replyHandler) {
  DeliveryOptions defaultOptions = new DeliveryOptions();
  return send(address, message, defaultOptions, replyHandler);
}
/**
 * Sends a message with the given delivery options and no reply handler.
 *
 * @param address the target address
 * @param message the message body
 * @param options the delivery options
 * @return this event bus, for chaining
 */
@Override
public EventBus send(String address, Object message, DeliveryOptions options) {
  final Handler<AsyncResult<Message<Object>>> noReplyHandler = null;
  return send(address, message, options, noReplyHandler);
}
/**
 * Builds a send-type message from the arguments and hands it to {@link #sendOrPubInternal}.
 * <p>
 * Headers and codec name are taken from {@code options}.
 *
 * @param address      the target address
 * @param message      the message body
 * @param options      the delivery options; dereferenced, so must not be {@code null}
 * @param replyHandler handler for the reply, may be {@code null}
 * @return this event bus, for chaining
 */
@Override
public <T> EventBus send(String address, Object message, DeliveryOptions options, Handler<AsyncResult<Message<T>>> replyHandler) {
  MessageImpl outbound = createMessage(true, address, options.getHeaders(), message, options.getCodecName(), null);
  sendOrPubInternal(outbound, options, replyHandler);
  return this;
}
/**
 * Creates a message producer in send mode for the address, with default delivery options.
 *
 * @param address the target address, must not be {@code null}
 * @return the new producer
 * @throws NullPointerException if {@code address} is {@code null}
 */
@Override
public <T> MessageProducer<T> sender(String address) {
  Objects.requireNonNull(address, "address");
  DeliveryOptions defaultOptions = new DeliveryOptions();
  return new MessageProducerImpl<>(vertx, address, true, defaultOptions);
}
/**
 * Creates a message producer in send mode for the address, with the given delivery options.
 *
 * @param address the target address, must not be {@code null}
 * @param options the delivery options, must not be {@code null}
 * @return the new producer
 * @throws NullPointerException if either argument is {@code null}
 */
@Override
public <T> MessageProducer<T> sender(String address, DeliveryOptions options) {
  Objects.requireNonNull(address, "address");
  Objects.requireNonNull(options, "options");
  MessageProducer<T> producer = new MessageProducerImpl<>(vertx, address, true, options);
  return producer;
}
/**
 * Creates a message producer in publish mode for the address, with default delivery options.
 *
 * @param address the target address, must not be {@code null}
 * @return the new producer
 * @throws NullPointerException if {@code address} is {@code null}
 */
@Override
public <T> MessageProducer<T> publisher(String address) {
  Objects.requireNonNull(address, "address");
  DeliveryOptions defaultOptions = new DeliveryOptions();
  return new MessageProducerImpl<>(vertx, address, false, defaultOptions);
}
/**
 * Creates a message producer in publish mode for the address, with the given delivery options.
 *
 * @param address the target address, must not be {@code null}
 * @param options the delivery options, must not be {@code null}
 * @return the new producer
 * @throws NullPointerException if either argument is {@code null}
 */
@Override
public <T> MessageProducer<T> publisher(String address, DeliveryOptions options) {
  Objects.requireNonNull(address, "address");
  Objects.requireNonNull(options, "options");
  MessageProducer<T> producer = new MessageProducerImpl<>(vertx, address, false, options);
  return producer;
}
/**
 * Publishes a message using freshly created default delivery options.
 *
 * @param address the target address
 * @param message the message body
 * @return this event bus, for chaining
 */
@Override
public EventBus publish(String address, Object message) {
  DeliveryOptions defaultOptions = new DeliveryOptions();
  return publish(address, message, defaultOptions);
}
/**
 * Builds a publish-type message from the arguments and hands it to {@link #sendOrPubInternal}
 * without a reply handler.
 * <p>
 * Headers and codec name are taken from {@code options}.
 *
 * @param address the target address
 * @param message the message body
 * @param options the delivery options; dereferenced, so must not be {@code null}
 * @return this event bus, for chaining
 */
@Override
public EventBus publish(String address, Object message, DeliveryOptions options) {
  MessageImpl outbound = createMessage(false, address, options.getHeaders(), message, options.getCodecName(), null);
  sendOrPubInternal(outbound, options, null);
  return this;
}
/**
 * Creates a consumer for the address without attaching a message handler.
 * <p>
 * NOTE(review): the boolean constructor argument is {@code false} here and {@code true} in
 * {@code localConsumer}; presumably it is the local-only flag - confirm against HandlerRegistration.
 *
 * @param address the address to consume from, must not be {@code null}
 * @return the new consumer
 * @throws IllegalStateException if the event bus is not started
 * @throws NullPointerException  if {@code address} is {@code null}
 */
@Override
public <T> MessageConsumer<T> consumer(String address) {
  checkStarted();
  Objects.requireNonNull(address, "address");
  MessageConsumer<T> registration = new HandlerRegistration<>(vertx, metrics, this, address, null, false, null, -1);
  return registration;
}
/**
 * Creates a consumer for the address and attaches the given message handler to it.
 *
 * @param address the address to consume from
 * @param handler the message handler, must not be {@code null}
 * @return the new consumer
 * @throws NullPointerException if {@code handler} is {@code null}
 */
@Override
public <T> MessageConsumer<T> consumer(String address, Handler<Message<T>> handler) {
  // Validate the handler first so that nothing is created when it is missing.
  Objects.requireNonNull(handler, "handler");
  MessageConsumer<T> created = consumer(address);
  created.handler(handler);
  return created;
}
/**
 * Creates a local consumer for the address without attaching a message handler.
 * <p>
 * NOTE(review): the boolean constructor argument is {@code true} here and {@code false} in
 * {@code consumer}; presumably it is the local-only flag - confirm against HandlerRegistration.
 *
 * @param address the address to consume from, must not be {@code null}
 * @return the new consumer
 * @throws IllegalStateException if the event bus is not started
 * @throws NullPointerException  if {@code address} is {@code null}
 */
@Override
public <T> MessageConsumer<T> localConsumer(String address) {
  checkStarted();
  Objects.requireNonNull(address, "address");
  MessageConsumer<T> registration = new HandlerRegistration<>(vertx, metrics, this, address, null, true, null, -1);
  return registration;
}
/**
 * Creates a local consumer for the address and attaches the given message handler to it.
 *
 * @param address the address to consume from
 * @param handler the message handler, must not be {@code null}
 * @return the new consumer
 * @throws NullPointerException if {@code handler} is {@code null}
 */
@Override
public <T> MessageConsumer<T> localConsumer(String address, Handler<Message<T>> handler) {
  // Validate the handler first so that nothing is created when it is missing.
  Objects.requireNonNull(handler, "handler");
  MessageConsumer<T> created = localConsumer(address);
  created.handler(handler);
  return created;
}
/**
 * Registers a message codec with this bus's codec manager.
 *
 * @param codec the codec to register
 * @return this event bus, for chaining
 */
@Override
public EventBus registerCodec(MessageCodec codec) {
  this.codecManager.registerCodec(codec);
  return this;
}
/**
 * Unregisters the codec with the given name from this bus's codec manager.
 *
 * @param name the codec name
 * @return this event bus, for chaining
 */
@Override
public EventBus unregisterCodec(String name) {
  this.codecManager.unregisterCodec(name);
  return this;
}
/**
 * Registers a default codec for the given class with this bus's codec manager.
 *
 * @param clazz the class the codec applies to
 * @param codec the codec to register
 * @return this event bus, for chaining
 */
@Override
public <T> EventBus registerDefaultCodec(Class<T> clazz, MessageCodec<T, ?> codec) {
  this.codecManager.registerDefaultCodec(clazz, codec);
  return this;
}
/**
 * Unregisters the default codec for the given class from this bus's codec manager.
 *
 * @param clazz the class whose default codec is removed
 * @return this event bus, for chaining
 */
@Override
public EventBus unregisterDefaultCodec(Class clazz) {
  this.codecManager.unregisterDefaultCodec(clazz);
  return this;
}
/**
 * Unregisters every handler, closes the metrics (when present) and then reports success.
 *
 * @param completionHandler invoked via {@code vertx.runOnContext} with a succeeded result; may be {@code null}
 * @throws IllegalStateException if the event bus is not started
 */
@Override
public void close(Handler<AsyncResult<Void>> completionHandler) {
  checkStarted();
  unregisterAll();
  if (metrics != null) {
    metrics.close();
  }
  if (completionHandler == null) {
    // Nobody to notify.
    return;
  }
  vertx.runOnContext(ignored -> completionHandler.handle(Future.succeededFuture()));
}
/**
 * Tells whether event bus metrics were created for this instance.
 *
 * @return {@code true} when {@link #metrics} is not {@code null}
 */
@Override
public boolean isMetricsEnabled() {
  final boolean enabled = this.metrics != null;
  return enabled;
}
/**
 * Returns the event bus metrics.
 *
 * @return the metrics object, or {@code null} when none was created
 */
@Override
public EventBusMetrics<?> getMetrics() {
  return this.metrics;
}
/**
 * Creates a message bound to this event bus, with the codec looked up for its body.
 *
 * @param send         {@code true} for a send-type message, {@code false} for a publish-type one
 * @param address      the target address, must not be {@code null}
 * @param headers      the message headers
 * @param body         the message body
 * @param codecName    the codec name passed to the codec lookup
 * @param writeHandler handler stored on the message; notified when the message is delivered locally
 * @return the new message
 * @throws NullPointerException if {@code address} is {@code null}
 */
public MessageImpl createMessage(boolean send, String address, MultiMap headers, Object body, String codecName, Handler<AsyncResult<Void>> writeHandler) {
  Objects.requireNonNull(address, "no null address accepted");
  MessageCodec resolvedCodec = codecManager.lookupCodec(body, codecName);
  // MessageImpl is used as a raw type here, hence the suppression on the local.
  @SuppressWarnings("unchecked")
  MessageImpl created = new MessageImpl(address, null, headers, body, resolvedCodec, send, this, writeHandler);
  return created;
}
/**
 * Registers a handler locally, then runs the {@code addRegistration} hook, which reports the
 * outcome to {@code registration::setResult}.
 *
 * @param address      the address to register on
 * @param registration the registration; its handler must already be set
 * @param replyHandler whether the registration is a reply handler
 * @param localOnly    whether the registration is local only
 * @return the holder created for the registration
 * @throws NullPointerException if the registration has no handler
 */
protected <T> HandlerHolder<T> addRegistration(String address, HandlerRegistration<T> registration,
                                               boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(registration.getHandler(), "handler");
  LocalRegistrationResult<T> local = addLocalRegistration(address, registration, replyHandler, localOnly);
  Handler<AsyncResult<Void>> onRegistered = registration::setResult;
  addRegistration(local.newAddress, address, replyHandler, localOnly, onRegistered);
  return local.holder;
}
/**
 * Hook invoked after a handler has been registered locally.
 * <p>
 * This implementation has nothing more to do and reports success synchronously.
 *
 * @param newAddress        whether the local registration created a new entry for the address
 * @param address           the registered address
 * @param replyHandler      whether the registration is a reply handler
 * @param localOnly         whether the registration is local only
 * @param completionHandler invoked with a succeeded result
 */
protected <T> void addRegistration(boolean newAddress, String address,
                                   boolean replyHandler, boolean localOnly,
                                   Handler<AsyncResult<Void>> completionHandler) {
  AsyncResult<Void> success = Future.succeededFuture();
  completionHandler.handle(success);
}
/**
 * Outcome of a local registration: the created holder plus whether the address was new.
 */
private static class LocalRegistrationResult<T> {

  /** Holder created for the registration. */
  final HandlerHolder<T> holder;
  /** {@code true} when the registration created the map entry for its address. */
  final boolean newAddress;

  LocalRegistrationResult(HandlerHolder<T> holder, boolean newAddress) {
    this.newAddress = newAddress;
    this.holder = holder;
  }
}
/**
 * Adds a handler to the local handler map.
 * <p>
 * The handler is bound to the current Vert.x context, or to a context obtained from
 * {@code vertx.getOrCreateContext()} when the caller is not on one. A close hook is added
 * only in the former case.
 *
 * @param address      the address to register on, must not be {@code null}
 * @param registration the registration to add
 * @param replyHandler whether the registration is a reply handler
 * @param localOnly    whether the registration is local only
 * @return the created holder and whether the address had no entry before this call
 * @throws NullPointerException if {@code address} is {@code null}
 */
private <T> LocalRegistrationResult<T> addLocalRegistration(String address, HandlerRegistration<T> registration,
                                                            boolean replyHandler, boolean localOnly) {
  Objects.requireNonNull(address, "address");

  Context current = Vertx.currentContext();
  boolean calledFromContext = current != null;
  Context context = calledFromContext ? current : vertx.getOrCreateContext();
  registration.setHandlerContext(context);

  HandlerHolder<T> holder = new HandlerHolder<>(registration, replyHandler, localOnly, context);
  ConcurrentCyclicSequence<HandlerHolder> candidate = new ConcurrentCyclicSequence<HandlerHolder>().add(holder);
  // When the address already has a sequence, the new holder is appended to it instead.
  ConcurrentCyclicSequence<HandlerHolder> stored =
    handlerMap.merge(address, candidate, (existing, added) -> existing.add(added.first()));

  if (calledFromContext) {
    HandlerEntry entry = new HandlerEntry<>(address, registration);
    context.addCloseHook(entry);
  }

  // merge() returns our own candidate only when no sequence existed for the address.
  return new LocalRegistrationResult<>(holder, stored == candidate);
}
/**
 * Removes a handler locally, then runs the {@code removeRegistration} hook.
 * <p>
 * The hook receives the holder only when the address has no entry left after the removal;
 * otherwise it receives {@code null}.
 *
 * @param holder            the holder to remove
 * @param completionHandler passed through to the hook
 */
protected <T> void removeRegistration(HandlerHolder<T> holder, Handler<AsyncResult<Void>> completionHandler) {
  HandlerHolder<T> lastHolder = removeLocalRegistration(holder) ? holder : null;
  String address = holder.getHandler().address();
  removeRegistration(lastHolder, address, completionHandler);
}
/**
 * Hook invoked after a handler has been removed locally.
 * <p>
 * This implementation only notifies the completion handler asynchronously.
 *
 * @param handlerHolder     the removed holder, or {@code null}; unused here
 * @param address           the address of the removed handler; unused here
 * @param completionHandler notified of success, may be {@code null}
 */
protected <T> void removeRegistration(HandlerHolder<T> handlerHolder, String address,
                                      Handler<AsyncResult<Void>> completionHandler) {
  this.callCompletionHandlerAsync(completionHandler);
}
/**
 * Removes a holder from the local handler map and, the first time the holder is marked
 * removed, drops the matching close hook from its context.
 *
 * @param holder the holder to remove
 * @return {@code true} when the address has no entry in the map after the removal
 */
private <T> boolean removeLocalRegistration(HandlerHolder<T> holder) {
  String address = holder.getHandler().address();

  ConcurrentCyclicSequence<HandlerHolder> remaining = handlerMap.compute(address, (key, current) -> {
    if (current == null) {
      return null;
    }
    ConcurrentCyclicSequence<HandlerHolder> shrunk = current.remove(holder);
    if (shrunk.size() == 0) {
      // Returning null removes the mapping for the address.
      return null;
    }
    return shrunk;
  });
  boolean last = remaining == null;

  if (holder.setRemoved()) {
    holder.getContext().removeCloseHook(new HandlerEntry<>(address, holder.getHandler()));
  }
  return last;
}
/**
 * Sends a reply message through the outbound interceptor chain.
 *
 * @param replyMessage   the reply to send; its address must be set
 * @param replierMessage the message being replied to
 * @param options        the delivery options
 * @param replyHandler   handler for a reply to the reply, may be {@code null}
 * @throws IllegalStateException if the reply message has no address
 */
protected <T> void sendReply(MessageImpl replyMessage, MessageImpl replierMessage, DeliveryOptions options,
                             Handler<AsyncResult<Message<T>>> replyHandler) {
  if (replyMessage.address() == null) {
    throw new IllegalStateException("address not specified");
  }
  HandlerRegistration<T> replyRegistration = createReplyHandlerRegistration(replyMessage, options, replyHandler);
  OutboundDeliveryContext<T> deliveryContext =
    new OutboundDeliveryContext<>(replyMessage, options, replyRegistration, replierMessage);
  deliveryContext.next();
}
/**
 * Final step of the outbound chain for a reply.
 * <p>
 * This implementation treats a reply like any other message and delegates to {@link #sendOrPub}.
 *
 * @param sendContext    the outbound context carrying the reply
 * @param replierMessage the message being replied to; unused here
 */
protected <T> void sendReply(OutboundDeliveryContext<T> sendContext, MessageImpl replierMessage) {
  this.sendOrPub(sendContext);
}
/**
 * Final step of the outbound chain: records the sent message in the metrics (when present)
 * and delivers it to local handlers.
 *
 * @param sendContext the outbound context carrying the message
 */
protected <T> void sendOrPub(OutboundDeliveryContext<T> sendContext) {
  if (metrics != null) {
    MessageImpl outbound = sendContext.message;
    boolean publish = !outbound.isSend();
    metrics.messageSent(outbound.address(), publish, true, false);
  }
  deliverMessageLocally(sendContext);
}
/**
 * Adapts an async-result reply handler into a plain message handler.
 * <p>
 * A reply whose body is a {@link ReplyException} becomes a failed result (and is reported to
 * the metrics when present); any other reply becomes a succeeded result.
 *
 * @param handler the async-result handler to adapt
 * @return a message handler forwarding to {@code handler}
 */
protected <T> Handler<Message<T>> convertHandler(Handler<AsyncResult<Message<T>>> handler) {
  return reply -> {
    Object body = reply.body();
    Future<Message<T>> outcome;
    if (!(body instanceof ReplyException)) {
      outcome = Future.succeededFuture(reply);
    } else {
      ReplyException failure = (ReplyException) body;
      if (metrics != null) {
        metrics.replyFailure(reply.address(), failure.failureType());
      }
      outcome = Future.failedFuture(failure);
    }
    handler.handle(outcome);
  };
}
/**
 * Reports success to the handler via {@code vertx.runOnContext}, if a handler was supplied.
 *
 * @param completionHandler the handler to notify, may be {@code null}
 */
protected void callCompletionHandlerAsync(Handler<AsyncResult<Void>> completionHandler) {
  if (completionHandler == null) {
    return;
  }
  vertx.runOnContext(ignored -> completionHandler.handle(Future.succeededFuture()));
}
/**
 * Delivers the context's message to local handlers and propagates a delivery failure.
 * <p>
 * On failure the metrics (when present) record a {@code NO_HANDLERS} reply failure and the
 * reply registration (when present) is failed with the exception.
 *
 * @param sendContext the outbound context carrying the message
 */
protected <T> void deliverMessageLocally(OutboundDeliveryContext<T> sendContext) {
  ReplyException failure = deliverMessageLocally(sendContext.message);
  if (failure == null) {
    return;
  }
  if (metrics != null) {
    metrics.replyFailure(sendContext.message.address, ReplyFailure.NO_HANDLERS);
  }
  HandlerRegistration<T> replyRegistration = sendContext.handlerRegistration;
  if (replyRegistration != null) {
    replyRegistration.sendAsyncResultFailure(failure);
  }
}
/**
 * Tells whether a message is local; the result is passed to the metrics on reception.
 * <p>
 * This implementation always answers {@code true}.
 *
 * @param msg the message; unused here
 * @return {@code true}
 */
protected boolean isMessageLocal(MessageImpl msg) {
  final boolean local = true;
  return local;
}
/**
 * Delivers a message to the handlers registered locally for its address.
 * <p>
 * A send-type message goes to the single holder returned by {@code handlers.next()}; a
 * publish-type message goes to every holder. The message's write handler, when set, is
 * notified of the outcome. Nothing is reported to the write handler when a send finds no holder.
 *
 * @param msg the message to deliver
 * @return {@code null} when the address has an entry in the handler map, otherwise a
 *         {@code NO_HANDLERS} exception
 */
protected ReplyException deliverMessageLocally(MessageImpl msg) {
  msg.setBus(this);
  ConcurrentCyclicSequence<HandlerHolder> handlers = handlerMap.get(msg.address());

  // Nothing registered for the address.
  if (handlers == null) {
    if (metrics != null) {
      metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), 0);
    }
    ReplyException failure = new ReplyException(ReplyFailure.NO_HANDLERS, "No handlers for address " + msg.address);
    Handler<AsyncResult<Void>> onWrite = msg.writeHandler;
    if (onWrite != null) {
      onWrite.handle(Future.failedFuture(failure));
    }
    return failure;
  }

  // Publish: every registered holder gets the message.
  if (!msg.isSend()) {
    if (metrics != null) {
      metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), handlers.size());
    }
    for (HandlerHolder target : handlers) {
      deliverToHandler(msg, target);
    }
    Handler<AsyncResult<Void>> onWrite = msg.writeHandler;
    if (onWrite != null) {
      onWrite.handle(Future.succeededFuture());
    }
    return null;
  }

  // Send: exactly one holder is picked from the sequence.
  HandlerHolder chosen = handlers.next();
  if (metrics != null) {
    metrics.messageReceived(msg.address(), !msg.isSend(), isMessageLocal(msg), chosen != null ? 1 : 0);
  }
  if (chosen != null) {
    deliverToHandler(msg, chosen);
    Handler<AsyncResult<Void>> onWrite = msg.writeHandler;
    if (onWrite != null) {
      onWrite.handle(Future.succeededFuture());
    }
  }
  return null;
}
/**
 * Guards operations that require a started event bus.
 *
 * @throws IllegalStateException if the event bus has not been started
 */
protected void checkStarted() {
  if (started) {
    return;
  }
  throw new IllegalStateException("Event Bus is not started");
}
/**
 * Generates a reply address from the next value of the reply sequence.
 *
 * @return {@code "__vertx.reply."} followed by the incremented sequence value
 */
protected String generateReplyAddress() {
  long sequence = replySequence.incrementAndGet();
  return "__vertx.reply." + sequence;
}
/**
 * Creates the registration that will receive the reply to a message.
 * <p>
 * A fresh reply address is generated and stored on the message, and the registration is
 * given the options' send timeout.
 *
 * @param message      the outgoing message; its reply address is set when a handler is supplied
 * @param options      the delivery options supplying the send timeout
 * @param replyHandler the caller's reply handler, may be {@code null}
 * @return the reply registration, or {@code null} when no reply handler was supplied
 */
private <T> HandlerRegistration<T> createReplyHandlerRegistration(MessageImpl message,
                                                                  DeliveryOptions options,
                                                                  Handler<AsyncResult<Message<T>>> replyHandler) {
  if (replyHandler == null) {
    return null;
  }
  long sendTimeout = options.getSendTimeout();
  String replyAddress = generateReplyAddress();
  message.setReplyAddress(replyAddress);
  Handler<Message<T>> adaptedHandler = convertHandler(replyHandler);
  HandlerRegistration<T> replyRegistration =
    new HandlerRegistration<>(vertx, metrics, this, replyAddress, message.address, true, replyHandler, sendTimeout);
  replyRegistration.handler(adaptedHandler);
  return replyRegistration;
}
/**
 * Starts the outbound interceptor chain for a message, creating a reply registration first
 * when a reply handler is supplied.
 *
 * @param message      the message to send or publish
 * @param options      the delivery options
 * @param replyHandler handler for the reply, may be {@code null}
 * @throws IllegalStateException if the event bus is not started
 */
public <T> void sendOrPubInternal(MessageImpl message, DeliveryOptions options,
                                  Handler<AsyncResult<Message<T>>> replyHandler) {
  checkStarted();
  HandlerRegistration<T> replyRegistration = createReplyHandlerRegistration(message, options, replyHandler);
  new OutboundDeliveryContext<T>(message, options, replyRegistration).next();
}
/**
 * Delivery context for an outgoing message.
 * <p>
 * Each call to {@link #next()} hands the context to the next send interceptor; once the
 * interceptors are exhausted the message is sent, published, or sent as a reply.
 */
protected class OutboundDeliveryContext<T> implements DeliveryContext<T> {

  /** The outgoing message. */
  public final MessageImpl message;
  /** The delivery options the message is sent with. */
  public final DeliveryOptions options;
  /** Iterator over the send interceptors, obtained when the context is created. */
  public final Iterator<Handler<DeliveryContext>> iter;
  /** Registration awaiting the reply, or {@code null} when no reply is expected. */
  private final HandlerRegistration<T> handlerRegistration;
  /** The message being replied to, or {@code null} when this is not a reply. */
  private final MessageImpl replierMessage;

  private OutboundDeliveryContext(MessageImpl message, DeliveryOptions options, HandlerRegistration<T> handlerRegistration) {
    this(message, options, handlerRegistration, null);
  }

  private OutboundDeliveryContext(MessageImpl message, DeliveryOptions options, HandlerRegistration<T> handlerRegistration, MessageImpl replierMessage) {
    this.message = message;
    this.options = options;
    this.handlerRegistration = handlerRegistration;
    this.iter = sendInterceptors.iterator();
    this.replierMessage = replierMessage;
  }

  @Override
  public Message<T> message() {
    return message;
  }

  @Override
  public void next() {
    if (!iter.hasNext()) {
      // End of the chain: perform the actual delivery.
      if (replierMessage != null) {
        sendReply(this, replierMessage);
      } else {
        sendOrPub(this);
      }
      return;
    }
    Handler<DeliveryContext> interceptor = iter.next();
    try {
      if (interceptor == null) {
        // Skip null entries and continue with the rest of the chain.
        next();
      } else {
        interceptor.handle(this);
      }
    } catch (Throwable t) {
      log.error("Failure in interceptor", t);
    }
  }

  @Override
  public boolean send() {
    return message.isSend();
  }

  @Override
  public Object body() {
    return message.sentBody;
  }
}
/**
 * Unregisters every handler currently present in the handler map.
 */
private void unregisterAll() {
  for (ConcurrentCyclicSequence<HandlerHolder> sequence : handlerMap.values()) {
    for (HandlerHolder registered : sequence) {
      HandlerRegistration registration = registered.getHandler();
      registration.unregister();
    }
  }
}
/**
 * Schedules delivery of a message to one handler on that handler's context.
 * <p>
 * The handler receives the result of {@code msg.copyBeforeReceive()}. Delivery is skipped
 * when the holder has been removed by the time the task runs. A reply handler is
 * unregistered once the task has run, whether or not delivery happened.
 *
 * @param msg    the message to deliver
 * @param holder the target holder
 */
private <T> void deliverToHandler(MessageImpl msg, HandlerHolder<T> holder) {
  MessageImpl received = msg.copyBeforeReceive();
  DeliveryContext<T> inbound = new InboundDeliveryContext<>(received, holder);

  if (metrics != null) {
    metrics.scheduleMessage(holder.getHandler().getMetric(), msg.isLocal());
  }

  holder.getContext().runOnContext(ignored -> {
    try {
      boolean stillRegistered = !holder.isRemoved();
      if (stillRegistered) {
        inbound.next();
      }
    } finally {
      if (holder.isReplyHandler()) {
        holder.getHandler().unregister();
      }
    }
  });
}
/**
 * Delivery context for an incoming message.
 * <p>
 * Each call to {@link #next()} hands the context to the next receive interceptor; once the
 * interceptors are exhausted the message is passed to the holder's handler.
 */
protected class InboundDeliveryContext<T> implements DeliveryContext<T> {

  /** The message being delivered. */
  private final MessageImpl message;
  /** Iterator over the receive interceptors, obtained when the context is created. */
  private final Iterator<Handler<DeliveryContext>> iter;
  /** Holder of the handler the message is delivered to. */
  private final HandlerHolder<T> holder;

  private InboundDeliveryContext(MessageImpl message, HandlerHolder<T> holder) {
    this.message = message;
    this.holder = holder;
    this.iter = receiveInterceptors.iterator();
  }

  @Override
  public Message<T> message() {
    return message;
  }

  @Override
  public void next() {
    if (!iter.hasNext()) {
      // End of the chain: hand the message to the registered handler.
      holder.getHandler().handle(message);
      return;
    }
    try {
      Handler<DeliveryContext> interceptor = iter.next();
      if (interceptor == null) {
        // Skip null entries and continue with the rest of the chain.
        next();
      } else {
        interceptor.handle(this);
      }
    } catch (Throwable t) {
      log.error("Failure in interceptor", t);
    }
  }

  @Override
  public boolean send() {
    return message.isSend();
  }

  @Override
  public Object body() {
    return message.receivedBody;
  }
}
public class HandlerEntry<T> implements Closeable {
final String address;
final HandlerRegistration<T> handler;
public HandlerEntry(String address, HandlerRegistration<T> handler) {
this.address = address;
this.handler = handler;
}
@Override
public boolean equals(Object o) {
if (o == null) return false;
if (this == o) return true;
if (getClass() != o.getClass()) return false;
HandlerEntry entry = (HandlerEntry) o;
if (!address.equals(entry.address)) return false;
if (!handler.equals(entry.handler)) return false;
return true;
}
@Override
public int hashCode() {
int result = address != null ? address.hashCode() : 0;
result = 31 * result + (handler != null ? handler.hashCode() : 0);
return result;
}
public void close(Handler<AsyncResult<Void>> completionHandler) {
handler.unregister(completionHandler);
}
}
/**
 * Closes the event bus when the instance is finalized.
 * <p>
 * {@code close} throws {@link IllegalStateException} (via {@code checkStarted}) when the bus
 * was never started, so the superclass finalizer is invoked from a {@code finally} block to
 * guarantee it still runs.
 *
 * @throws Throwable whatever {@code close} or the superclass finalizer throws
 */
@Override
protected void finalize() throws Throwable {
  try {
    close(ar -> {});
  } finally {
    super.finalize();
  }
}
}