package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.*;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.pool.ConnectionListener;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.cert.X509Certificate;
public class Http2UpgradedClientConnection implements HttpClientConnection {
private HttpClientImpl client;
private HttpClientConnection current;
private Handler<Void> closeHandler;
private Handler<Void> shutdownHandler;
private Handler<GoAway> goAwayHandler;
private Handler<Throwable> exceptionHandler;
private Handler<Buffer> pingHandler;
private Handler<Http2Settings> remoteSettingsHandler;
Http2UpgradedClientConnection(HttpClientImpl client, Http1xClientConnection connection) {
this.client = client;
this.current = connection;
}
@Override
public ChannelHandlerContext channelHandlerContext() {
return current.channelHandlerContext();
}
@Override
public Channel channel() {
return current.channel();
}
@Override
public void close() {
current.close();
}
@Override
public Object metric() {
return current.metric();
}
private class UpgradingStream implements HttpClientStream {
private HttpClientRequestImpl request;
private Http1xClientConnection conn;
private HttpClientStream stream;
UpgradingStream(HttpClientStream stream, Http1xClientConnection conn) {
this.conn = conn;
this.stream = stream;
}
@Override
public HttpClientConnection connection() {
return Http2UpgradedClientConnection.this;
}
@Override
public void writeHead(HttpMethod method,
String rawMethod,
String uri,
MultiMap headers,
String hostHeader,
boolean chunked,
ByteBuf buf,
boolean end,
StreamPriority priority,
Handler<Void> continueHandler,
Handler<AsyncResult<Void>> handler) {
ChannelPipeline pipeline = conn.channel().pipeline();
HttpClientCodec httpCodec = pipeline.get(HttpClientCodec.class);
class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
ChannelPipeline pipeline = ctx.pipeline();
if (evt == HttpClientUpgradeHandler.UpgradeEvent.UPGRADE_SUCCESSFUL) {
pipeline.remove(conn.handler());
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpResponse) {
pipeline.remove(this);
HttpResponse resp = (HttpResponse) msg;
if (resp.status() != HttpResponseStatus.SWITCHING_PROTOCOLS) {
resp.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
}
}
super.channelRead(ctx, msg);
}
}
VertxHttp2ClientUpgradeCodec upgradeCodec = new VertxHttp2ClientUpgradeCodec(client.getOptions().getInitialSettings()) {
@Override
public void upgradeTo(ChannelHandlerContext ctx, FullHttpResponse upgradeResponse) throws Exception {
ConnectionListener<HttpClientConnection> listener = conn.listener();
VertxHttp2ConnectionHandler<Http2ClientConnection> handler = Http2ClientConnection.createHttp2ConnectionHandler(client, conn.endpointMetric(), listener, conn.getContext(), current.metric(), (conn, concurrency) -> {
conn.upgradeStream(stream.metric(), ar -> {
UpgradingStream.this.conn.closeHandler(null);
UpgradingStream.this.conn.exceptionHandler(null);
if (ar.succeeded()) {
HttpClientStream upgradedStream = ar.result();
upgradedStream.beginRequest(request);
current = conn;
conn.closeHandler(closeHandler);
conn.exceptionHandler(exceptionHandler);
conn.pingHandler(pingHandler);
conn.goAwayHandler(goAwayHandler);
conn.shutdownHandler(shutdownHandler);
conn.remoteSettingsHandler(remoteSettingsHandler);
listener.onConcurrencyChange(concurrency);
} else {
ar.cause().printStackTrace();
}
});
});
conn.channel().pipeline().addLast(handler);
handler.clientUpgrade(ctx);
}
};
HttpClientUpgradeHandler upgradeHandler = new HttpClientUpgradeHandler(httpCodec, upgradeCodec, 65536);
pipeline.addAfter("codec", null, new UpgradeRequestHandler());
pipeline.addAfter("codec", null, upgradeHandler);
stream.writeHead(method, rawMethod, uri, headers, hostHeader, chunked, buf, end, priority, continueHandler, handler);
}
@Override
public int id() {
return 1;
}
@Override
public Object metric() {
return stream.metric();
}
@Override
public HttpVersion version() {
return HttpVersion.HTTP_2;
}
@Override
public Context getContext() {
return stream.getContext();
}
@Override
public void writeBuffer(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler) {
stream.writeBuffer(buf, end, handler);
}
@Override
public void writeFrame(int type, int flags, ByteBuf payload) {
stream.writeFrame(type, flags, payload);
}
@Override
public void doSetWriteQueueMaxSize(int size) {
stream.doSetWriteQueueMaxSize(size);
}
@Override
public boolean isNotWritable() {
return stream.isNotWritable();
}
@Override
public void doPause() {
stream.doPause();
}
@Override
public void doFetch(long amount) {
stream.doFetch(amount);
}
@Override
public void reset(Throwable cause) {
stream.reset(cause);
}
@Override
public void beginRequest(HttpClientRequestImpl req) {
request = req;
stream.beginRequest(req);
}
@Override
public void endRequest() {
stream.endRequest();
}
@Override
public NetSocket createNetSocket() {
return stream.createNetSocket();
}
@Override
public StreamPriority priority() {
return stream.priority();
}
@Override
public void updatePriority(StreamPriority streamPriority) {
stream.updatePriority(streamPriority);
}
}
@Override
public void createStream(Handler<AsyncResult<HttpClientStream>> handler) {
if (current instanceof Http1xClientConnection) {
current.createStream(ar -> {
if (ar.succeeded()) {
HttpClientStream stream = ar.result();
UpgradingStream upgradingStream = new UpgradingStream(stream, (Http1xClientConnection) current);
handler.handle(Future.succeededFuture(upgradingStream));
} else {
handler.handle(ar);
}
});
} else {
current.createStream(handler);
}
}
@Override
public ContextInternal getContext() {
return current.getContext();
}
@Override
public HttpConnection closeHandler(Handler<Void> handler) {
closeHandler = handler;
current.closeHandler(handler);
return this;
}
@Override
public HttpConnection exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
current.exceptionHandler(handler);
return this;
}
@Override
public HttpConnection remoteSettingsHandler(Handler<Http2Settings> handler) {
if (current instanceof Http1xClientConnection) {
remoteSettingsHandler = handler;
} else {
current.remoteSettingsHandler(handler);
}
return this;
}
@Override
public HttpConnection pingHandler(@Nullable Handler<Buffer> handler) {
if (current instanceof Http1xClientConnection) {
pingHandler = handler;
} else {
current.pingHandler(handler);
}
return this;
}
@Override
public HttpConnection goAwayHandler(@Nullable Handler<GoAway> handler) {
if (current instanceof Http1xClientConnection) {
goAwayHandler = handler;
} else {
current.goAwayHandler(handler);
}
return this;
}
@Override
public HttpConnection shutdownHandler(@Nullable Handler<Void> handler) {
if (current instanceof Http1xClientConnection) {
shutdownHandler = handler;
} else {
current.shutdownHandler(handler);
}
return this;
}
@Override
public HttpConnection goAway(long errorCode, int lastStreamId, Buffer debugData) {
return current.goAway(errorCode, lastStreamId, debugData);
}
@Override
public HttpConnection shutdown() {
return current.shutdown();
}
@Override
public HttpConnection shutdown(long timeoutMs) {
return current.shutdown(timeoutMs);
}
@Override
public HttpConnection updateSettings(Http2Settings settings) {
return current.updateSettings(settings);
}
@Override
public HttpConnection updateSettings(Http2Settings settings, Handler<AsyncResult<Void>> completionHandler) {
return current.updateSettings(settings, completionHandler);
}
@Override
public Http2Settings settings() {
return current.settings();
}
@Override
public Http2Settings remoteSettings() {
return current.remoteSettings();
}
@Override
public HttpConnection ping(Buffer data, Handler<AsyncResult<Buffer>> pongHandler) {
return current.ping(data, pongHandler);
}
@Override
public SocketAddress remoteAddress() {
return current.remoteAddress();
}
@Override
public SocketAddress localAddress() {
return current.localAddress();
}
@Override
public boolean isSsl() {
return current.isSsl();
}
@Override
public SSLSession sslSession() {
return current.sslSession();
}
@Override
public X509Certificate[] peerCertificateChain() throws SSLPeerUnverifiedException {
return current.peerCertificateChain();
}
@Override
public String indicatedServerName() {
return current.indicatedServerName();
}
}