package io.vertx.rabbitmq.impl;

import io.vertx.rabbitmq.RabbitMQConfirmation;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.impl.InboundBuffer;

// Author: jtalbut
/** * * @author jtalbut */
public class RabbitMQConfirmListenerImpl implements ReadStream<RabbitMQConfirmation> { private final RabbitMQClientImpl client; private final InboundBuffer<RabbitMQConfirmation> pending; private final int maxQueueSize; private Handler<Throwable> exceptionHandler; public RabbitMQConfirmListenerImpl(RabbitMQClientImpl client, Context context, int maxQueueSize) { this.client = client; this.maxQueueSize = maxQueueSize; this.pending = new InboundBuffer<>(context, maxQueueSize); } void handleAck(long deliveryTag, boolean multiple, boolean succeeded) { if (pending.size() >= maxQueueSize) { pending.read(); } pending.write(new RabbitMQConfirmation(client.getChannelInstance(), deliveryTag, multiple, succeeded)); } @Override public RabbitMQConfirmListenerImpl exceptionHandler(Handler<Throwable> exceptionHandler) { this.exceptionHandler = exceptionHandler; return this; } @Override public RabbitMQConfirmListenerImpl handler(Handler<RabbitMQConfirmation> handler) { if (handler != null) { pending.handler(msg -> { try { handler.handle(msg); } catch (Exception e) { handleException(e); } }); } else { pending.handler(null); } return this; }
Trigger exception handler with given exception
/** * Trigger exception handler with given exception */
private void handleException(Throwable exception) { if (exceptionHandler != null) { exceptionHandler.handle(exception); } } @Override public RabbitMQConfirmListenerImpl pause() { pending.pause(); return this; } @Override public RabbitMQConfirmListenerImpl resume() { pending.resume(); return this; } @Override public RabbitMQConfirmListenerImpl fetch(long amount) { pending.fetch(amount); return this; } @Override public RabbitMQConfirmListenerImpl endHandler(Handler<Void> hndlr) { return this; } }