package io.vertx.amqpbridge.impl;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import io.vertx.amqpbridge.AmqpBridge;
import io.vertx.amqpbridge.AmqpBridgeOptions;
import io.vertx.amqpbridge.AmqpConstants;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.engine.EndpointState;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.impl.ProtonConnectionImpl;
/**
 * {@link AmqpBridge} implementation backed by a single {@link ProtonConnection}.
 *
 * <p>The bridge captures a Vert.x {@link Context} at construction time and funnels the work of
 * {@code start} and {@code close} onto that context via {@link #runOnContext(boolean, Handler)}.
 *
 * <p>When reply handling is enabled in the options and the peer supports the anonymous relay,
 * starting the bridge also creates a reply sender and a receiver with a dynamic source address;
 * that address is then used as the reply-to of messages sent with a reply handler.
 */
public class AmqpBridgeImpl implements AmqpBridge {

  private static final Logger LOG = LoggerFactory.getLogger(AmqpBridgeImpl.class);

  private final Vertx vertx;
  private final Context bridgeContext;
  private final AmqpBridgeOptions options;

  /** Reply handlers awaiting a response, keyed by the message-id generated for the request. */
  private final Map<String, Handler<?>> replyToMapping = new ConcurrentHashMap<>();
  private final MessageTranslatorImpl translator = new MessageTranslatorImpl();

  /** Set once the bridge has started successfully; never reset afterwards. */
  private final AtomicBoolean started = new AtomicBoolean();
  /** Set as soon as {@link #close(Handler)} is called; suppresses the end handler. */
  private final AtomicBoolean closed = new AtomicBoolean();

  private ProtonClient client;
  /** The current connection, or {@code null} before connecting and after disconnecting. */
  private ProtonConnection connection;
  private ProtonReceiver replyToConsumer;
  /** Address of the dynamic reply receiver, or {@code null} when none was established. */
  private String replyToConsumerAddress;
  private AmqpProducerImpl replySender;
  private volatile Handler<Void> endHandler;

  /**
   * Creates a bridge bound to the context returned by {@link Vertx#getOrCreateContext()}.
   *
   * @param vertx   the Vert.x instance used to create the Proton client
   * @param options the bridge options, also passed to the client when connecting
   */
  public AmqpBridgeImpl(Vertx vertx, AmqpBridgeOptions options) {
    this.vertx = vertx;
    this.options = options;
    bridgeContext = vertx.getOrCreateContext();
  }

  /**
   * Starts the bridge without credentials; equivalent to passing {@code null} for both the
   * username and the password.
   */
  @Override
  public void start(String hostname, int port, Handler<AsyncResult<AmqpBridge>> resultHandler) {
    start(hostname, port, null, null, resultHandler);
  }

  /**
   * Starts the bridge, connecting to the given host and port with the given credentials.
   * The work is executed on the bridge context (immediately when already on its event loop).
   *
   * @param resultHandler notified with this bridge on success, or with the failure cause
   */
  @Override
  public void start(String hostname, int port, String username, String password,
                    Handler<AsyncResult<AmqpBridge>> resultHandler) {
    runOnContext(true, v -> startImpl(hostname, port, username, password, resultHandler));
  }

  /**
   * Connects, configures the connection, installs the lifecycle handlers and opens it.
   */
  @SuppressWarnings("deprecation")
  private void startImpl(String hostname, int port, String username, String password,
                         Handler<AsyncResult<AmqpBridge>> resultHandler) {
    client = ProtonClient.create(vertx);
    client.connect(options, hostname, port, username, password, connectResult -> {
      if (!connectResult.succeeded()) {
        resultHandler.handle(Future.failedFuture(connectResult.cause()));
        return;
      }

      connection = connectResult.result();
      applyConnectionSettings();
      installLifecycleHandlers();

      // open() is invoked exactly once, chained on the handler registration.
      connection.openHandler(openResult -> {
        LOG.trace("Bridge connection open complete");
        if (openResult.succeeded()) {
          completeStart(resultHandler);
        } else {
          resultHandler.handle(Future.failedFuture(openResult.cause()));
        }
      }).open();
    });
  }

  /** Sets the bridge metadata properties and the optional vhost / container id on the connection. */
  private void applyConnectionSettings() {
    Map<Symbol, Object> props = new LinkedHashMap<>();
    props.put(BridgeMetaDataSupportImpl.PRODUCT_KEY, BridgeMetaDataSupportImpl.PRODUCT);
    props.put(BridgeMetaDataSupportImpl.VERSION_KEY, BridgeMetaDataSupportImpl.VERSION);
    connection.setProperties(props);

    if (options.getVhost() != null) {
      connection.setHostname(options.getVhost());
    }

    if (options.getContainerId() != null) {
      connection.setContainer(options.getContainerId());
    }
  }

  /** Registers the disconnect and close handlers that drive the end handler. */
  private void installLifecycleHandlers() {
    connection.disconnectHandler(closeResult -> endHandlerImpl());
    connection.closeHandler(closeResult -> {
      try {
        disconnectImpl();
      } finally {
        endHandlerImpl();
      }
    });
  }

  /**
   * Finishes start-up once the connection has opened successfully: either reports success
   * straight away, or first sets up the reply sender and the dynamic reply receiver.
   */
  private void completeStart(Handler<AsyncResult<AmqpBridge>> resultHandler) {
    if (!options.isReplyHandlingSupport() || !connection.isAnonymousRelaySupported()) {
      started.set(true);
      resultHandler.handle(Future.succeededFuture(AmqpBridgeImpl.this));
      return;
    }

    // Sender with no fixed address, used for implicit replies.
    replySender = new AmqpProducerImpl(this, connection, null);

    // Receiver requesting a dynamic source; its remote address becomes our reply-to address.
    replyToConsumer = connection.createReceiver(null);
    Source source = (Source) replyToConsumer.getSource();
    source.setDynamic(true);

    replyToConsumer.handler(this::handleIncomingMessageReply);
    replyToConsumer.openHandler(replyToConsumerResult -> {
      if (replyToConsumerResult.succeeded()) {
        Source remoteSource = (Source) replyToConsumer.getRemoteSource();
        if (remoteSource != null) {
          replyToConsumerAddress = remoteSource.getAddress();
        }
        started.set(true);
        resultHandler.handle(Future.succeededFuture(AmqpBridgeImpl.this));
      } else {
        resultHandler.handle(Future.failedFuture(replyToConsumerResult.cause()));
      }
    }).open();
  }

  /**
   * Invokes the registered end handler at most once: the handler reference is cleared before the
   * call, and the call is skipped when the bridge was closed through {@link #close(Handler)}.
   */
  private void endHandlerImpl() {
    Handler<Void> handler = endHandler;
    endHandler = null;
    if (handler != null && !closed.get()) {
      handler.handle(null);
    }
  }

  /**
   * Creates a consumer for the given AMQP address.
   *
   * @throws IllegalStateException if the bridge was not successfully started, or its connection
   *                               is no longer available
   */
  @SuppressWarnings("unchecked")
  @Override
  public MessageConsumer<JsonObject> createConsumer(String amqpAddress) {
    return new AmqpConsumerImpl(this, requireActiveConnection(), amqpAddress);
  }

  /**
   * Creates a producer for the given AMQP address.
   *
   * @throws IllegalStateException if the bridge was not successfully started, or its connection
   *                               is no longer available
   */
  @SuppressWarnings("unchecked")
  @Override
  public MessageProducer<JsonObject> createProducer(String amqpAddress) {
    return new AmqpProducerImpl(this, requireActiveConnection(), amqpAddress);
  }

  /**
   * Returns the current connection after checking that the bridge is usable.
   *
   * <p>The {@code started} flag is never reset, while the connection reference is cleared on
   * disconnect, so both have to be checked to avoid handing a {@code null} connection on.
   */
  private ProtonConnection requireActiveConnection() {
    if (!started.get()) {
      throw new IllegalStateException("Bridge was not successfully started");
    }
    ProtonConnection conn = connection;
    if (conn == null) {
      throw new IllegalStateException("Bridge connection is no longer available");
    }
    return conn;
  }

  /**
   * Closes the bridge. The closed flag is set immediately, the actual shutdown runs on the
   * bridge context.
   *
   * @param resultHandler notified once the shutdown has completed
   */
  @Override
  public void close(Handler<AsyncResult<Void>> resultHandler) {
    closed.set(true);
    runOnContext(true, v -> shutdownImpl(resultHandler));
  }

  /**
   * Shuts the connection down. When both ends are open the close is awaited and its outcome is
   * reported; otherwise the connection (if any) is torn down directly and success is reported.
   */
  private void shutdownImpl(Handler<AsyncResult<Void>> resultHandler) {
    if (connection == null) {
      resultHandler.handle(Future.succeededFuture());
      return;
    }

    if (isLocalOpen(connection) && isRemoteOpen(connection)) {
      // Replaces the close handler installed at start-up for the remainder of the shutdown.
      connection.closeHandler(res -> {
        try {
          disconnectImpl();
        } finally {
          if (res.succeeded()) {
            resultHandler.handle(Future.succeededFuture());
          } else {
            resultHandler.handle(Future.failedFuture(res.cause()));
          }
        }
      }).close();
    } else {
      try {
        disconnectImpl();
      } finally {
        resultHandler.handle(Future.succeededFuture());
      }
    }
  }

  /**
   * Clears the connection reference and then closes and disconnects the connection it held,
   * disconnecting even when the close call throws.
   */
  private void disconnectImpl() {
    ProtonConnection conn = connection;
    connection = null;
    if (conn != null) {
      try {
        conn.close();
      } finally {
        conn.disconnect();
      }
    }
  }

  /** Returns whether the local endpoint state of the connection is {@code ACTIVE}. */
  private boolean isLocalOpen(ProtonConnection connection) {
    return ((ProtonConnectionImpl) connection).getLocalState() == EndpointState.ACTIVE;
  }

  /** Returns whether the remote endpoint state of the connection is {@code ACTIVE}. */
  private boolean isRemoteOpen(ProtonConnection connection) {
    return ((ProtonConnectionImpl) connection).getRemoteState() == EndpointState.ACTIVE;
  }

  /** Registers the handler to be invoked by the connection's disconnect/close handlers. */
  @Override
  public void endHandler(Handler<Void> endHandler) {
    this.endHandler = endHandler;
  }

  /**
   * Prepares an outgoing message for a reply: sets its reply-to to the bridge's reply address,
   * assigns a freshly generated UUID string as its message-id, and records the reply handler
   * under that id.
   *
   * @throws IllegalStateException if no reply-to address is available
   */
  <R> void registerReplyToHandler(org.apache.qpid.proton.message.Message msg,
                                  Handler<AsyncResult<Message<R>>> replyHandler) {
    verifyReplyToAddressAvailable();

    msg.setReplyTo(replyToConsumerAddress);

    String generatedMessageId = UUID.randomUUID().toString();
    msg.setMessageId(generatedMessageId);

    replyToMapping.put(generatedMessageId, replyHandler);
  }

  /**
   * Checks that a reply-to address has been established.
   *
   * @throws IllegalStateException if there is none
   */
  void verifyReplyToAddressAvailable() throws IllegalStateException {
    if (replyToConsumerAddress == null) {
      throw new IllegalStateException(
          "No reply-to address available, unable to send with a reply handler. Try an explicit consumer for replies.");
    }
  }

  /**
   * Handles a message arriving on the reply receiver: the handler registered under the message's
   * correlation-id is removed and invoked with the translated message. Messages that cannot be
   * matched to a handler are logged as errors.
   */
  private void handleIncomingMessageReply(ProtonDelivery delivery,
                                          org.apache.qpid.proton.message.Message protonMessage) {
    Object correlationId = protonMessage.getCorrelationId();
    if (correlationId != null) {
      // Remove the associated handler from the map (only 1 reply is permitted per request).
      Handler<?> handler = replyToMapping.remove(correlationId);
      if (handler != null) {
        @SuppressWarnings("unchecked")
        Handler<AsyncResult<Message<JsonObject>>> h = (Handler<AsyncResult<Message<JsonObject>>>) handler;

        JsonObject body = translator.convertToJsonObject(protonMessage);
        Message<JsonObject> msg = new AmqpMessageImpl(body, AmqpBridgeImpl.this, protonMessage, delivery,
            replyToConsumerAddress, protonMessage.getReplyTo());

        AsyncResult<Message<JsonObject>> result = Future.succeededFuture(msg);
        h.handle(result);
        return;
      }
    }

    LOG.error("Received message on replyTo consumer, could not match to a replyHandler: " + protonMessage);
  }

  /**
   * Sends an implicit reply to the reply-to address of the given incoming message. When the
   * incoming message carries a message-id, it is written into the reply body's properties as the
   * correlation-id (creating the properties object if absent).
   *
   * @throws IllegalStateException if there is no reply sender, or the incoming message has no
   *                               reply-to address
   */
  <R> void sendReply(org.apache.qpid.proton.message.Message origIncomingMessage, JsonObject replyBody,
                     Handler<AsyncResult<Message<R>>> replyHandler) {
    if (replySender == null) {
      throw new IllegalStateException(
          "No reply sender available, unable to send implicit replies. Try an explicit producer for replies.");
    }

    String replyAddress = origIncomingMessage.getReplyTo();
    if (replyAddress == null) {
      throw new IllegalStateException("Original message has no reply-to address, unable to send implicit reply");
    }

    Object origMessageId = origIncomingMessage.getMessageId();
    if (origMessageId != null) {
      JsonObject replyBodyProps = replyBody.getJsonObject(AmqpConstants.PROPERTIES);
      if (replyBodyProps == null) {
        replyBodyProps = new JsonObject();
        replyBody.put(AmqpConstants.PROPERTIES, replyBodyProps);
      }

      replyBodyProps.put(AmqpConstants.PROPERTIES_CORRELATION_ID, origMessageId);
    }

    replySender.doSend(replyBody, null, replyHandler, replyAddress);
  }

  /** Returns whether the calling thread is in the Netty event loop of the bridge context. */
  boolean onContextEventLoop() {
    return ((ContextInternal) bridgeContext).nettyEventLoop().inEventLoop();
  }

  /**
   * Runs the action on the bridge context.
   *
   * @param immediateIfOnContext when {@code true} and the caller is already on the context's
   *                             event loop, the action is run synchronously instead of scheduled
   * @param action               the action to run; it is passed {@code null} when run immediately
   */
  void runOnContext(boolean immediateIfOnContext, Handler<Void> action) {
    if (immediateIfOnContext && onContextEventLoop()) {
      action.handle(null);
    } else {
      bridgeContext.runOnContext(action);
    }
  }
}