package io.vertx.core.http.impl;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.net.impl.clientconnection.ConnectResult;
import io.vertx.core.net.impl.clientconnection.ConnectionListener;
import io.vertx.core.net.impl.clientconnection.ConnectionProvider;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.ProxyType;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ChannelProvider;
import io.vertx.core.net.impl.SSLHelper;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;
public class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
private final HttpClientImpl client;
private final ChannelGroup channelGroup;
private final ContextInternal context;
private final HttpClientOptions options;
private final ClientMetrics metrics;
private final SSLHelper sslHelper;
private final HttpVersion version;
private final long weight;
private final long http1Weight;
private final long http2Weight;
private final long http1MaxConcurrency;
private final SocketAddress peerAddress;
private final SocketAddress server;
public HttpChannelConnector(HttpClientImpl client,
ChannelGroup channelGroup,
ContextInternal context,
ClientMetrics metrics,
HttpVersion version,
SSLHelper sslHelper,
SocketAddress peerAddress,
SocketAddress server) {
this.client = client;
this.channelGroup = channelGroup;
this.context = context;
this.metrics = metrics;
this.options = client.getOptions();
this.sslHelper = sslHelper;
this.version = version;
this.http1Weight = options.getHttp2MaxPoolSize();
this.http2Weight = options.getMaxPoolSize();
this.weight = version == HttpVersion.HTTP_2 ? http2Weight : http1Weight;
this.http1MaxConcurrency = options.isPipelining() ? options.getPipeliningLimit() : 1;
this.peerAddress = peerAddress;
this.server = server;
}
public long weight() {
return weight;
}
@Override
public void close(HttpClientConnection conn) {
conn.close();
}
@Override
public void init(HttpClientConnection conn) {
Handler<HttpConnection> handler = client.connectionHandler();
if (handler != null) {
context.emit(conn, handler);
}
}
@Override
public boolean isValid(HttpClientConnection conn) {
return conn.isValid();
}
@Override
public void connect(ConnectionListener<HttpClientConnection> listener, ContextInternal context, Handler<AsyncResult<ConnectResult<HttpClientConnection>>> handler) {
Promise<ConnectResult<HttpClientConnection>> promise = Promise.promise();
promise.future().onComplete(handler);
try {
doConnect(listener, (EventLoopContext) context, promise);
} catch(Exception e) {
promise.tryFail(e);
}
}
private void doConnect(
ConnectionListener<HttpClientConnection> listener,
EventLoopContext context,
Promise<ConnectResult<HttpClientConnection>> future) {
boolean domainSocket = server.path() != null;
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(context.nettyEventLoop());
applyConnectionOptions(domainSocket, bootstrap);
ProxyOptions options = this.options.getProxyOptions();
if (options != null && sslHelper == null && options.getType()== ProxyType.HTTP) {
options = null;
}
ChannelProvider channelProvider = new ChannelProvider(bootstrap, sslHelper, context, options);
Future<Channel> fut = channelProvider.connect(server, peerAddress, this.options.isForceSni() ? peerAddress.host() : null, sslHelper != null);
fut.addListener((GenericFutureListener<Future<Channel>>) res -> {
if (res.isSuccess()) {
Channel ch = res.getNow();
channelGroup.add(ch);
if (sslHelper != null) {
String protocol = channelProvider.applicationProtocol();
if (sslHelper.isUseAlpn()) {
if ("h2".equals(protocol)) {
applyHttp2ConnectionOptions(ch.pipeline());
http2Connected(listener, context, ch, future);
} else {
applyHttp1xConnectionOptions(ch.pipeline());
HttpVersion fallbackProtocol = "http/1.0".equals(protocol) ?
HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
http1xConnected(listener, fallbackProtocol, server, true, context, ch, http1Weight, future);
}
} else {
applyHttp1xConnectionOptions(ch.pipeline());
http1xConnected(listener, version, server, true, context, ch, http1Weight, future);
}
} else {
ChannelPipeline pipeline = ch.pipeline();
if (version == HttpVersion.HTTP_2) {
if (this.options.isHttp2ClearTextUpgrade()) {
applyHttp1xConnectionOptions(pipeline);
http1xConnected(listener, version, server, false, context, ch, http2Weight, future);
} else {
applyHttp2ConnectionOptions(pipeline);
http2Connected(listener, context, ch, future);
}
} else {
applyHttp1xConnectionOptions(pipeline);
http1xConnected(listener, version, server, false, context, ch, http1Weight, future);
}
}
} else {
connectFailed(channelProvider.channel(), listener, res.cause(), future);
}
});
}
private void applyConnectionOptions(boolean domainSocket, Bootstrap bootstrap) {
client.getVertx().transport().configure(options, domainSocket, bootstrap);
}
private void applyHttp2ConnectionOptions(ChannelPipeline pipeline) {
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
}
}
private void applyHttp1xConnectionOptions(ChannelPipeline pipeline) {
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
}
if (options.getLogActivity()) {
pipeline.addLast("logging", new LoggingHandler());
}
pipeline.addLast("codec", new HttpClientCodec(
options.getMaxInitialLineLength(),
options.getMaxHeaderSize(),
options.getMaxChunkSize(),
false,
false,
options.getDecoderInitialBufferSize()));
if (options.isTryUseCompression()) {
pipeline.addLast("inflater", new HttpContentDecompressor(false));
}
}
private void http1xConnected(ConnectionListener<HttpClientConnection> listener,
HttpVersion version,
SocketAddress server,
boolean ssl,
ContextInternal context,
Channel ch, long weight,
Promise<ConnectResult<HttpClientConnection>> future) {
boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade();
VertxHandler<Http1xClientConnection> clientHandler = VertxHandler.create(chctx -> {
HttpClientMetrics met = client.metrics();
Http1xClientConnection conn = new Http1xClientConnection(listener, upgrade ? HttpVersion.HTTP_1_1 : version, client, chctx, ssl, server, context, this.metrics);
if (met != null) {
Object socketMetric = met.connected(conn.remoteAddress(), conn.remoteName());
conn.metric(socketMetric);
met.endpointConnected(metrics);
}
return conn;
});
clientHandler.addHandler(conn -> {
if (upgrade) {
future.complete(new ConnectResult<>(new Http2UpgradedClientConnection(client, conn), 1, http2Weight));
} else {
future.complete(new ConnectResult<>(conn, http1MaxConcurrency, http1Weight));
}
});
clientHandler.removeHandler(conn -> {
listener.onEvict();
});
ch.pipeline().addLast("handler", clientHandler);
}
private void http2Connected(ConnectionListener<HttpClientConnection> listener,
EventLoopContext context,
Channel ch,
Promise<ConnectResult<HttpClientConnection>> future) {
try {
VertxHttp2ConnectionHandler<Http2ClientConnection> clientHandler = Http2ClientConnection.createHttp2ConnectionHandler(client, metrics, listener, context, null, (conn, concurrency) -> {
future.complete(new ConnectResult<>(conn, concurrency, http2Weight));
});
ch.pipeline().addLast("handler", clientHandler);
ch.flush();
} catch (Exception e) {
connectFailed(ch, listener, e, future);
}
}
private void connectFailed(Channel ch, ConnectionListener<HttpClientConnection> listener, Throwable t, Promise<ConnectResult<HttpClientConnection>> future) {
if (ch != null) {
try {
ch.close();
} catch (Exception ignore) {
}
}
future.tryFail(t);
}
}