package io.vertx.proton.impl;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonClientOptions;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonTransportOptions;
/**
 * {@link ProtonClient} implementation that opens a TCP connection through a Vert.x
 * {@link NetClient} and binds a {@link ProtonConnectionImpl} to the resulting socket.
 *
 * <p>A new {@link NetClient} is created for every connect call. It is closed by
 * {@link ConnectCompletionHandler} if the connect attempt completes with a failure.
 */
public class ProtonClientImpl implements ProtonClient {

  private static final Logger LOG = LoggerFactory.getLogger(ProtonClientImpl.class);

  private final Vertx vertx;

  /**
   * @param vertx the Vert.x instance used to create net clients and connections
   */
  public ProtonClientImpl(Vertx vertx) {
    this.vertx = vertx;
  }

  /**
   * Connects to the given host and port using default {@link ProtonClientOptions} and
   * no username/password.
   *
   * @param host    the host to connect to
   * @param port    the port to connect to
   * @param handler receives the outcome of the connect attempt; must not be null
   */
  public void connect(String host, int port, Handler<AsyncResult<ProtonConnection>> handler) {
    connect(host, port, null, null, handler);
  }

  /**
   * Connects to the given host and port using default {@link ProtonClientOptions} and
   * the supplied credentials.
   *
   * @param host     the host to connect to
   * @param port     the port to connect to
   * @param username the username handed to the SASL authenticator, may be null
   * @param password the password handed to the SASL authenticator, may be null
   * @param handler  receives the outcome of the connect attempt; must not be null
   */
  public void connect(String host, int port, String username, String password,
                      Handler<AsyncResult<ProtonConnection>> handler) {
    connect(new ProtonClientOptions(), host, port, username, password, handler);
  }

  /**
   * Connects to the given host and port using the supplied options and no
   * username/password.
   *
   * @param options the client options
   * @param host    the host to connect to
   * @param port    the port to connect to
   * @param handler receives the outcome of the connect attempt; must not be null
   */
  public void connect(ProtonClientOptions options, String host, int port,
                      Handler<AsyncResult<ProtonConnection>> handler) {
    connect(options, host, port, null, null, handler);
  }

  /**
   * Connects to the given host and port using the supplied options and credentials.
   *
   * @param options  the client options, also used to create the underlying {@link NetClient}
   * @param host     the host to connect to
   * @param port     the port to connect to
   * @param username the username handed to the SASL authenticator, may be null
   * @param password the password handed to the SASL authenticator, may be null
   * @param handler  receives the outcome of the connect attempt; must not be null
   * @throws NullPointerException if {@code handler} is null
   */
  public void connect(ProtonClientOptions options, String host, int port, String username, String password,
                      Handler<AsyncResult<ProtonConnection>> handler) {
    // Validate before allocating the NetClient: otherwise a null handler would make the
    // ConnectCompletionHandler constructor throw after the client was created, leaving
    // nothing to close it.
    Objects.requireNonNull(handler, "handler");

    final NetClient netClient = vertx.createNetClient(options);
    connectNetClient(netClient, host, port, username, password,
        new ConnectCompletionHandler(handler, netClient), options);
  }

  /**
   * Opens the TCP connection and, once established, creates the Proton connection and
   * binds it to the socket together with a SASL client authenticator and transport options.
   *
   * @param netClient      the net client used to open the socket
   * @param host           the host to connect to
   * @param port           the port to connect to
   * @param username       the username handed to the SASL authenticator, may be null
   * @param password       the password handed to the SASL authenticator, may be null
   * @param connectHandler single-shot completion handler for this connect attempt
   * @param options        the client options
   */
  private void connectNetClient(NetClient netClient, String host, int port, String username, String password,
                                ConnectCompletionHandler connectHandler, ProtonClientOptions options) {
    // Server name passed to the net client: the explicit SNI server name wins, otherwise
    // the configured virtual host (which may itself be null).
    String serverName = options.getSniServerName() != null
        ? options.getSniServerName()
        : options.getVirtualHost();

    netClient.connect(port, host, serverName, res -> {
      if (res.succeeded()) {
        // The connection's virtual host falls back to the target host when none is configured.
        String virtualHost = options.getVirtualHost() != null ? options.getVirtualHost() : host;
        ProtonConnectionImpl conn =
            new ProtonConnectionImpl(vertx, virtualHost, (ContextInternal) Vertx.currentContext());

        conn.disconnectHandler(h -> {
          LOG.trace("Connection disconnected");
          // A disconnect before the connect attempt has completed is reported as a failure.
          if (!connectHandler.isComplete()) {
            connectHandler.handle(Future.failedFuture(new VertxException("Disconnected")));
          }
        });

        ProtonSaslClientAuthenticatorImpl authenticator = new ProtonSaslClientAuthenticatorImpl(
            username, password, options.getEnabledSaslMechanisms(), connectHandler);

        ProtonTransportOptions transportOptions = new ProtonTransportOptions();
        transportOptions.setHeartbeat(options.getHeartbeat());
        transportOptions.setMaxFrameSize(options.getMaxFrameSize());

        conn.bindClient(netClient, res.result(), authenticator, transportOptions);
        conn.flush();
      } else {
        connectHandler.handle(Future.failedFuture(res.cause()));
      }
    });
  }

  /**
   * Wraps the application's connect handler so that it is invoked at most once, and closes
   * the associated {@link NetClient} when the delivered result is a failure.
   */
  static class ConnectCompletionHandler implements Handler<AsyncResult<ProtonConnection>> {

    private final AtomicBoolean completed = new AtomicBoolean();
    private final Handler<AsyncResult<ProtonConnection>> applicationConnectHandler;
    private final NetClient netClient;

    /**
     * @param applicationConnectHandler the handler to notify with the connect outcome
     * @param netClient                 the net client to close if the attempt fails
     * @throws NullPointerException if either argument is null
     */
    ConnectCompletionHandler(Handler<AsyncResult<ProtonConnection>> applicationConnectHandler,
                             NetClient netClient) {
      this.applicationConnectHandler = Objects.requireNonNull(applicationConnectHandler);
      this.netClient = Objects.requireNonNull(netClient);
    }

    /**
     * @return true once a result has been accepted by {@link #handle(AsyncResult)}
     */
    public boolean isComplete() {
      return completed.get();
    }

    /**
     * Delivers the first result to the application handler; any later results are ignored.
     *
     * @param event the outcome of the connect attempt
     */
    @Override
    public void handle(AsyncResult<ProtonConnection> event) {
      // compareAndSet ensures only the first caller gets through.
      if (completed.compareAndSet(false, true)) {
        if (event.failed()) {
          netClient.close();
        }
        applicationConnectHandler.handle(event);
      }
    }
  }
}