package io.vertx.proton.impl;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonTransportOptions;
import io.vertx.proton.sasl.ProtonSaslAuthenticator;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Transport;
import java.net.InetAddress;
import java.net.UnknownHostException;
public class ProtonServerImpl implements ProtonServer {
private final Vertx vertx;
private final NetServer server;
private Handler<ProtonConnection> handler;
private ProtonSaslAuthenticatorFactory authenticatorFactory = new DefaultAuthenticatorFactory();
private boolean advertiseAnonymousRelayCapability = true;
private ProtonServerOptions options;
public ProtonServerImpl(Vertx vertx) {
this.vertx = vertx;
this.server = this.vertx.createNetServer();
this.options = new ProtonServerOptions();
}
public ProtonServerImpl(Vertx vertx, ProtonServerOptions options) {
this.vertx = vertx;
this.server = this.vertx.createNetServer(options);
this.options = options;
}
@Override
public int actualPort() {
return server.actualPort();
}
@Override
public ProtonServerImpl listen(int i) {
server.listen(i);
return this;
}
@Override
public ProtonServerImpl listen() {
server.listen();
return this;
}
public boolean isMetricsEnabled() {
return server.isMetricsEnabled();
}
@Override
public ProtonServerImpl listen(int port, String host, Handler<AsyncResult<ProtonServer>> handler) {
server.listen(port, host, convertHandler(handler));
return this;
}
@Override
public ProtonServerImpl listen(Handler<AsyncResult<ProtonServer>> handler) {
server.listen(convertHandler(handler));
return this;
}
private Handler<AsyncResult<NetServer>> convertHandler(final Handler<AsyncResult<ProtonServer>> handler) {
return result -> {
if (result.succeeded()) {
handler.handle(Future.succeededFuture(ProtonServerImpl.this));
} else {
handler.handle(Future.failedFuture(result.cause()));
}
};
}
@Override
public ProtonServerImpl listen(int i, String s) {
server.listen(i, s);
return this;
}
@Override
public ProtonServerImpl listen(int i, Handler<AsyncResult<ProtonServer>> handler) {
server.listen(i, convertHandler(handler));
return this;
}
@Override
public void close() {
server.close();
}
@Override
public void close(Handler<AsyncResult<Void>> handler) {
server.close(handler);
}
@Override
public Handler<ProtonConnection> connectHandler() {
return handler;
}
@Override
public ProtonServer saslAuthenticatorFactory(ProtonSaslAuthenticatorFactory authenticatorFactory) {
if (authenticatorFactory == null) {
this.authenticatorFactory = new DefaultAuthenticatorFactory();
} else {
this.authenticatorFactory = authenticatorFactory;
}
return this;
}
@Override
public ProtonServerImpl connectHandler(Handler<ProtonConnection> handler) {
this.handler = handler;
server.connectHandler(netSocket -> {
String hostname = null;
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
}
final ProtonConnectionImpl connection = new ProtonConnectionImpl(vertx, hostname, (ContextInternal) Vertx.currentContext());
if (advertiseAnonymousRelayCapability) {
connection.setOfferedCapabilities(new Symbol[] { ProtonConnectionImpl.ANONYMOUS_RELAY });
}
final ProtonSaslAuthenticator authenticator = authenticatorFactory.create();
ProtonTransportOptions transportOptions = new ProtonTransportOptions();
transportOptions.setHeartbeat(this.options.getHeartbeat());
transportOptions.setMaxFrameSize(this.options.getMaxFrameSize());
connection.bindServer(netSocket, new ProtonSaslAuthenticator() {
@Override
public void init(NetSocket socket, ProtonConnection protonConnection, Transport transport) {
authenticator.init(socket, protonConnection, transport);
}
@Override
public void process(Handler<Boolean> completionHandler) {
final Context context = Vertx.currentContext();
authenticator.process(complete -> {
final Context callbackContext = vertx.getOrCreateContext();
if(context != callbackContext) {
throw new IllegalStateException("Callback was not made on the original context");
}
if (complete) {
if (succeeded()) {
handler.handle(connection);
connection.flush();
} else {
connection.flush();
connection.disconnect();
}
}
completionHandler.handle(complete);
});
}
@Override
public boolean succeeded() {
return authenticator.succeeded();
}
}, transportOptions);
});
return this;
}
public void setAdvertiseAnonymousRelayCapability(boolean advertiseAnonymousRelayCapability) {
this.advertiseAnonymousRelayCapability = advertiseAnonymousRelayCapability;
}
private static class DefaultAuthenticatorFactory implements ProtonSaslAuthenticatorFactory {
@Override
public ProtonSaslAuthenticator create() {
return new ProtonSaslServerAuthenticatorImpl();
}
}
}