package io.vertx.amqpbridge.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.VertxException;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonSenderImpl;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
public class AmqpProducerImpl implements MessageProducer<JsonObject> {
private final ProtonSender sender;
private final MessageTranslatorImpl translator = new MessageTranslatorImpl();
private final AmqpBridgeImpl bridge;
private final String amqpAddress;
private boolean closed;
private Handler<Throwable> exceptionHandler;
private Handler<Void> drainHandler;
private long remoteCredit = 0;
public AmqpProducerImpl(AmqpBridgeImpl bridge, ProtonConnection connection, String amqpAddress) {
if(!bridge.onContextEventLoop()) {
throw new IllegalStateException("Should be executing on the bridge context thread");
}
this.bridge = bridge;
this.amqpAddress= amqpAddress;
sender = connection.createSender(amqpAddress);
sender.closeHandler(res -> {
Handler<Throwable> eh = null;
boolean closeSender = false;
synchronized (AmqpProducerImpl.this) {
if (!closed && exceptionHandler != null) {
eh = exceptionHandler;
}
if(!closed) {
closed = true;
closeSender = true;
}
}
if (eh != null) {
if (res.succeeded()) {
eh.handle(new VertxException("Producer closed remotely"));
} else {
eh.handle(new VertxException("Producer closed remotely with error", res.cause()));
}
}
if(closeSender) {
sender.close();
}
});
sender.sendQueueDrainHandler(s -> {
Handler<Void> dh = null;
synchronized (AmqpProducerImpl.this) {
remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit();
if (drainHandler != null) {
dh = drainHandler;
}
}
if(dh != null) {
dh.handle(null);
}
});
sender.open();
}
@Override
public synchronized boolean writeQueueFull() {
return remoteCredit <= 0;
}
@Override
public MessageProducer<JsonObject> send(JsonObject messageBody) {
return send(messageBody, null);
}
@Override
public <R> MessageProducer<JsonObject> send(JsonObject messageBody, Handler<AsyncResult<Message<R>>> replyHandler) {
return doSend(messageBody, null, replyHandler, null);
}
protected <R> MessageProducer<JsonObject> doSend(JsonObject messageBody,
Handler<AsyncResult<Void>> completionHandler,
Handler<AsyncResult<Message<R>>> replyHandler, String toAddress) {
if (replyHandler != null) {
bridge.verifyReplyToAddressAvailable();
}
org.apache.qpid.proton.message.Message msg = translator.convertToAmqpMessage(messageBody);
if (toAddress != null) {
msg.setAddress(toAddress);
}
synchronized (AmqpProducerImpl.this) {
remoteCredit--;
}
bridge.runOnContext(true, v -> {
if (replyHandler != null) {
bridge.registerReplyToHandler(msg, replyHandler);
}
if (completionHandler == null) {
sender.send(msg);
} else {
sender.send(msg, delivery -> {
switch (delivery.getRemoteState().getType()) {
case Rejected:
completionHandler.handle(Future.failedFuture("message rejected (REJECTED"));
break;
case Modified:
completionHandler.handle(Future.failedFuture("message rejected (MODIFIED)"));
break;
case Released:
completionHandler.handle(Future.failedFuture("message rejected (RELEASED)"));
break;
case Accepted:
completionHandler.handle(Future.succeededFuture());
break;
default:
completionHandler.handle(Future.failedFuture("Unsupported delivery type: " + delivery.getRemoteState().getType()));
}
});
}
synchronized (AmqpProducerImpl.this) {
remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit();
}
});
return this;
}
@Override
public synchronized MessageProducer<JsonObject> exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
@Override
public MessageProducer<JsonObject> write(JsonObject data) {
return send(data, null);
}
@Override
public MessageProducer<JsonObject> write(JsonObject data, Handler<AsyncResult<Void>> handler) {
return doSend(data, handler, null, null);
}
@Override
public MessageProducer<JsonObject> setWriteQueueMaxSize(int maxSize) {
return this;
}
@Override
public synchronized MessageProducer<JsonObject> drainHandler(Handler<Void> handler) {
drainHandler = handler;
return this;
}
@Override
public MessageProducer<JsonObject> deliveryOptions(DeliveryOptions options) {
throw new UnsupportedOperationException("DeliveryOptions are not supported by this producer");
}
@Override
public String address() {
return amqpAddress;
}
@Override
public void end() {
close();
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
close(handler);
}
@Override
public void close() {
close(null);
}
@Override
public void close(Handler<AsyncResult<Void>> handler) {
synchronized (this) {
closed = true;
}
bridge.runOnContext(true, v -> {
sender.close();
if (handler != null) {
handler.handle(Future.succeededFuture());
}
});
}
}