package io.vertx.rabbitmq.impl;
import com.rabbitmq.client.BasicProperties;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConfirmation;
import io.vertx.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherConfirmation;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
/**
 * {@link RabbitMQPublisher} implementation that serialises outgoing messages through an internal
 * queue and exposes broker confirmations as a {@link ReadStream}.
 *
 * <p>Messages passed to {@code publish} are written to {@code sendQueue} on this publisher's
 * {@link Context}. The queue is paused while a message is being sent and resumed once the send
 * succeeds, so only one message is handed to the client at a time. Every message handed to the
 * client is remembered in {@code pendingAcks} until a matching confirmation arrives, at which point
 * a {@link RabbitMQPublisherConfirmation} is written to the confirmation stream.
 */
public class RabbitMQPublisherImpl implements RabbitMQPublisher, ReadStream<RabbitMQPublisherConfirmation> {

  private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisherImpl.class);

  private final Vertx vertx;
  private final RabbitMQClient client;
  /** Buffer backing the {@link ReadStream} methods of this class. */
  private final InboundBuffer<RabbitMQPublisherConfirmation> confirmations;
  private final Context context;
  private final RabbitMQPublisherOptions options;
  /** Messages handed to the client and not yet confirmed; every access synchronises on the deque itself. */
  private final Deque<MessageDetails> pendingAcks = new ArrayDeque<>();
  /** Messages accepted by {@code publish} and waiting to be sent. */
  private final InboundBuffer<MessageDetails> sendQueue;
  /** Channel instance of the most recent confirmation, 0 until the first one arrives; guarded by {@code pendingAcks}. */
  private long lastChannelInstance = 0;
  /** Set by {@link #stop(Handler)} and cleared by {@link #restart()}; while set, {@code publish} rejects messages. */
  private volatile boolean stopped = false;

  /**
   * Everything needed to (re)send one message, plus the delivery tag reported for it by the client.
   */
  static class MessageDetails {

    private final String exchange;
    private final String routingKey;
    private final BasicProperties properties;
    private final Buffer message;
    /** Optional callback notified when the publish call succeeds; may be {@code null}. */
    private final Handler<AsyncResult<Void>> publishHandler;
    /** Written from the delivery-tag callback and read when confirmations are matched. */
    private volatile long deliveryTag;

    MessageDetails(String exchange, String routingKey, BasicProperties properties, Buffer message, Handler<AsyncResult<Void>> publishHandler) {
      this.exchange = exchange;
      this.routingKey = routingKey;
      this.properties = properties;
      this.message = message;
      this.publishHandler = publishHandler;
    }

    public void setDeliveryTag(long deliveryTag) {
      this.deliveryTag = deliveryTag;
    }

  }

  /**
   * Creates a publisher bound to the current (or a newly created) Vert.x context.
   *
   * <p>A connection-established callback is registered on the client so that the confirm listener
   * is added again every time that callback fires.
   *
   * @param vertx the Vert.x instance used to obtain the context
   * @param client the client used to publish messages and to receive confirmations
   * @param options publisher options; the maximum internal queue size is passed to the confirm listener
   */
  public RabbitMQPublisherImpl(Vertx vertx
          , RabbitMQClient client
          , RabbitMQPublisherOptions options
  ) {
    this.vertx = vertx;
    this.client = client;
    this.options = options;
    this.context = vertx.getOrCreateContext();
    this.confirmations = new InboundBuffer<>(context);
    this.sendQueue = new InboundBuffer<>(context);
    this.sendQueue.handler(this::handleMessageSend);
    this.client.addConnectionEstablishedCallback(p -> {
      addConfirmListener(client, options, p);
    });
  }

  /**
   * Registers the confirm listener and reports the outcome to {@code resultHandler}.
   */
  @Override
  public void start(Handler<AsyncResult<Void>> resultHandler) {
    startForPromise().future().onComplete(resultHandler);
  }

  /**
   * Registers the confirm listener.
   *
   * @return a future completed once the listener is registered, or failed with the registration error
   */
  @Override
  public Future<Void> start() {
    return startForPromise().future();
  }

  /**
   * Stops accepting new messages and notifies {@code resultHandler} once the send queue is empty.
   *
   * <p>The queue is paused while the empty check and the empty-handler registration are performed,
   * then resumed so that queued messages continue to be sent.
   */
  @Override
  public void stop(Handler<AsyncResult<Void>> resultHandler) {
    stopped = true;
    sendQueue.pause();
    if (sendQueue.isEmpty()) {
      resultHandler.handle(Future.succeededFuture());
    } else {
      sendQueue.emptyHandler(v -> resultHandler.handle(Future.succeededFuture()));
    }
    sendQueue.resume();
  }

  /**
   * Future-returning variant of {@link #stop(Handler)}.
   */
  @Override
  public Future<Void> stop() {
    Promise<Void> promise = Promise.promise();
    stop(promise);
    return promise.future();
  }

  /**
   * Re-enables publishing after {@link #stop(Handler)} and removes the empty-handler installed by it.
   */
  @Override
  public void restart() {
    stopped = false;
    sendQueue.pause();
    sendQueue.emptyHandler(null);
    sendQueue.resume();
  }

  /** Registers the confirm listener and returns the promise tracking that registration. */
  private Promise<Void> startForPromise() {
    Promise<Void> promise = Promise.promise();
    addConfirmListener(client, options, promise);
    return promise;
  }

  /**
   * Asks the client for a confirmation stream and routes each confirmation to
   * {@link #handleConfirmation(RabbitMQConfirmation)}.
   *
   * @param client1 the client to register the listener on
   * @param options1 supplies the maximum internal queue size passed to the client
   * @param promise completed when the listener is registered, failed (and logged) otherwise
   */
  protected final void addConfirmListener(RabbitMQClient client1
          , RabbitMQPublisherOptions options1
          , Promise<Void> promise
  ) {
    client1.addConfirmListener(options1.getMaxInternalQueueSize(), ar -> {
      if (ar.failed()) {
        log.error("Failed to add confirmListener: ", ar.cause());
        promise.fail(ar.cause());
        return;
      }
      ar.result().handler(this::handleConfirmation);
      promise.complete();
    });
  }

  /**
   * Returns this object, which is itself the stream of publisher confirmations.
   */
  @Override
  public ReadStream<RabbitMQPublisherConfirmation> getConfirmationStream() {
    return this;
  }

  /**
   * @return the number of messages currently held in the send queue
   */
  @Override
  public int queueSize() {
    return sendQueue.size();
  }

  /**
   * Send-queue handler: pauses the queue, records the message as awaiting confirmation and sends it.
   * The queue is resumed by {@link #doSend(MessageDetails)} once the publish succeeds.
   */
  private void handleMessageSend(MessageDetails md) {
    sendQueue.pause();
    synchronized (pendingAcks) {
      pendingAcks.add(md);
    }
    doSend(md);
  }

  /**
   * Publishes one message through the client.
   *
   * <p>On success the optional publish handler is notified (exceptions thrown by it are logged and
   * ignored) and the send queue is resumed. On failure, whether reported asynchronously or thrown
   * by the publish call itself, the client is restarted and the same message is sent again.
   */
  private void doSend(MessageDetails md) {
    try {
      client.basicPublishWithDeliveryTag(md.exchange, md.routingKey, md.properties, md.message
              , dt -> md.setDeliveryTag(dt)
              , publishResult -> {
                if (publishResult.failed()) {
                  log.info("Failed to publish message: " + publishResult.cause().toString());
                  reconnectAndResend(md);
                  return;
                }
                if (md.publishHandler != null) {
                  try {
                    md.publishHandler.handle(publishResult);
                  } catch (Throwable ex) {
                    // A misbehaving user callback must not prevent the queue from resuming.
                    log.warn("Failed to handle publish result", ex);
                  }
                }
                sendQueue.resume();
              });
    } catch (Throwable ex) {
      // Previously swallowed silently; log it so synchronous publish failures are visible.
      log.warn("Failed to publish message", ex);
      reconnectAndResend(md);
    }
  }

  /**
   * Drops the message from the pending confirmations, restarts the client and sends the message again.
   *
   * <p>NOTE(review): the message is not re-added to {@code pendingAcks} before the retry, so a
   * message that is only published successfully on a retry will not produce a confirmation on the
   * stream. Confirm whether that is intended.
   */
  private void reconnectAndResend(MessageDetails md) {
    synchronized (pendingAcks) {
      pendingAcks.remove(md);
    }
    client.stop(v -> client.start(v2 -> doSend(md)));
  }

  /**
   * Translates a raw confirmation into zero or more {@link RabbitMQPublisherConfirmation}s.
   *
   * <p>When the confirmation comes from a different channel instance than the previous one, all
   * pending entries are discarded. A "multiple" confirmation settles every leading pending entry
   * whose delivery tag is not greater than the confirmed tag. A single confirmation settles the
   * one pending entry carrying exactly that tag, wherever it sits in the deque.
   */
  private void handleConfirmation(RabbitMQConfirmation rawConfirmation) {
    synchronized (pendingAcks) {
      long channelInstance = rawConfirmation.getChannelInstance();
      if (lastChannelInstance != 0 && lastChannelInstance != channelInstance) {
        // NOTE(review): this also discards entries already published on the new channel (if any
        // were added before its first confirmation arrived) - confirm this is acceptable.
        pendingAcks.clear();
      }
      lastChannelInstance = channelInstance;

      long confirmedTag = rawConfirmation.getDeliveryTag();
      boolean succeeded = rawConfirmation.isSucceeded();
      boolean multiple = rawConfirmation.isMultiple();

      for (Iterator<MessageDetails> iter = pendingAcks.iterator(); iter.hasNext(); ) {
        MessageDetails md = iter.next();
        if (multiple) {
          if (md.deliveryTag > confirmedTag) {
            // Entries are kept in publish order, so nothing further back is covered either.
            break;
          }
          writeConfirmation(md, succeeded);
          iter.remove();
        } else if (md.deliveryTag == confirmedTag) {
          // Single confirmations usually match the head, but they may arrive out of order, so keep
          // scanning past non-matching entries instead of giving up at the first mismatch.
          writeConfirmation(md, succeeded);
          iter.remove();
          break;
        }
      }
    }
  }

  /** Writes a confirmation for {@code md} to the confirmation stream, using its message id when properties are present. */
  private void writeConfirmation(MessageDetails md, boolean succeeded) {
    String messageId = md.properties == null ? null : md.properties.getMessageId();
    confirmations.write(new RabbitMQPublisherConfirmation(messageId, succeeded));
  }

  /**
   * Queues a message for publishing.
   *
   * <p>The message is written to the send queue on this publisher's context. If the publisher has
   * been stopped the message is not queued and {@code resultHandler}, when non-null, is failed with
   * an {@link IllegalStateException} so that callers are not left waiting forever.
   *
   * @param exchange the exchange to publish to
   * @param routingKey the routing key
   * @param properties message properties, may be {@code null}
   * @param body the message body
   * @param resultHandler notified when the publish call succeeds, or failed if the publisher is stopped; may be {@code null}
   */
  @Override
  public void publish(String exchange, String routingKey, BasicProperties properties, Buffer body, Handler<AsyncResult<Void>> resultHandler) {
    if (stopped) {
      if (resultHandler != null) {
        context.runOnContext(v ->
                resultHandler.handle(Future.failedFuture(new IllegalStateException("Publisher is stopped"))));
      }
      return;
    }
    context.runOnContext(e ->
            sendQueue.write(new MessageDetails(exchange, routingKey, properties, body, resultHandler)));
  }

  /**
   * Future-returning variant of {@link #publish(String, String, BasicProperties, Buffer, Handler)}.
   *
   * @return a future completed when the publish call succeeds, or failed if the publisher is stopped
   */
  @Override
  public Future<Void> publish(String exchange, String routingKey, BasicProperties properties, Buffer body) {
    Promise<Void> promise = Promise.promise();
    publish(exchange, routingKey, properties, body, promise);
    return promise.future();
  }

  /** Sets the exception handler of the confirmation stream. */
  @Override
  public RabbitMQPublisherImpl exceptionHandler(Handler<Throwable> hndlr) {
    confirmations.exceptionHandler(hndlr);
    return this;
  }

  /** Sets the handler that receives publisher confirmations. */
  @Override
  public RabbitMQPublisherImpl handler(Handler<RabbitMQPublisherConfirmation> hndlr) {
    confirmations.handler(hndlr);
    return this;
  }

  /** Pauses delivery of confirmations. */
  @Override
  public RabbitMQPublisherImpl pause() {
    confirmations.pause();
    return this;
  }

  /** Resumes delivery of confirmations. */
  @Override
  public RabbitMQPublisherImpl resume() {
    confirmations.resume();
    return this;
  }

  /** Requests delivery of up to {@code l} further confirmations. */
  @Override
  public RabbitMQPublisherImpl fetch(long l) {
    confirmations.fetch(l);
    return this;
  }

  /**
   * Accepted for {@link ReadStream} compatibility only: the handler is neither stored nor invoked
   * by this implementation.
   */
  @Override
  public RabbitMQPublisherImpl endHandler(Handler<Void> hndlr) {
    return this;
  }

}