package io.vertx.core.eventbus.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.*;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.UUID;
public class MessageProducerImpl<T> implements MessageProducer<T> {
public static final String = "__vertx.credit";
private final Vertx vertx;
private final EventBusImpl bus;
private final boolean send;
private final String address;
private final Queue<MessageImpl<T, ?>> pending = new ArrayDeque<>();
private final MessageConsumer<Integer> creditConsumer;
private DeliveryOptions options;
private int maxSize = DEFAULT_WRITE_QUEUE_MAX_SIZE;
private int credits = DEFAULT_WRITE_QUEUE_MAX_SIZE;
private Handler<Void> drainHandler;
public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
this.vertx = vertx;
this.bus = (EventBusImpl) vertx.eventBus();
this.address = address;
this.send = send;
this.options = options;
if (send) {
String creditAddress = UUID.randomUUID().toString() + "-credit";
creditConsumer = bus.consumer(creditAddress, msg -> {
doReceiveCredit(msg.body());
});
options.addHeader(CREDIT_ADDRESS_HEADER_NAME, creditAddress);
} else {
creditConsumer = null;
}
}
@Override
public synchronized MessageProducer<T> deliveryOptions(DeliveryOptions options) {
if (creditConsumer != null) {
options = new DeliveryOptions(options);
options.addHeader(CREDIT_ADDRESS_HEADER_NAME, this.options.getHeaders().get(CREDIT_ADDRESS_HEADER_NAME));
}
this.options = options;
return this;
}
@Override
public MessageProducer<T> send(T message) {
doSend(message, null, null);
return this;
}
@Override
public <R> MessageProducer<T> send(T message, Handler<AsyncResult<Message<R>>> replyHandler) {
doSend(message, replyHandler, null);
return this;
}
@Override
public MessageProducer<T> exceptionHandler(Handler<Throwable> handler) {
return this;
}
@Override
public synchronized MessageProducer<T> setWriteQueueMaxSize(int s) {
int delta = s - maxSize;
maxSize = s;
credits += delta;
return this;
}
@Override
public synchronized MessageProducer<T> write(T data) {
return write(data, null);
}
@Override
public MessageProducer<T> write(T data, Handler<AsyncResult<Void>> handler) {
if (send) {
doSend(data, null, handler);
} else {
MessageImpl msg = bus.createMessage(false, address, options.getHeaders(), data, options.getCodecName(), handler);
msg.writeHandler = handler;
bus.sendOrPubInternal(msg, options, null);
}
return this;
}
@Override
public synchronized boolean writeQueueFull() {
return credits == 0;
}
@Override
public synchronized MessageProducer<T> drainHandler(Handler<Void> handler) {
this.drainHandler = handler;
if (handler != null) {
checkDrained();
}
return this;
}
private void checkDrained() {
Handler<Void> handler = drainHandler;
if (handler != null && credits >= maxSize / 2) {
this.drainHandler = null;
vertx.runOnContext(v -> handler.handle(null));
}
}
@Override
public String address() {
return address;
}
@Override
public void end() {
close();
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
close(null);
}
@Override
public void close() {
close(null);
}
@Override
public void close(Handler<AsyncResult<Void>> handler) {
if (creditConsumer != null) {
creditConsumer.unregister(handler);
} else {
vertx.runOnContext(v -> {
if (handler != null) {
handler.handle(Future.succeededFuture());
}
});
}
}
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
private synchronized <R> void doSend(T data, Handler<AsyncResult<Message<R>>> replyHandler, Handler<AsyncResult<Void>> handler) {
MessageImpl msg = bus.createMessage(true, address, options.getHeaders(), data, options.getCodecName(), handler);
if (credits > 0) {
credits--;
bus.sendOrPubInternal(msg, options, replyHandler);
} else {
pending.add(msg);
}
}
private synchronized void doReceiveCredit(int credit) {
credits += credit;
while (credits > 0) {
MessageImpl<T, ?> msg = pending.poll();
if (msg == null) {
break;
} else {
credits--;
bus.sendOrPubInternal(msg, options, null);
}
}
checkDrained();
}
}