package io.vertx.amqp.impl;
import io.vertx.amqp.*;
import io.vertx.core.*;
import io.vertx.proton.ProtonClient;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
public class AmqpClientImpl implements AmqpClient {
private final Vertx vertx;
private final ProtonClient proton;
private final AmqpClientOptions options;
private final List<AmqpConnection> connections = new CopyOnWriteArrayList<>();
private final boolean mustCloseVertxOnClose;
public AmqpClientImpl(Vertx vertx, AmqpClientOptions options, boolean mustCloseVertxOnClose) {
this.vertx = vertx;
if (options == null) {
this.options = new AmqpClientOptions();
} else {
this.options = options;
}
this.proton = ProtonClient.create(vertx);
this.mustCloseVertxOnClose = mustCloseVertxOnClose;
}
@Override
public AmqpClient connect(Handler<AsyncResult<AmqpConnection>> connectionHandler) {
Objects.requireNonNull(options.getHost(), "Host must be set");
Objects.requireNonNull(connectionHandler, "Handler must not be null");
new AmqpConnectionImpl(vertx.getOrCreateContext(), this, options, proton, connectionHandler);
return this;
}
@Override
public Future<AmqpConnection> connect() {
Promise<AmqpConnection> promise = Promise.promise();
connect(promise);
return promise.future();
}
@Override
public void close(Handler<AsyncResult<Void>> handler) {
List<Future> actions = new ArrayList<>();
for (AmqpConnection connection : connections) {
Promise<Void> future = Promise.promise();
connection.close(future);
actions.add(future.future());
}
CompositeFuture.join(actions).onComplete(done -> {
connections.clear();
if (mustCloseVertxOnClose) {
vertx.close(x -> {
if (done.succeeded() && x.succeeded()) {
if (handler != null) {
handler.handle(Future.succeededFuture());
}
} else {
if (handler != null) {
handler.handle(Future.failedFuture(done.failed() ? done.cause() : x.cause()));
}
}
});
} else if (handler != null) {
handler.handle(done.mapEmpty());
}
});
}
@Override
public Future<Void> close() {
Promise<Void> promise = Promise.promise();
close(promise);
return promise.future();
}
@Override
public AmqpClient createReceiver(String address,
Handler<AsyncResult<AmqpReceiver>> completionHandler) {
return connect(res -> {
if (res.failed()) {
completionHandler.handle(res.mapEmpty());
} else {
res.result().createReceiver(address, completionHandler);
}
});
}
@Override
public Future<AmqpReceiver> createReceiver(String address) {
Promise<AmqpReceiver> promise = Promise.promise();
createReceiver(address, promise);
return promise.future();
}
@Override
public AmqpClient createReceiver(String address, AmqpReceiverOptions receiverOptions, Handler<AsyncResult<AmqpReceiver>> completionHandler) {
return connect(res -> {
if (res.failed()) {
completionHandler.handle(res.mapEmpty());
} else {
res.result().createReceiver(address, receiverOptions, completionHandler);
}
});
}
@Override
public Future<AmqpReceiver> createReceiver(String address, AmqpReceiverOptions receiverOptions) {
Promise<AmqpReceiver> promise = Promise.promise();
createReceiver(address, receiverOptions, promise);
return promise.future();
}
@Override
public AmqpClient createSender(String address, Handler<AsyncResult<AmqpSender>> completionHandler) {
return connect(res -> {
if (res.failed()) {
completionHandler.handle(res.mapEmpty());
} else {
res.result().createSender(address, completionHandler);
}
});
}
@Override
public Future<AmqpSender> createSender(String address) {
Promise<AmqpSender> promise = Promise.promise();
createSender(address, promise);
return promise.future();
}
@Override
public AmqpClient createSender(String address, AmqpSenderOptions options,
Handler<AsyncResult<AmqpSender>> completionHandler) {
return connect(res -> {
if (res.failed()) {
completionHandler.handle(res.mapEmpty());
} else {
res.result().createSender(address, options, completionHandler);
}
});
}
@Override
public Future<AmqpSender> createSender(String address, AmqpSenderOptions options) {
Promise<AmqpSender> promise = Promise.promise();
createSender(address, options, promise);
return promise.future();
}
synchronized void register(AmqpConnectionImpl connection) {
connections.add(connection);
}
}