package io.vertx.core.eventbus.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.*;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
public class MessageProducerImpl<T> implements MessageProducer<T> {
private final Vertx vertx;
private final EventBusImpl bus;
private final boolean send;
private final String address;
private DeliveryOptions options;
public MessageProducerImpl(Vertx vertx, String address, boolean send, DeliveryOptions options) {
this.vertx = vertx;
this.bus = (EventBusImpl) vertx.eventBus();
this.address = address;
this.send = send;
this.options = options;
}
@Override
public synchronized MessageProducer<T> deliveryOptions(DeliveryOptions options) {
this.options = options;
return this;
}
@Override
public Future<Void> write(T body) {
Promise<Void> promise = ((VertxInternal)vertx).getOrCreateContext().promise();
write(body, promise);
return promise.future();
}
@Override
public void write(T body, Handler<AsyncResult<Void>> handler) {
Promise<Void> promise = null;
if (handler != null) {
promise = ((VertxInternal)vertx).getOrCreateContext().promise(handler);
}
write(body, promise);
}
private void write(T data, Promise<Void> handler) {
MessageImpl msg = bus.createMessage(send, address, options.getHeaders(), data, options.getCodecName());
bus.sendOrPubInternal(msg, options, null, handler);
}
@Override
public String address() {
return address;
}
@Override
public Future<Void> close() {
return ((ContextInternal)vertx.getOrCreateContext()).succeededFuture();
}
@Override
public void close(Handler<AsyncResult<Void>> handler) {
Future<Void> fut = close();
if (handler != null) {
fut.onComplete(handler);
}
}
}