package io.vertx.amqp.impl;
import io.vertx.amqp.AmqpConnection;
import io.vertx.amqp.AmqpMessage;
import io.vertx.amqp.AmqpSender;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonSenderImpl;
public class AmqpSenderImpl implements AmqpSender {
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpSender.class);
private final ProtonSender sender;
private final AmqpConnectionImpl connection;
private boolean closed;
private Handler<Throwable> exceptionHandler;
private Handler<Void> drainHandler;
private long remoteCredit = 0;
private AmqpSenderImpl(ProtonSender sender, AmqpConnectionImpl connection,
Handler<AsyncResult<AmqpSender>> completionHandler) {
this.sender = sender;
this.connection = connection;
sender
.closeHandler(res -> onClose(sender, res, false))
.detachHandler(res -> onClose(sender, res, true));
sender.sendQueueDrainHandler(s -> {
Handler<Void> dh = null;
synchronized (AmqpSenderImpl.this) {
remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit();
if (drainHandler != null) {
dh = drainHandler;
}
}
if (dh != null) {
dh.handle(null);
}
});
sender.openHandler(done -> {
if (done.failed()) {
completionHandler.handle(done.mapEmpty());
} else {
connection.register(this);
completionHandler.handle(Future.succeededFuture(this));
}
});
sender.open();
}
static void create(ProtonSender sender, AmqpConnectionImpl connection,
Handler<AsyncResult<AmqpSender>> completionHandler) {
new AmqpSenderImpl(sender, connection, completionHandler);
}
private void onClose(ProtonSender sender, AsyncResult<ProtonSender> res, boolean detach) {
Handler<Throwable> eh = null;
boolean closeSender = false;
synchronized (AmqpSenderImpl.this) {
if (!closed && exceptionHandler != null) {
eh = exceptionHandler;
}
if (!closed) {
closed = true;
closeSender = true;
}
}
if (eh != null) {
if (res.succeeded()) {
eh.handle(new Exception("Sender closed remotely"));
} else {
eh.handle(new Exception("Sender closed remotely with error", res.cause()));
}
}
if (closeSender) {
if (detach) {
sender.detach();
} else {
sender.close();
}
}
}
@Override
public synchronized boolean writeQueueFull() {
return remoteCredit <= 0;
}
@Override
public AmqpConnection connection() {
return connection;
}
@Override
public AmqpSender send(AmqpMessage message) {
return doSend(message, null);
}
private AmqpSender doSend(AmqpMessage message, Handler<AsyncResult<Void>> acknowledgmentHandler) {
AmqpMessage updated;
if (message.address() == null) {
updated = AmqpMessage.create(message).address(address()).build();
} else {
updated = message;
}
Handler<ProtonDelivery> ack = delivery -> {
Handler<AsyncResult<Void>> handler = acknowledgmentHandler;
if (acknowledgmentHandler == null) {
handler = ar -> {
if (ar.failed()) {
LOGGER.warn("Message rejected by remote peer", ar.cause());
}
};
}
switch (delivery.getRemoteState().getType()) {
case Rejected:
handler.handle(Future.failedFuture("message rejected (REJECTED"));
break;
case Modified:
handler.handle(Future.failedFuture("message rejected (MODIFIED)"));
break;
case Released:
handler.handle(Future.failedFuture("message rejected (RELEASED)"));
break;
case Accepted:
handler.handle(Future.succeededFuture());
break;
default:
handler.handle(Future.failedFuture("Unsupported delivery type: " + delivery.getRemoteState().getType()));
}
};
synchronized (AmqpSenderImpl.this) {
remoteCredit--;
}
connection.runWithTrampoline(x -> {
sender.send(updated.unwrap(), ack);
synchronized (AmqpSenderImpl.this) {
remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit();
}
});
return this;
}
@Override
public synchronized AmqpSender exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
return this;
}
@Override
public AmqpSender write(AmqpMessage data) {
return doSend(data, null);
}
@Override
public AmqpSender write(AmqpMessage data, Handler<AsyncResult<Void>> handler) {
return doSend(data, handler);
}
@Override
public AmqpSender setWriteQueueMaxSize(int maxSize) {
return this;
}
@Override
public void end() {
close(null);
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
close(handler);
}
@Override
public synchronized AmqpSender drainHandler(Handler<Void> handler) {
drainHandler = handler;
return this;
}
@Override
public AmqpSender sendWithAck(AmqpMessage message, Handler<AsyncResult<Void>> acknowledgementHandler) {
return doSend(message, acknowledgementHandler);
}
@Override
public void close(Handler<AsyncResult<Void>> handler) {
Handler<AsyncResult<Void>> actualHandler;
if (handler == null) {
actualHandler = x -> { };
} else {
actualHandler = handler;
}
synchronized (this) {
if (closed) {
actualHandler.handle(Future.succeededFuture());
return;
}
closed = true;
}
connection.unregister(this);
connection.runWithTrampoline(x -> {
if (sender.isOpen()) {
try {
sender
.closeHandler(v -> actualHandler.handle(v.mapEmpty()))
.close();
} catch (Exception e) {
actualHandler.handle(Future.failedFuture(e));
}
} else {
actualHandler.handle(Future.succeededFuture());
}
});
}
@Override
public String address() {
return sender.getRemoteAddress();
}
}