package io.vertx.core.net.impl;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoop;
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.metrics.TCPMetrics;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
public abstract class TCPServerBase implements Closeable, MetricsProvider {
private static final Logger log = LoggerFactory.getLogger(NetServerImpl.class);
protected final Context creatingContext;
protected final VertxInternal vertx;
protected final NetServerOptions options;
protected final SSLHelper sslHelper;
private EventLoop eventLoop;
private Handler<Channel> worker;
private volatile boolean listening;
private ContextInternal listenContext;
private ServerID id;
private TCPServerBase actualServer;
private ServerChannelLoadBalancer channelBalancer;
private io.netty.util.concurrent.Future<Channel> bindFuture;
private Set<TCPServerBase> servers;
private TCPMetrics<?> metrics;
private volatile int actualPort;
public TCPServerBase(VertxInternal vertx, NetServerOptions options) {
this.vertx = vertx;
this.options = new NetServerOptions(options);
this.sslHelper = new SSLHelper(options, options.getKeyCertOptions(), options.getTrustOptions());
this.creatingContext = vertx.getContext();
}
public int actualPort() {
TCPServerBase server = actualServer;
return server != null ? server.actualPort : actualPort;
}
public synchronized io.netty.util.concurrent.Future<Channel> listen(SocketAddress localAddress, ContextInternal context, Handler<Channel> worker) {
if (listening) {
throw new IllegalStateException("Listen already called");
}
this.listenContext = context;
this.listening = true;
this.eventLoop = context.nettyEventLoop();
this.worker = worker;
Map<ServerID, TCPServerBase> sharedNetServers = vertx.sharedTCPServers((Class<TCPServerBase>) getClass());
synchronized (sharedNetServers) {
actualPort = localAddress.port();
String hostOrPath = localAddress.isInetSocket() ? localAddress.host() : localAddress.path();
TCPServerBase main;
boolean shared;
if (actualPort != 0) {
id = new ServerID(actualPort, hostOrPath);
main = sharedNetServers.get(id);
shared = true;
} else {
if (creatingContext != null && creatingContext.deploymentID() != null) {
id = new ServerID(actualPort, hostOrPath + "/" + creatingContext.deploymentID());
main = sharedNetServers.get(id);
shared = true;
} else {
id = new ServerID(actualPort, hostOrPath);
main = null;
shared = false;
}
}
if (main == null) {
servers = new HashSet<>();
servers.add(this);
channelBalancer = new ServerChannelLoadBalancer(vertx.getAcceptorEventLoopGroup().next());
channelBalancer.addWorker(eventLoop, worker);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(vertx.getAcceptorEventLoopGroup(), channelBalancer.workers());
bootstrap.childHandler(channelBalancer);
applyConnectionOptions(localAddress.isDomainSocket(), bootstrap);
try {
sslHelper.validate(vertx);
bindFuture = AsyncResolveConnectHelper.doBind(vertx, localAddress, bootstrap);
bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) res -> {
if (res.isSuccess()) {
Channel ch = res.getNow();
log.trace("Net server listening on " + hostOrPath + ":" + ch.localAddress());
if (actualPort != -1) {
actualPort = ((InetSocketAddress)ch.localAddress()).getPort();
}
id = new ServerID(TCPServerBase.this.actualPort, id.host);
listenContext.addCloseHook(this);
metrics = createMetrics(localAddress);
} else {
if (shared) {
synchronized (sharedNetServers) {
sharedNetServers.remove(id);
}
}
listening = false;
}
});
} catch (Throwable t) {
listening = false;
return vertx.getAcceptorEventLoopGroup().next().newFailedFuture(t);
}
if (shared) {
sharedNetServers.put(id, this);
}
actualServer = this;
} else {
actualServer = main;
actualServer.servers.add(this);
actualServer.channelBalancer.addWorker(eventLoop, worker);
metrics = main.metrics;
listenContext.addCloseHook(this);
}
}
return actualServer.bindFuture;
}
public boolean isListening() {
return listening;
}
protected TCPMetrics<?> createMetrics(SocketAddress localAddress) {
return null;
}
private void applyConnectionOptions(boolean domainSocket, ServerBootstrap bootstrap) {
vertx.transport().configure(options, domainSocket, bootstrap);
}
@Override
public boolean isMetricsEnabled() {
return metrics != null;
}
@Override
public synchronized TCPMetrics<?> getMetrics() {
return actualServer != null ? actualServer.metrics : null;
}
@Override
public synchronized void close(Promise<Void> completion) {
if (!listening) {
completion.complete();
return;
}
listening = false;
listenContext.removeCloseHook(this);
Map<ServerID, TCPServerBase> servers = vertx.sharedTCPServers((Class<TCPServerBase>) getClass());
synchronized (servers) {
ServerChannelLoadBalancer balancer = actualServer.channelBalancer;
balancer.removeWorker(eventLoop, worker);
if (balancer.hasHandlers()) {
completion.complete();
} else {
servers.remove(id);
actualServer.actualClose(completion);
}
}
}
private void actualClose(Promise<Void> done) {
channelBalancer.close();
bindFuture.addListener((GenericFutureListener<io.netty.util.concurrent.Future<Channel>>) fut -> {
if (fut.isSuccess()) {
Channel channel = fut.getNow();
ChannelFuture a = channel.close();
if (metrics != null) {
a.addListener(cg -> metrics.close());
}
a.addListener((PromiseInternal<Void>)done);
} else {
done.complete();
}
});
}
public void closeAll(Handler<AsyncResult<Void>> handler) {
List<Future> futures = new ArrayList<>(actualServer.servers)
.stream()
.map(TCPServerBase::close)
.collect(Collectors.toList());
CompositeFuture fut = CompositeFuture.all(futures);
fut.onComplete(ar -> handler.handle(ar.mapEmpty()));
}
public abstract Future<Void> close();
@Override
protected void finalize() throws Throwable {
close();
super.finalize();
}
}