package io.vertx.core.http.impl;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.*;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.pool.ConnectResult;
import io.vertx.core.http.impl.pool.ConnectionListener;
import io.vertx.core.http.impl.pool.ConnectionProvider;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.ProxyType;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ChannelProvider;
import io.vertx.core.net.impl.SSLHelper;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.HttpClientMetrics;
class HttpChannelConnector implements ConnectionProvider<HttpClientConnection> {
private final HttpClientImpl client;
private final HttpClientOptions options;
private final HttpClientMetrics metrics;
private final SSLHelper sslHelper;
private final HttpVersion version;
private final long weight;
private final long http1Weight;
private final long http2Weight;
private final long http1MaxConcurrency;
private final boolean ssl;
private final SocketAddress peerAddress;
private final SocketAddress server;
private final Object endpointMetric;
HttpChannelConnector(HttpClientImpl client,
Object endpointMetric,
HttpVersion version,
boolean ssl,
SocketAddress peerAddress,
SocketAddress server) {
this.client = client;
this.endpointMetric = endpointMetric;
this.options = client.getOptions();
this.metrics = client.metrics();
this.sslHelper = client.getSslHelper();
this.version = version;
this.http1Weight = options.getHttp2MaxPoolSize();
this.http2Weight = options.getMaxPoolSize();
this.weight = version == HttpVersion.HTTP_2 ? http2Weight : http1Weight;
this.http1MaxConcurrency = options.isPipelining() ? options.getPipeliningLimit() : 1;
this.ssl = ssl;
this.peerAddress = peerAddress;
this.server = server;
}
public long weight() {
return weight;
}
@Override
public void close(HttpClientConnection conn) {
conn.close();
}
@Override
public void connect(ConnectionListener<HttpClientConnection> listener, ContextInternal context, Handler<AsyncResult<ConnectResult<HttpClientConnection>>> handler) {
Promise<ConnectResult<HttpClientConnection>> promise = Promise.promise();
promise.future().setHandler(handler);
try {
doConnect(listener, context, promise);
} catch(Exception e) {
promise.tryFail(e);
}
}
private void doConnect(
ConnectionListener<HttpClientConnection> listener,
ContextInternal context,
Promise<ConnectResult<HttpClientConnection>> future) {
boolean domainSocket = server.path() != null;
boolean useAlpn = options.isUseAlpn();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(context.nettyEventLoop());
bootstrap.channelFactory(client.getVertx().transport().channelFactory(domainSocket));
applyConnectionOptions(domainSocket, bootstrap);
ProxyOptions options = this.options.getProxyOptions();
if (options != null && !ssl && options.getType()== ProxyType.HTTP) {
options = null;
}
ChannelProvider channelProvider = new ChannelProvider(bootstrap, sslHelper, context, options);
Handler<AsyncResult<Channel>> channelHandler = res -> {
if (res.succeeded()) {
Channel ch = res.result();
if (ssl) {
String protocol = channelProvider.applicationProtocol();
if (useAlpn) {
if ("h2".equals(protocol)) {
applyHttp2ConnectionOptions(ch.pipeline());
http2Connected(listener, context, ch, future);
} else {
applyHttp1xConnectionOptions(ch.pipeline());
HttpVersion fallbackProtocol = "http/1.0".equals(protocol) ?
HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
http1xConnected(listener, fallbackProtocol, server, true, context, ch, http1Weight, future);
}
} else {
applyHttp1xConnectionOptions(ch.pipeline());
http1xConnected(listener, version, server, true, context, ch, http1Weight, future);
}
} else {
ChannelPipeline pipeline = ch.pipeline();
if (version == HttpVersion.HTTP_2) {
if (this.options.isHttp2ClearTextUpgrade()) {
applyHttp1xConnectionOptions(pipeline);
http1xConnected(listener, version, server, false, context, ch, http2Weight, future);
} else {
applyHttp2ConnectionOptions(pipeline);
http2Connected(listener, context, ch, future);
}
} else {
applyHttp1xConnectionOptions(pipeline);
http1xConnected(listener, version, server, false, context, ch, http1Weight, future);
}
}
} else {
connectFailed(channelProvider.channel(), listener, res.cause(), future);
}
};
channelProvider.connect(server, peerAddress, this.options.isForceSni() ? peerAddress.host() : null, ssl, channelHandler);
}
private void applyConnectionOptions(boolean domainSocket, Bootstrap bootstrap) {
client.getVertx().transport().configure(options, domainSocket, bootstrap);
}
private void applyHttp2ConnectionOptions(ChannelPipeline pipeline) {
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
}
}
private void applyHttp1xConnectionOptions(ChannelPipeline pipeline) {
if (options.getIdleTimeout() > 0) {
pipeline.addLast("idle", new IdleStateHandler(0, 0, options.getIdleTimeout(), options.getIdleTimeoutUnit()));
}
if (options.getLogActivity()) {
pipeline.addLast("logging", new LoggingHandler());
}
pipeline.addLast("codec", new HttpClientCodec(
options.getMaxInitialLineLength(),
options.getMaxHeaderSize(),
options.getMaxChunkSize(),
false,
false,
options.getDecoderInitialBufferSize()));
if (options.isTryUseCompression()) {
pipeline.addLast("inflater", new HttpContentDecompressor(false));
}
}
private void http1xConnected(ConnectionListener<HttpClientConnection> listener,
HttpVersion version,
SocketAddress server,
boolean ssl,
ContextInternal context,
Channel ch, long weight,
Promise<ConnectResult<HttpClientConnection>> future) {
boolean upgrade = version == HttpVersion.HTTP_2 && options.isHttp2ClearTextUpgrade();
VertxHandler<Http1xClientConnection> clientHandler = VertxHandler.create(context, chctx -> {
Http1xClientConnection conn = new Http1xClientConnection(listener, upgrade ? HttpVersion.HTTP_1_1 : version, client, endpointMetric, chctx, ssl, server, context, metrics);
if (metrics != null) {
context.executeFromIO(v -> {
Object socketMetric = metrics.connected(conn.remoteAddress(), conn.remoteName());
conn.metric(socketMetric);
metrics.endpointConnected(endpointMetric, socketMetric);
});
}
return conn;
});
clientHandler.addHandler(conn -> {
if (upgrade) {
future.complete(new ConnectResult<>(new Http2UpgradedClientConnection(client, conn), 1, http2Weight));
} else {
future.complete(new ConnectResult<>(conn, http1MaxConcurrency, http1Weight));
}
});
clientHandler.removeHandler(conn -> {
listener.onEvict();
});
ch.pipeline().addLast("handler", clientHandler);
}
private void http2Connected(ConnectionListener<HttpClientConnection> listener,
ContextInternal context,
Channel ch,
Promise<ConnectResult<HttpClientConnection>> future) {
try {
VertxHttp2ConnectionHandler<Http2ClientConnection> clientHandler = Http2ClientConnection.createHttp2ConnectionHandler(client, endpointMetric, listener, context, null, (conn, concurrency) -> {
future.complete(new ConnectResult<>(conn, concurrency, http2Weight));
});
ch.pipeline().addLast("handler", clientHandler);
ch.flush();
} catch (Exception e) {
connectFailed(ch, listener, e, future);
}
}
private void connectFailed(Channel ch, ConnectionListener<HttpClientConnection> listener, Throwable t, Promise<ConnectResult<HttpClientConnection>> future) {
if (ch != null) {
try {
ch.close();
} catch (Exception ignore) {
}
}
future.tryFail(t);
}
}