package io.vertx.rabbitmq.impl;
import io.vertx.core.*;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQMessage;
import java.io.IOException;
A implementation of RabbitMQConsumer
/**
* A implementation of {@link RabbitMQConsumer}
*/
public class RabbitMQConsumerImpl implements RabbitMQConsumer {
private static final Logger log = LoggerFactory.getLogger(RabbitMQConsumerImpl.class);
private Handler<Throwable> exceptionHandler;
private Handler<Void> endHandler;
private String queueName;
private final QueueConsumerHandler consumerHandler;
private final boolean keepMostRecent;
private final InboundBuffer<RabbitMQMessage> pending;
private final int maxQueueSize;
private volatile boolean cancelled;
RabbitMQConsumerImpl(Context context, QueueConsumerHandler consumerHandler, QueueOptions options, String queueName) {
this.consumerHandler = consumerHandler;
this.keepMostRecent = options.isKeepMostRecent();
this.maxQueueSize = options.maxInternalQueueSize();
this.pending = new InboundBuffer<RabbitMQMessage>(context, maxQueueSize).pause();
this.queueName = queueName;
}
@Override
public String queueName() {
return queueName;
}
@Override
public RabbitMQConsumer setQueueName(String name) {
this.queueName = name;
return this;
}
@Override
public RabbitMQConsumer exceptionHandler(Handler<Throwable> exceptionHandler) {
this.exceptionHandler = exceptionHandler;
return this;
}
@Override
public RabbitMQConsumer handler(Handler<RabbitMQMessage> handler) {
if (handler != null) {
pending.handler(msg -> {
try {
handler.handle(msg);
} catch (Exception e) {
handleException(e);
}
});
} else {
pending.handler(null);
}
return this;
}
@Override
public RabbitMQConsumer pause() {
pending.pause();
return this;
}
@Override
public RabbitMQConsumer resume() {
pending.resume();
return this;
}
@Override
public RabbitMQConsumer fetch(long amount) {
pending.fetch(amount);
return this;
}
@Override
public RabbitMQConsumer endHandler(Handler<Void> endHandler) {
this.endHandler = endHandler;
return this;
}
@Override
public String consumerTag() {
return consumerHandler.getConsumerTag();
}
@Override
public Future<Void> cancel() {
Promise<Void> promise = Promise.promise();
cancel(promise);
return promise.future();
}
@Override
public void cancel(Handler<AsyncResult<Void>> cancelResult) {
AsyncResult<Void> operationResult;
try {
log.debug("Cancelling " + consumerTag());
cancelled = true;
consumerHandler.getChannel().basicCancel(consumerTag());
operationResult = Future.succeededFuture();
} catch (IOException e) {
operationResult = Future.failedFuture(e);
}
if (cancelResult != null) {
cancelResult.handle(operationResult);
}
handleEnd();
}
@Override
public boolean isCancelled() {
return cancelled;
}
@Override
public boolean isPaused() {
return false;
}
Push message to stream.
Should be called from a vertx thread.
Params: - message – received message to deliver
/**
* Push message to stream.
* <p>
* Should be called from a vertx thread.
*
* @param message received message to deliver
*/
void handleMessage(RabbitMQMessage message) {
if (pending.size() >= maxQueueSize) {
if (keepMostRecent) {
pending.read();
} else {
log.debug("Discard a received message since stream is paused and buffer flag is false");
return;
}
}
pending.write(message);
}
Trigger exception handler with given exception
/**
* Trigger exception handler with given exception
*/
private void handleException(Throwable exception) {
if (exceptionHandler != null) {
exceptionHandler.handle(exception);
}
}
Trigger end of stream handler
/**
* Trigger end of stream handler
*/
void handleEnd() {
if (endHandler != null) {
endHandler.handle(null);
}
}
}