package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Stream;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;
import io.vertx.core.impl.EventLoopContext;
import io.vertx.core.net.impl.clientconnection.ConnectionListener;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.*;
import java.util.function.BiConsumer;
import static io.vertx.core.http.HttpHeaders.DEFLATE_GZIP;
class Http2ClientConnection extends Http2ConnectionBase implements HttpClientConnection {
// Listener notified through onEvict(), onConcurrencyChange() and onRecycle().
private final ConnectionListener<HttpClientConnection> listener;
// Owning client; read for its options and its HttpClientMetrics.
private final HttpClientImpl client;
// Request/response metrics; null-checked before every use in this class.
private final ClientMetrics metrics;
// Deadline (System.currentTimeMillis() based) written by recycle() and read by isValid(); 0 means no expiration.
private long expirationTimestamp;
// Set by tryEvict() so that listener.onEvict() is called at most once.
private boolean evicted;
/**
 * Creates a client connection on top of the given connection handler.
 *
 * @param listener    listener notified of eviction, concurrency changes and recycling
 * @param client      the client this connection belongs to
 * @param context     the context passed to the base connection
 * @param connHandler the HTTP/2 connection handler passed to the base connection
 * @param metrics     request/response metrics, may be {@code null}
 */
Http2ClientConnection(ConnectionListener<HttpClientConnection> listener,
                      HttpClientImpl client,
                      EventLoopContext context,
                      VertxHttp2ConnectionHandler connHandler,
                      ClientMetrics metrics) {
  super(context, connHandler);
  this.listener = listener;
  this.client = client;
  this.metrics = metrics;
}
/**
 * Delegates to the base implementation and, when it reports that the
 * connection has gone away, evicts this connection through the listener.
 *
 * @return the value returned by the base implementation
 */
@Override
synchronized boolean onGoAwaySent(int lastStreamId, long errorCode, ByteBuf debugData) {
  if (!super.onGoAwaySent(lastStreamId, errorCode, debugData)) {
    return false;
  }
  tryEvict();
  return true;
}
/**
 * Delegates to the base implementation and, when it reports that the
 * connection has gone away, evicts this connection through the listener.
 *
 * @return the value returned by the base implementation
 */
@Override
synchronized boolean onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
  if (!super.onGoAwayReceived(lastStreamId, errorCode, debugData)) {
    return false;
  }
  tryEvict();
  return true;
}
/**
 * Notifies the listener of eviction the first time it is called; later
 * calls are no-ops.
 */
private void tryEvict() {
  if (evicted) {
    return;
  }
  evicted = true;
  listener.onEvict();
}
/**
 * Reports a new concurrency value to the listener, capped by the client's
 * HTTP/2 multiplexing limit when that limit is positive.
 *
 * @param concurrency the new concurrency value
 */
@Override
protected void concurrencyChanged(long concurrency) {
  int multiplexingLimit = client.getOptions().getHttp2MultiplexingLimit();
  long effective = multiplexingLimit > 0 ? Math.min(concurrency, multiplexingLimit) : concurrency;
  listener.onConcurrencyChange(effective);
}
/**
 * @return the HTTP client metrics of the owning client
 */
@Override
public HttpClientMetrics metrics() {
  return this.client.metrics();
}
/**
 * Creates a stream object bound to the existing HTTP/2 stream with id 1 and
 * hands the outcome to {@code completionHandler}.
 *
 * @param metric            the metric object to attach to the stream
 * @param context           the context of the new stream
 * @param completionHandler receives the stream, or the failure raised while creating it
 */
void upgradeStream(Object metric, ContextInternal context, Handler<AsyncResult<HttpClientStream>> completionHandler) {
  Future<HttpClientStream> outcome;
  synchronized (this) {
    try {
      StreamImpl upgraded = createStream(context);
      upgraded.init(handler.connection().stream(1));
      upgraded.metric = metric;
      outcome = Future.succeededFuture(upgraded);
    } catch (Exception failure) {
      outcome = Future.failedFuture(failure);
    }
  }
  // The handler is invoked outside the synchronized section.
  completionHandler.handle(outcome);
}
/**
 * Creates a new (non-push) stream and emits the outcome to {@code handler}
 * on the given context.
 *
 * @param context the context of the new stream, also used to emit the result
 * @param handler receives the stream, or the failure raised while creating it
 */
@Override
public void createStream(ContextInternal context, Handler<AsyncResult<HttpClientStream>> handler) {
  Future<HttpClientStream> outcome;
  synchronized (this) {
    try {
      outcome = Future.succeededFuture(createStream(context));
    } catch (Exception failure) {
      outcome = Future.failedFuture(failure);
    }
  }
  // The result is emitted outside the synchronized section.
  context.emit(outcome, handler);
}
/**
 * Instantiates a non-push stream for this connection.
 *
 * @param context the context of the new stream
 * @return the new stream
 */
private StreamImpl createStream(ContextInternal context) {
  boolean push = false;
  return new StreamImpl(this, context, push);
}
/**
 * Refreshes the keep-alive deadline and notifies the listener that the
 * connection was recycled.
 * <p>
 * The configured HTTP/2 keep-alive timeout is multiplied by 1000 and added to
 * the current time in milliseconds; a timeout that is not positive stores
 * {@code 0}, which {@link #isValid()} treats as "never expires".
 */
private void recycle() {
  int timeout = client.getOptions().getHttp2KeepAliveTimeout();
  // 1000L forces long arithmetic: an int product would overflow for large timeouts.
  expirationTimestamp = timeout > 0 ? System.currentTimeMillis() + timeout * 1000L : 0L;
  listener.onRecycle();
}
/**
 * @return {@code true} when no expiration is set (timestamp is 0) or the
 *         expiration timestamp has not been passed yet
 */
@Override
public boolean isValid() {
  if (expirationTimestamp == 0) {
    return true;
  }
  return System.currentTimeMillis() <= expirationTimestamp;
}
/**
 * Routes a received headers frame to the matching stream: as trailers ending
 * the stream when the underlying stream reports trailers received, otherwise
 * as response headers, followed by an end notification when
 * {@code endOfStream} is set.
 */
protected synchronized void onHeadersRead(int streamId, Http2Headers headers, StreamPriority streamPriority, boolean endOfStream) {
  Stream target = (Stream) stream(streamId);
  if (target.stream.isTrailersReceived()) {
    target.onEnd(new Http2HeadersAdaptor(headers));
    return;
  }
  target.onHeaders(headers, streamPriority);
  if (endOfStream) {
    target.onEnd();
  }
}
/**
 * Reports the end of a response to the metrics, when metrics are present.
 *
 * @param stream the stream whose metric object and read byte count are reported
 */
private void metricsEnd(Stream stream) {
  if (metrics == null) {
    return;
  }
  metrics.responseEnd(stream.metric, stream.bytesRead());
}
/**
 * Handles a push promise. When the associated stream exists and has a push
 * handler, a push stream is created for the promised stream id and the
 * {@link HttpClientPush} is dispatched to that handler; otherwise the
 * promised stream is reset with {@code CANCEL}.
 */
@Override
public synchronized void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId, Http2Headers headers, int padding) throws Http2Exception {
  StreamImpl parent = (StreamImpl) stream(streamId);
  Handler<HttpClientPush> pushHandler = parent == null ? null : parent.pushHandler;
  if (pushHandler == null) {
    // Nobody is interested in the pushed response: refuse it.
    handler.writeReset(promisedStreamId, Http2Error.CANCEL.code());
    return;
  }
  Http2Stream promisedStream = handler.connection().stream(promisedStreamId);
  StreamImpl pushStream = new StreamImpl(this, context, true);
  pushStream.init(promisedStream);
  HttpClientPush push = new HttpClientPush(headers, pushStream);
  if (metrics != null) {
    // The request is reported as begun and immediately ended with 0 bytes.
    Object pushMetric = metrics.requestBegin(headers.path().toString(), push);
    pushStream.metric = pushMetric;
    metrics.requestEnd(pushMetric, 0L);
  }
  parent.context.dispatch(push, pushHandler);
}
/**
 * Client-side HTTP/2 stream state shared by request streams and push streams.
 * <p>
 * Tracks whether the request and the response have ended, reports to the
 * connection's metrics and tracer when they are present, and holds the
 * handlers that subclasses dispatch events to.
 */
static abstract class Stream extends VertxHttp2Stream<Http2ClientConnection> {

  // Constructor flag; when true, onClose() does not recycle the connection.
  private final boolean push;
  // Response head, created by the first successfully parsed non-100 headers frame.
  private HttpResponseHead response;
  // Opaque metric object passed back to the metrics callbacks.
  protected Object metric;
  // Opaque trace object handed back to the tracer on close.
  protected Object trace;
  private boolean requestEnded;
  private boolean responseEnded;
  protected Handler<HttpResponseHead> headHandler;
  protected Handler<Buffer> chunkHandler;
  protected Handler<MultiMap> endHandler;
  protected Handler<StreamPriority> priorityHandler;
  protected Handler<Void> drainHandler;
  protected Handler<Void> continueHandler;
  protected Handler<HttpFrame> unknownFrameHandler;
  protected Handler<Throwable> exceptionHandler;
  protected Handler<HttpClientPush> pushHandler;

  Stream(Http2ClientConnection conn, ContextInternal context, boolean push) {
    super(conn, context);
    this.push = push;
  }

  /**
   * Emits the "continue" notification on the stream context.
   */
  void onContinue() {
    context.emit(null, ignored -> handleContinue());
  }

  /**
   * Called on the stream context when a status 100 response head is received.
   */
  abstract void handleContinue();

  /**
   * @return the metric object attached to this stream
   */
  public Object metric() {
    return metric;
  }

  /**
   * Writes data and marks the request as ended when {@code end} is set.
   */
  @Override
  void doWriteData(ByteBuf chunk, boolean end, Handler<AsyncResult<Void>> handler) {
    super.doWriteData(chunk, end, handler);
    if (!end) {
      return;
    }
    endRequest();
  }

  /**
   * Writes the request headers, recording whether the method is CONNECT,
   * and marks the request as ended when {@code end} is set.
   */
  @Override
  void doWriteHeaders(Http2Headers headers, boolean end, Handler<AsyncResult<Void>> handler) {
    CharSequence method = headers.method();
    isConnect = "CONNECT".contentEquals(method);
    super.doWriteHeaders(headers, end, handler);
    if (!end) {
      return;
    }
    endRequest();
  }

  /**
   * Flags the request as ended and reports it to the metrics when present.
   */
  protected void endRequest() {
    requestEnded = true;
    if (conn.metrics == null) {
      return;
    }
    conn.metrics.requestEnd(metric, bytesWritten());
  }

  /**
   * Reports the response end to the metrics, flags the response as ended and
   * then delegates to the base implementation.
   */
  @Override
  void onEnd(MultiMap trailers) {
    conn.metricsEnd(this);
    responseEnded = true;
    super.onEnd(trailers);
  }

  /**
   * Reports the reset to the metrics when present, then delegates.
   */
  @Override
  void onReset(long code) {
    if (conn.metrics != null) {
      conn.metrics.requestReset(metric);
    }
    super.onReset(code);
  }

  /**
   * Handles a headers frame. The priority is applied when given; the first
   * non-100 headers frame with a parsable status builds the response head and
   * emits it to the head handler. Later frames only apply the priority.
   */
  @Override
  void onHeaders(Http2Headers headers, StreamPriority streamPriority) {
    if (streamPriority != null) {
      priority(streamPriority);
    }
    if (response != null) {
      // A response head already exists; nothing more to do.
      return;
    }
    int status;
    String statusMessage;
    try {
      status = Integer.parseInt(headers.status().toString());
      statusMessage = HttpResponseStatus.valueOf(status).reasonPhrase();
    } catch (Exception e) {
      // Missing or unparsable status: report it and reset the stream
      // with code 0x01 (PROTOCOL_ERROR in HTTP/2).
      handleException(e);
      writeReset(0x01 );
      return;
    }
    if (status == 100) {
      // Interim response: notify and wait for the next headers frame.
      onContinue();
      return;
    }
    response = new HttpResponseHead(HttpVersion.HTTP_2, status, statusMessage, new Http2HeadersAdaptor(headers));
    headers.remove(":status");
    if (conn.metrics != null) {
      conn.metrics.responseBegin(metric, response);
    }
    if (headHandler != null) {
      context.emit(response, headHandler);
    }
  }

  /**
   * Finalizes the stream: reports an incomplete exchange to metrics and tracer,
   * signals an error when the response never ended, delegates to the base
   * implementation and recycles the connection unless this is a push stream.
   */
  @Override
  void onClose() {
    boolean exchangeComplete = requestEnded && responseEnded;
    if (conn.metrics != null && !exchangeComplete) {
      conn.metrics.requestReset(metric);
    }
    VertxTracer tracer = context.tracer();
    if (tracer != null && trace != null) {
      VertxException err = exchangeComplete ? null : ConnectionBase.CLOSED_EXCEPTION;
      tracer.receiveResponse(context, response, trace, err, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
    }
    if (!responseEnded) {
      onError(CLOSED_EXCEPTION);
    }
    super.onClose();
    if (!push) {
      conn.recycle();
    }
  }
}
/**
 * Concrete client stream implementing {@link HttpClientStream}.
 * <p>
 * Stores the handlers registered by the caller, forwards stream events to
 * them, and builds and writes the HTTP/2 request headers.
 */
static class StreamImpl extends Stream implements HttpClientStream {

  StreamImpl(Http2ClientConnection conn, ContextInternal context, boolean push) {
    super(conn, context, push);
  }

  @Override
  public void continueHandler(Handler<Void> handler) {
    continueHandler = handler;
  }

  @Override
  public void unknownFrameHandler(Handler<HttpFrame> handler) {
    unknownFrameHandler = handler;
  }

  @Override
  public void pushHandler(Handler<HttpClientPush> handler) {
    pushHandler = handler;
  }

  @Override
  public void drainHandler(Handler<Void> handler) {
    drainHandler = handler;
  }

  @Override
  public void exceptionHandler(Handler<Throwable> handler) {
    exceptionHandler = handler;
  }

  @Override
  public void headHandler(Handler<HttpResponseHead> handler) {
    headHandler = handler;
  }

  @Override
  public void chunkHandler(Handler<Buffer> handler) {
    chunkHandler = handler;
  }

  @Override
  public void priorityHandler(Handler<StreamPriority> handler) {
    priorityHandler = handler;
  }

  @Override
  public void endHandler(Handler<MultiMap> handler) {
    endHandler = handler;
  }

  @Override
  public StreamPriority priority() {
    return super.priority();
  }

  @Override
  public void updatePriority(StreamPriority streamPriority) {
    super.updatePriority(streamPriority);
  }

  @Override
  public HttpVersion version() {
    return HttpVersion.HTTP_2;
  }

  // The handle* callbacks below forward an event to the registered handler, if any.

  @Override
  void handleEnd(MultiMap trailers) {
    if (endHandler != null) {
      endHandler.handle(trailers);
    }
  }

  @Override
  void handleData(Buffer buf) {
    if (chunkHandler != null) {
      chunkHandler.handle(buf);
    }
  }

  @Override
  void handleReset(long errorCode) {
    handleException(new StreamResetException(errorCode));
  }

  @Override
  void handleWritabilityChanged(boolean writable) {
    // The drain handler is only notified when the stream becomes writable.
    if (writable && drainHandler != null) {
      drainHandler.handle(null);
    }
  }

  @Override
  void handleCustomFrame(HttpFrame frame) {
    if (unknownFrameHandler != null) {
      unknownFrameHandler.handle(frame);
    }
  }

  @Override
  void handlePriorityChange(StreamPriority streamPriority) {
    if (priorityHandler != null) {
      priorityHandler.handle(streamPriority);
    }
  }

  @Override
  void handleContinue() {
    if (continueHandler != null) {
      continueHandler.handle(null);
    }
  }

  void handleException(Throwable exception) {
    if (exceptionHandler != null) {
      exceptionHandler.handle(exception);
    }
  }

  /**
   * Records the stream priority and schedules the header write on the
   * connection context.
   *
   * @param request  the request head to send
   * @param chunked  not used by this implementation
   * @param buf      optional body chunk written right after the headers
   * @param end      whether the request ends with this write
   * @param priority the stream priority
   * @param connect  passed through to {@code writeHeaders}
   * @param handler  notified of the write outcome, may be {@code null}
   */
  @Override
  public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect, Handler<AsyncResult<Void>> handler) {
    priority(priority);
    conn.context.emit(null, v -> {
      writeHeaders(request, buf, end, priority, connect, handler);
    });
  }

  /**
   * Builds the HTTP/2 headers for {@code request}, creates the underlying
   * stream and writes the headers, followed by {@code buf} when it is not
   * {@code null}.
   * <p>
   * A CONNECT request only carries {@code :authority} and never ends the
   * stream; other requests carry {@code :path}, {@code :scheme} and an
   * optional {@code :authority}.
   *
   * @throws IllegalArgumentException when a CONNECT request has no authority
   */
  private void writeHeaders(HttpRequestHead request, ByteBuf buf, boolean end, StreamPriority priority, boolean connect, Handler<AsyncResult<Void>> handler) {
    Http2Headers headers = new DefaultHttp2Headers();
    headers.method(request.method.name());
    boolean e;
    if (request.method == HttpMethod.CONNECT) {
      if (request.authority == null) {
        throw new IllegalArgumentException("Missing :authority / host header");
      }
      headers.authority(request.authority);
      // A CONNECT request never ends the stream.
      e = false;
    } else {
      headers.path(request.uri);
      headers.scheme(conn.isSsl() ? "https" : "http");
      if (request.authority != null) {
        headers.authority(request.authority);
      }
      e = end;
    }
    if (request.headers != null && request.headers.size() > 0) {
      for (Map.Entry<String, String> header : request.headers) {
        headers.add(HttpUtils.toLowerCase(header.getKey()), header.getValue());
      }
    }
    if (conn.client.getOptions().isTryUseCompression() && headers.get(HttpHeaderNames.ACCEPT_ENCODING) == null) {
      headers.set(HttpHeaderNames.ACCEPT_ENCODING, DEFLATE_GZIP);
    }
    try {
      createStream(request, headers, handler);
    } catch (Http2Exception ex) {
      if (handler != null) {
        handler.handle(context.failedFuture(ex));
      }
      handleException(ex);
      return;
    }
    if (buf != null) {
      // Headers go out first without ending the stream; the data frame carries
      // the end flag and completes the handler.
      doWriteHeaders(headers, false, null);
      doWriteData(buf, e, handler);
    } else {
      doWriteHeaders(headers, e, handler);
    }
  }

  /**
   * Allocates the next local stream id (1 for the first stream, otherwise the
   * last created id plus 2), creates and initializes the underlying stream,
   * then starts metrics and tracing for the request when they are present.
   *
   * @param head    the request head; its {@code id} and {@code remoteAddress} are set here
   * @param headers the request headers; the tracer may add entries to them
   * @param handler not used by this method
   * @throws Http2Exception when the underlying stream cannot be created
   */
  private void createStream(HttpRequestHead head, Http2Headers headers, Handler<AsyncResult<Void>> handler) throws Http2Exception {
    int id = this.conn.handler.encoder().connection().local().lastStreamCreated();
    if (id == 0) {
      id = 1;
    } else {
      id += 2;
    }
    head.id = id;
    head.remoteAddress = conn.remoteAddress();
    Http2Stream stream = this.conn.handler.encoder().connection().local().createStream(id, false);
    init(stream);
    if (conn.metrics != null) {
      // CONNECT requests carry no :path pseudo-header, so headers.path() is
      // null for them; fall back to the request URI, then to the authority,
      // instead of failing with a NullPointerException.
      CharSequence target = headers.path();
      if (target == null) {
        target = head.uri;
      }
      if (target == null) {
        target = head.authority;
      }
      metric = conn.metrics.requestBegin(target != null ? target.toString() : null, head);
    }
    VertxTracer tracer = context.tracer();
    if (tracer != null) {
      BiConsumer<String, String> headers_ = headers::add;
      trace = tracer.sendRequest(context, SpanKind.RPC, conn.client.getOptions().getTracingPolicy(), head, headers.method().toString(), headers_, HttpUtils.CLIENT_HTTP_REQUEST_TAG_EXTRACTOR);
    }
  }

  @Override
  public void writeBuffer(ByteBuf buf, boolean end, Handler<AsyncResult<Void>> listener) {
    writeData(buf, end, listener);
  }

  @Override
  public ContextInternal getContext() {
    return context;
  }

  @Override
  public void doSetWriteQueueMaxSize(int size) {
    // Intentionally empty.
  }

  /**
   * Resets the stream on the connection context, using the code carried by a
   * {@link StreamResetException} or 0 for any other cause.
   */
  @Override
  public void reset(Throwable cause) {
    long code = cause instanceof StreamResetException ? ((StreamResetException)cause).getCode() : 0;
    conn.context.emit(code, this::writeReset);
  }

  @Override
  public HttpClientConnection connection() {
    return conn;
  }
}
/**
 * Applies the base idle handling only while at least one local stream is
 * active; otherwise the idle event is ignored.
 */
@Override
protected void handleIdle() {
  int activeStreams = handler.connection().local().numActiveStreams();
  if (activeStreams <= 0) {
    return;
  }
  super.handleIdle();
}
/**
 * Builds the client-side HTTP/2 connection handler and registers its add and
 * remove callbacks.
 * <p>
 * When a connection is added, the connection window size is applied if
 * configured, the socket metric is attached (when {@code metrics} is not
 * {@code null}) and {@code c} receives the connection together with its
 * initial concurrency. When it is removed, the endpoint disconnection is
 * reported and the connection is evicted.
 *
 * @param client       the owning client, source of options and metrics
 * @param metrics      request/response metrics, may be {@code null}
 * @param listener     listener given to the created connection
 * @param context      context given to the created connection
 * @param socketMetric existing socket metric, or {@code null} to obtain one from the client metrics
 * @param c            receives the connection and its initial concurrency
 * @return the configured connection handler
 */
public static VertxHttp2ConnectionHandler<Http2ClientConnection> createHttp2ConnectionHandler(
  HttpClientImpl client,
  ClientMetrics metrics,
  ConnectionListener<HttpClientConnection> listener,
  EventLoopContext context,
  Object socketMetric,
  BiConsumer<Http2ClientConnection, Long> c) {
  HttpClientOptions options = client.getOptions();
  int multiplexingLimit = options.getHttp2MultiplexingLimit();
  // A limit that is not positive means "no client-side cap".
  long http2MaxConcurrency = multiplexingLimit <= 0 ? Long.MAX_VALUE : multiplexingLimit;
  VertxHttp2ConnectionHandler<Http2ClientConnection> handler = new VertxHttp2ConnectionHandlerBuilder<Http2ClientConnection>()
    .server(false)
    .useCompression(options.isTryUseCompression())
    .gracefulShutdownTimeoutMillis(0)
    .initialSettings(options.getInitialSettings())
    .connectionFactory(connHandler -> new Http2ClientConnection(listener, client, context, connHandler, metrics))
    .logEnabled(options.getLogActivity())
    .build();
  HttpClientMetrics met = client.metrics();
  handler.addHandler(conn -> {
    if (options.getHttp2ConnectionWindowSize() > 0) {
      conn.setWindowSize(options.getHttp2ConnectionWindowSize());
    }
    if (metrics != null) {
      Object m = socketMetric;
      if (m == null) {
        m = met.connected(conn.remoteAddress(), conn.remoteName());
        met.endpointConnected(metrics);
      }
      conn.metric(m);
    }
    // http2MaxConcurrency is always positive, so the cap applies unconditionally.
    long concurrency = Math.min(conn.remoteSettings().getMaxConcurrentStreams(), http2MaxConcurrency);
    c.accept(conn, concurrency);
  });
  handler.removeHandler(conn -> {
    if (metrics != null) {
      met.endpointDisconnected(metrics);
    }
    conn.tryEvict();
  });
  return handler;
}
}