package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFrameClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker;
import io.netty.util.concurrent.FutureListener;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.headers.HeadersAdaptor;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.net.impl.NetSocketImpl;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.core.net.impl.clientconnection.ConnectionListener;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.streams.impl.InboundBuffer;
import java.net.URI;
import java.util.*;
import java.util.function.BiConsumer;
import static io.vertx.core.http.HttpHeaders.*;
public class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> implements HttpClientConnection {
// Logger used by this connection (currently for WebSocket handshake debug output).
private static final Logger log = LoggerFactory.getLogger(Http1xClientConnection.class);
// Default handler for inbound messages of an unexpected type: it always throws.
private static final Handler<Object> INVALID_MSG_HANDLER = msg -> {
throw new IllegalStateException("Invalid object " + msg);
};
// Listener notified through onEvict() / onRecycle() as this connection changes state.
private final ConnectionListener<HttpClientConnection> listener;
private final HttpClientImpl client;
// Options taken from the client when the connection is constructed.
private final HttpClientOptions options;
private final boolean ssl;
private final SocketAddress server;
public final ClientMetrics metrics;
private final HttpVersion version;
// Streams in creation order; a stream is added by createStream() and popped by endRequest().
private Deque<Stream> requests = new ArrayDeque<>();
// Streams whose request head was written; added by beginRequest(), popped by handleResponseEnd().
private Deque<Stream> responses = new ArrayDeque<>();
// Set to true by handleClosed(); createStream() fails new streams once it is set.
private boolean closed;
// Set to true by the first shutdown call; later calls fail with "Already shutdown".
private boolean shutdown;
// Id of the timer scheduled by shutdown(), or -1 when no timer is pending.
private long shutdownTimerID = -1L;
// Receives inbound messages handleMessage() cannot classify; temporarily replaced in removeChannelHandlers().
private Handler<Object> invalidMessageHandler = INVALID_MSG_HANDLER;
// When true, checkLifecycle() closes the connection instead of recycling it.
private boolean close;
// Copied from the "connect" argument of beginRequest(); while true, raw ByteBuf messages are read and written.
private boolean isConnect;
// Keep-alive timeout, multiplied by 1000 in recycle() to obtain milliseconds; starts from the options
// and may be replaced by the value parsed from a response Keep-Alive header.
private int keepAliveTimeout;
// Timestamp (System.currentTimeMillis() based) after which isValid() returns false; 0 means no expiration.
private long expirationTimestamp;
// Id assigned to the next stream created by createStream().
private int seq = 1;
/**
 * Creates a client connection bound to the given channel.
 *
 * @param listener the listener told when this connection is evicted or recycled
 * @param version  the HTTP version used when building requests
 * @param client   the owning client; its options are captured here
 * @param channel  the channel handler context of the underlying channel
 * @param ssl      whether the connection uses SSL
 * @param server   the address of the remote server
 * @param context  the context passed to the base connection
 * @param metrics  the client metrics, may be {@code null}
 */
Http1xClientConnection(ConnectionListener<HttpClientConnection> listener,
                       HttpVersion version,
                       HttpClientImpl client,
                       ChannelHandlerContext channel,
                       boolean ssl,
                       SocketAddress server,
                       ContextInternal context,
                       ClientMetrics metrics) {
  super(context, channel);
  // Collaborators.
  this.client = client;
  this.listener = listener;
  this.metrics = metrics;
  // Connection characteristics.
  this.version = version;
  this.server = server;
  this.ssl = ssl;
  // Settings derived from the client options.
  this.options = client.getOptions();
  this.keepAliveTimeout = this.options.getKeepAliveTimeout();
}
/**
 * @return the connection listener supplied at construction time
 */
ConnectionListener<HttpClientConnection> listener() {
  return this.listener;
}
/**
 * Turns this HTTP connection into a raw socket: the HTTP handlers are removed from the
 * pipeline, the listener is told the connection is evicted, and the pipeline entry named
 * "handler" is replaced by one bound to the new socket.
 *
 * @return the socket that now owns the channel
 */
public NetSocketInternal toNetSocket() {
// Remove the HTTP handlers first; the list of pending messages it returns is not used here.
removeChannelHandlers();
NetSocketImpl socket = new NetSocketImpl(context, chctx, client.getSslHelper(), metrics());
// Carry this connection's metric over to the socket.
socket.metric(metric());
listener.onEvict();
chctx.pipeline().replace("handler", "handler", VertxHandler.create(ctx -> socket));
return socket;
}
/**
 * Builds the Netty request object for a request head.
 *
 * @param method    the HTTP method
 * @param uri       the request URI
 * @param headerMap the request headers to copy, may be {@code null}
 * @param authority the value used for the {@code Host} header when none was supplied
 * @param chunked   whether chunked transfer encoding must be set
 * @param buf       the initial body content, may be {@code null}
 * @param end       whether the request is complete after this message
 * @return the plain request, or an assembled request carrying {@code buf} and/or the end marker
 */
private HttpRequest createRequest(
  HttpMethod method,
  String uri,
  MultiMap headerMap,
  String authority,
  boolean chunked,
  ByteBuf buf,
  boolean end) {
  HttpRequest head = new DefaultHttpRequest(HttpUtils.toNettyHttpVersion(version), method.toNetty(), uri, false);
  HttpHeaders nettyHeaders = head.headers();
  if (headerMap != null) {
    for (Map.Entry<String, String> entry : headerMap) {
      nettyHeaders.add(entry.getKey(), entry.getValue());
    }
  }
  if (nettyHeaders.contains(HOST)) {
    nettyHeaders.remove(TRANSFER_ENCODING);
  } else {
    nettyHeaders.set(HOST, authority);
  }
  if (chunked) {
    HttpUtil.setTransferEncodingChunked(head, true);
  }
  // Advertise compression support unless the caller already chose an Accept-Encoding.
  if (options.isTryUseCompression() && nettyHeaders.get(ACCEPT_ENCODING) == null) {
    nettyHeaders.set(ACCEPT_ENCODING, DEFLATE_GZIP);
  }
  // Only state the Connection header when it differs from the protocol version's default.
  boolean keepAlive = options.isKeepAlive();
  HttpVersion protocol = options.getProtocolVersion();
  if (!keepAlive && protocol == io.vertx.core.http.HttpVersion.HTTP_1_1) {
    nettyHeaders.set(CONNECTION, CLOSE);
  } else if (keepAlive && protocol == io.vertx.core.http.HttpVersion.HTTP_1_0) {
    nettyHeaders.set(CONNECTION, KEEP_ALIVE);
  }
  if (end) {
    return buf != null ? new AssembledFullHttpRequest(head, buf) : new AssembledFullHttpRequest(head);
  }
  if (buf != null) {
    return new AssembledHttpRequest(head, buf);
  }
  return head;
}
/**
 * Starts a request: fills in the request head, registers the stream as awaiting a response,
 * reports to metrics and tracing, then writes the request to the channel.
 *
 * @param stream  the stream sending the request
 * @param request the request head; its {@code id} and {@code remoteAddress} are set here
 * @param chunked whether chunked transfer encoding is used
 * @param buf     the initial body content, may be {@code null}
 * @param end     whether the request is complete after this write
 * @param connect the value stored in {@code isConnect} for this connection
 * @param handler notified with the outcome of the write, may be {@code null}
 */
private void beginRequest(Stream stream, HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, Handler<AsyncResult<Void>> handler) {
  request.id = stream.id;
  request.remoteAddress = remoteAddress();
  stream.bytesWritten += buf != null ? buf.readableBytes() : 0L;
  HttpRequest nettyRequest = createRequest(request.method, request.uri, request.headers, request.authority, chunked, buf, end);
  synchronized (this) {
    responses.add(stream);
    this.isConnect = connect;
    if (this.metrics != null) {
      stream.metric = this.metrics.requestBegin(request.uri, request);
    }
    VertxTracer tracer = context.tracer();
    if (tracer != null) {
      // The tracer gets a callback that adds headers to the outgoing Netty request.
      BiConsumer<String, String> headers = (key, val) -> nettyRequest.headers().add(key, val);
      stream.trace = tracer.sendRequest(stream.context, SpanKind.RPC, options.getTracingPolicy(), request, request.method.name(), headers, HttpUtils.CLIENT_HTTP_REQUEST_TAG_EXTRACTOR);
    }
  }
  writeToChannel(nettyRequest, handler == null ? null : context.promise(handler));
  if (end) {
    endRequest(stream);
  }
}
/**
 * Writes request body content for a stream.
 * <p>
 * While {@code isConnect} is set, the raw buffer is written; ending the stream then also
 * closes the connection once the write completes. Otherwise the buffer is wrapped in HTTP
 * content, with a last-content message when {@code end} is true.
 *
 * @param s        the stream being written
 * @param buff     the content to write, may be {@code null}
 * @param end      whether this is the final write of the request
 * @param listener notified when the write completes, may be {@code null}
 */
private void writeBuffer(Stream s, ByteBuf buff, boolean end, FutureListener<Void> listener) {
  s.bytesWritten += buff != null ? buff.readableBytes() : 0L;
  Object msg;
  if (isConnect) {
    msg = buff != null ? buff : Unpooled.EMPTY_BUFFER;
    if (end) {
      ChannelPromise promise = channelFuture();
      // The listener is optional, so only register it when one was supplied.
      if (listener != null) {
        promise.addListener(listener);
      }
      promise.addListener(v -> close());
      writeToChannel(msg, promise);
    } else {
      // Pass the listener through so the caller learns the outcome of intermediate writes too.
      writeToChannel(msg, listener);
    }
  } else {
    if (end) {
      if (buff != null && buff.isReadable()) {
        msg = new DefaultLastHttpContent(buff, false);
      } else {
        msg = LastHttpContent.EMPTY_LAST_CONTENT;
      }
    } else {
      msg = new DefaultHttpContent(buff);
    }
    writeToChannel(msg, listener);
    if (end) {
      endRequest(s);
    }
  }
}
/**
 * Finishes the request part of a stream: removes the head of the request queue, reports to
 * metrics, and completes the promise of the next queued stream so it can start writing.
 * <p>
 * If the response of {@code s} already ended, the lifecycle decision that
 * {@code handleResponseEnd} skipped is taken here. It goes through {@link #checkLifecycle()}
 * so that a connection flagged with {@code close} is closed rather than recycled.
 *
 * @param s the stream whose request ended
 */
private void endRequest(Stream s) {
  Stream next;
  boolean responseAlreadyEnded;
  synchronized (this) {
    requests.pop();
    next = requests.peek();
    responseAlreadyEnded = s.responseEnded;
    if (metrics != null) {
      metrics.requestEnd(s.metric, s.bytesWritten);
    }
  }
  flushBytesWritten();
  if (next != null) {
    next.promise.complete((HttpClientStream) next);
  }
  if (responseAlreadyEnded) {
    checkLifecycle();
  }
}
/**
 * Handles the reset of a stream.
 * <p>
 * A stream still awaiting its response causes the connection to be closed. A stream found
 * only in the request queue is removed and the connection recycled. A stream found in
 * neither queue is ignored.
 *
 * @param stream the stream being reset
 */
private void resetRequest(Stream stream) {
  boolean awaitingResponse;
  synchronized (this) {
    awaitingResponse = responses.remove(stream);
    if (!awaitingResponse && !requests.remove(stream)) {
      return;
    }
  }
  if (awaitingResponse) {
    close();
    return;
  }
  recycle();
}
/**
 * Per-request state tracked by the connection, plus the callbacks the connection invokes as
 * the response progresses.
 */
private abstract static class Stream {
// Completed with the stream itself once it reaches the head of the connection's request queue.
protected final Promise<HttpClientStream> promise;
protected final ContextInternal context;
protected final int id;
// Value returned by the tracer's sendRequest call, if a tracer is present.
private Object trace;
// Value returned by the metrics requestBegin call, if metrics are present.
private Object metric;
// Response head recorded by handleResponseBegin; stays null for a 100 status.
private HttpResponseHead response;
// Set by handleResponseEnd once the response is complete.
private boolean responseEnded;
private long bytesRead;
private long bytesWritten;
Stream(ContextInternal context, int id) {
this.context = context;
this.id = id;
this.promise = context.promise();
}
Object metric() {
return metric;
}
abstract void handleContinue();
abstract void handleHead(HttpResponseHead response);
abstract void handleChunk(Buffer buff);
abstract void handleEnd(LastHttpContent trailer);
abstract void handleWritabilityChanged(boolean writable);
abstract void handleException(Throwable cause);
abstract void handleClosed();
}
/**
 * Resumes reading from the connection on behalf of a stream, unless that stream's response
 * has already ended.
 *
 * @param n the stream whose inbound queue drained
 */
private void drainResponse(Stream n) {
  if (n.responseEnded) {
    return;
  }
  doResume();
}
/**
 * {@link HttpClientStream} implementation for an HTTP/1.x connection.
 * <p>
 * Writes and resets are executed on the connection's event loop, hopping to it when called
 * from another thread. Response chunks and the end-of-response trailers pass through an
 * {@link InboundBuffer} before reaching the user handlers.
 */
private static class StreamImpl extends Stream implements HttpClientStream {
private final Http1xClientConnection conn;
// Buffers response chunks (Buffer) and the end marker (MultiMap of trailers).
private final InboundBuffer<Object> queue;
// True once reset(Throwable) has been called; guarded by the connection monitor.
private boolean reset;
// Last writability reported by the connection; guarded by the connection monitor.
private boolean writable;
private HttpRequestHead request;
private Handler<HttpResponseHead> headHandler;
private Handler<Buffer> chunkHandler;
private Handler<MultiMap> endHandler;
private Handler<Void> drainHandler;
private Handler<Void> continueHandler;
private Handler<Throwable> exceptionHandler;
StreamImpl(ContextInternal context, Http1xClientConnection conn, int id) {
super(context, id);
this.writable = !conn.isNotWritable();
this.conn = conn;
// NOTE(review): the second constructor argument (5) is presumably the buffer's high-water mark; confirm against InboundBuffer.
this.queue = new InboundBuffer<>(context, 5)
.drainHandler(v -> {
// When the queue drains, let the connection resume reading; this is done on the connection's event loop.
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
drained();
} else {
eventLoop.execute(this::drained);
}
})
.handler(item -> {
// A MultiMap item marks the end of the response (trailers); any other item is a body chunk.
if (item instanceof MultiMap) {
Handler<MultiMap> handler = endHandler;
if (handler != null) {
handler.handle((MultiMap) item);
}
} else {
Handler<Buffer> handler = chunkHandler;
if (handler != null) {
handler.handle((Buffer) item);
}
}
})
.exceptionHandler(context::reportException);
}
private void drained() {
conn.drainResponse(this);
}
@Override
public void continueHandler(Handler<Void> handler) {
continueHandler = handler;
}
@Override
public void drainHandler(Handler<Void> handler) {
drainHandler = handler;
}
@Override
public void exceptionHandler(Handler<Throwable> handler) {
exceptionHandler = handler;
}
@Override
public void headHandler(Handler<HttpResponseHead> handler) {
this.headHandler = handler;
}
// The next three handlers are accepted but ignored by this implementation.
@Override
public void priorityHandler(Handler<StreamPriority> handler) {
}
@Override
public void pushHandler(Handler<HttpClientPush> handler) {
}
@Override
public void unknownFrameHandler(Handler<HttpFrame> handler) {
}
@Override
public int id() {
return id;
}
@Override
public Object metric() {
return super.metric();
}
@Override
public HttpVersion version() {
return conn.version;
}
@Override
public HttpClientConnection connection() {
return conn;
}
@Override
public ContextInternal getContext() {
return context;
}
// The priority argument is not used by this implementation.
@Override
public void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, boolean connect, Handler<AsyncResult<Void>> handler) {
writeHead(request, chunked, buf, end, connect, handler == null ? null : context.promise(handler));
}
// Records the request head and begins the request on the connection's event loop.
private void writeHead(HttpRequestHead request, boolean chunked, ByteBuf buf, boolean end, boolean connect, Handler<AsyncResult<Void>> handler) {
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
this.request = request;
conn.beginRequest(this, request, chunked, buf, end, connect, handler);
} else {
eventLoop.execute(() -> writeHead(request, chunked, buf, end, connect, handler));
}
}
// Does nothing when there is no buffer and the stream is not ending; in that case the handler is not invoked.
@Override
public void writeBuffer(ByteBuf buff, boolean end, Handler<AsyncResult<Void>> handler) {
if (buff != null || end) {
FutureListener<Void> listener = handler == null ? null : context.promise(handler);
writeBuffer(buff, end, listener);
}
}
// Performs the write on the connection's event loop.
private void writeBuffer(ByteBuf buff, boolean end, FutureListener<Void> listener) {
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
conn.writeBuffer(this, buff, end, listener);
} else {
eventLoop.execute(() -> writeBuffer(buff, end, listener));
}
}
@Override
public void writeFrame(int type, int flags, ByteBuf payload) {
throw new IllegalStateException("Cannot write an HTTP/2 frame over an HTTP/1.x connection");
}
@Override
public void doSetWriteQueueMaxSize(int size) {
conn.doSetWriteQueueMaxSize(size);
}
@Override
public boolean isNotWritable() {
synchronized (conn) {
return !writable;
}
}
@Override
public void doPause() {
queue.pause();
}
@Override
public void doFetch(long amount) {
queue.fetch(amount);
}
// Resets the stream at most once: reports the cause to the exception handler, then lets the
// connection deal with the reset on its event loop.
@Override
public void reset(Throwable cause) {
synchronized (conn) {
if (reset) {
return;
}
reset = true;
}
handleException(cause);
EventLoop eventLoop = conn.context.nettyEventLoop();
if (eventLoop.inEventLoop()) {
reset();
} else {
eventLoop.execute(this::reset);
}
}
private void reset() {
conn.resetRequest(this);
}
@Override
public StreamPriority priority() {
return null;
}
@Override
public void updatePriority(StreamPriority streamPriority) {
}
// Invokes the drain handler only on a transition from not-writable to writable.
@Override
void handleWritabilityChanged(boolean writable) {
Handler<Void> handler;
boolean drain;
synchronized (conn) {
drain = !this.writable && writable;
this.writable = writable;
handler = drainHandler;
}
if (drain && handler != null) {
handler.handle(null);
}
}
void handleContinue() {
if (continueHandler != null) {
continueHandler.handle(null);
}
}
@Override
void handleHead(HttpResponseHead response) {
Handler<HttpResponseHead> handler = headHandler;
if (handler != null) {
context.emit(response, handler);
}
}
@Override
public void chunkHandler(Handler<Buffer> handler) {
chunkHandler = handler;
}
@Override
public void endHandler(Handler<MultiMap> handler) {
endHandler = handler;
}
// Queues the chunk; when the queue reports it cannot accept more, the connection is paused.
void handleChunk(Buffer buff) {
if (!queue.write(buff)) {
conn.doPause();
}
}
// Queues the trailers as a MultiMap, which the queue handler treats as the end of the response.
void handleEnd(LastHttpContent trailer) {
queue.write(new HeadersAdaptor(trailer.trailingHeaders()));
}
void handleException(Throwable cause) {
if (exceptionHandler != null) {
exceptionHandler.handle(cause);
}
}
@Override
void handleClosed() {
handleException(CLOSED_EXCEPTION);
}
}
/**
 * Decides what happens to the connection after an exchange: it is closed when the
 * {@code close} flag is set, and recycled otherwise.
 */
private void checkLifecycle() {
  if (!close) {
    recycle();
    return;
  }
  close();
}
/**
 * Validates an inbound message.
 *
 * @param msg the inbound message
 * @return the decoder failure cause, an {@link IllegalStateException} for a response whose
 *         protocol version is neither HTTP/1.0 nor HTTP/1.1, or {@code null} when the
 *         message is acceptable or is not an {@link HttpObject}
 */
private Throwable validateMessage(Object msg) {
  if (!(msg instanceof HttpObject)) {
    return null;
  }
  HttpObject httpObject = (HttpObject) msg;
  DecoderResult decoded = httpObject.decoderResult();
  if (decoded.isFailure()) {
    return decoded.cause();
  }
  if (httpObject instanceof io.netty.handler.codec.http.HttpResponse) {
    io.netty.handler.codec.http.HttpVersion nettyVersion = ((io.netty.handler.codec.http.HttpResponse) httpObject).protocolVersion();
    boolean supported = nettyVersion == io.netty.handler.codec.http.HttpVersion.HTTP_1_0
      || nettyVersion == io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    if (!supported) {
      return new IllegalStateException("Unsupported HTTP version: " + nettyVersion);
    }
  }
  return null;
}
/**
 * Entry point for inbound messages. An invalid message fails the connection; otherwise the
 * message is dispatched by type, and anything unrecognised goes to the invalid-message
 * handler.
 *
 * @param msg the inbound message
 */
public void handleMessage(Object msg) {
  Throwable failure = validateMessage(msg);
  if (failure != null) {
    fail(failure);
    return;
  }
  if (msg instanceof HttpObject) {
    handleHttpMessage((HttpObject) msg);
    return;
  }
  // Raw buffers are only expected while the connection is in connect mode.
  if (isConnect && msg instanceof ByteBuf) {
    handleChunk((ByteBuf) msg);
    return;
  }
  if (msg instanceof WebSocketFrame) {
    handleWsFrame((WebSocketFrame) msg);
    return;
  }
  invalidMessageHandler.handle(msg);
}
/**
 * Routes a decoded HTTP object to the stream at the head of the response queue. The object
 * is dropped when no stream is awaiting a response.
 *
 * @param obj the response head or content object
 */
private void handleHttpMessage(HttpObject obj) {
  Stream current;
  synchronized (this) {
    current = responses.peekFirst();
  }
  if (current == null) {
    return;
  }
  if (obj instanceof io.netty.handler.codec.http.HttpResponse) {
    io.netty.handler.codec.http.HttpResponse nettyResponse = (io.netty.handler.codec.http.HttpResponse) obj;
    boolean http10 = nettyResponse.protocolVersion() == io.netty.handler.codec.http.HttpVersion.HTTP_1_0;
    HttpVersion responseVersion = http10 ? io.vertx.core.http.HttpVersion.HTTP_1_0 : io.vertx.core.http.HttpVersion.HTTP_1_1;
    handleResponseBegin(current, new HttpResponseHead(
      responseVersion,
      nettyResponse.status().code(),
      nettyResponse.status().reasonPhrase(),
      new HeadersAdaptor(nettyResponse.headers())));
    return;
  }
  if (!(obj instanceof HttpContent)) {
    return;
  }
  HttpContent content = (HttpContent) obj;
  if (content.content().isReadable()) {
    handleResponseChunk(current, content.content());
  }
  // In connect mode the last-content marker does not end the response.
  boolean last = content instanceof LastHttpContent;
  if (last && !isConnect) {
    handleResponseEnd(current, (LastHttpContent) content);
  }
}
/**
 * Delivers a raw buffer to the stream at the head of the response queue. The buffer is
 * ignored when no stream is waiting or when it has no readable bytes.
 *
 * @param chunk the raw inbound buffer
 */
private void handleChunk(ByteBuf chunk) {
  Stream current;
  synchronized (this) {
    current = responses.peekFirst();
  }
  if (current == null || !chunk.isReadable()) {
    return;
  }
  handleResponseChunk(current, chunk);
}
/**
 * Handles a response head.
 * <p>
 * A 100 status only triggers the stream's continue callback. Any other status is recorded
 * on the stream and reported to metrics. Unless the request was a CONNECT, the connection
 * persistence headers are inspected to decide whether the connection must be closed after
 * the exchange and whether the keep-alive timeout changes.
 * <p>
 * In connect mode, the HTTP handlers are removed when the tunnel or upgrade was accepted
 * (CONNECT with 200, or GET with a {@code connection: Upgrade} request header and 101).
 * When it was refused, connect mode is left so that the response is read and ended as a
 * regular HTTP response.
 *
 * @param stream   the stream receiving the response
 * @param response the response head
 */
private void handleResponseBegin(Stream stream, HttpResponseHead response) {
  if (response.statusCode == 100) {
    stream.context.execute(null, v -> stream.handleContinue());
  } else {
    HttpRequestHead request;
    synchronized (this) {
      request = ((StreamImpl)stream).request;
      stream.response = response;
      if (metrics != null) {
        metrics.responseBegin(stream.metric, response);
      }
      if (request.method != HttpMethod.CONNECT) {
        String responseConnectionHeader = response.headers.get(HttpHeaderNames.CONNECTION);
        String requestConnectionHeader = request.headers != null ? request.headers.get(HttpHeaderNames.CONNECTION) : null;
        if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
          // Either side asked for the connection to be closed.
          this.close = true;
        } else if (response.version == HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
          // An HTTP/1.0 response is only persistent when it says keep-alive explicitly.
          this.close = true;
        }
        String keepAliveHeader = response.headers.get(HttpHeaderNames.KEEP_ALIVE);
        if (keepAliveHeader != null) {
          int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
          if (timeout != -1) {
            this.keepAliveTimeout = timeout;
          }
        }
      }
    }
    stream.handleHead(response);
    if (isConnect) {
      boolean tunnelAccepted = request.method == HttpMethod.CONNECT && response.statusCode == 200;
      boolean upgradeAccepted = request.method == HttpMethod.GET
        && request.headers != null
        && request.headers.contains("connection", "Upgrade", false)
        && response.statusCode == 101;
      if (tunnelAccepted || upgradeAccepted) {
        removeChannelHandlers();
      } else {
        // The server refused: go back to regular HTTP handling so the response can end.
        isConnect = false;
      }
    }
  }
}
/**
 * Removes the HTTP handlers from the channel pipeline: the content decompressor when one is
 * installed, then the handler named "codec".
 *
 * @return the messages that reached the invalid-message handler while "codec" was being removed
 */
private List<Object> removeChannelHandlers() {
ChannelPipeline pipeline = chctx.pipeline();
ChannelHandler inflater = pipeline.get(HttpContentDecompressor.class);
if (inflater != null) {
pipeline.remove(inflater);
}
List<Object> pending = new ArrayList<>();
// While the codec is removed, collect unexpected messages instead of passing them to the
// current handler; the previous handler is restored even if the removal throws.
Handler<Object> prev = invalidMessageHandler;
invalidMessageHandler = pending::add;
try {
pipeline.remove("codec");
} finally {
invalidMessageHandler = prev;
}
return pending;
}
/**
 * Wraps a response chunk in a {@link Buffer}, adds its length to the stream's read count,
 * and hands it to the stream on the stream's context.
 *
 * @param stream the stream receiving the chunk
 * @param chunk  the chunk content
 */
private void handleResponseChunk(Stream stream, ByteBuf chunk) {
  Buffer data = Buffer.buffer(VertxHandler.safeBuffer(chunk, chctx.alloc()));
  int length = data.length();
  stream.bytesRead += length;
  stream.context.execute(data, stream::handleChunk);
}
/**
 * Handles the end of a response: removes the stream from the response queue, reports to
 * tracing and metrics, delivers the trailers to the stream, and resumes reading.
 *
 * @param stream  the stream whose response ended
 * @param trailer the last content object, passed on to the stream's end callback
 */
private void handleResponseEnd(Stream stream, LastHttpContent trailer) {
boolean check;
synchronized (this) {
// No response head has been recorded for this stream (a 100 status does not record one), so there is nothing to end.
if (stream.response == null) {
return;
}
responses.pop();
// With keep-alive disabled in the options, the connection is always closed after the exchange.
close |= !options.isKeepAlive();
stream.responseEnded = true;
// The lifecycle check only runs here when this stream is no longer at the head of the
// request queue; otherwise endRequest acts on responseEnded once the request ends.
check = requests.peek() != stream;
}
VertxTracer tracer = context.tracer();
if (tracer != null) {
tracer.receiveResponse(stream.context, stream.response, stream.trace, null, HttpUtils.CLIENT_RESPONSE_TAG_EXTRACTOR);
}
if (metrics != null) {
metrics.responseEnd(stream.metric, stream.bytesRead);
}
stream.context.execute(trailer, stream::handleEnd);
this.doResume();
flushBytesRead();
if (check) {
checkLifecycle();
}
}
/**
 * @return the HTTP client metrics of the owning client
 */
public HttpClientMetrics metrics() {
  HttpClientMetrics clientMetrics = client.metrics();
  return clientMetrics;
}
/**
 * Upgrades this connection to a WebSocket by installing the handshake handlers on the
 * pipeline and starting the client handshake.
 * <p>
 * On success the WebSocket is stored on the connection, registered, and reported to
 * metrics before the promise is completed. On failure the connection is closed and the
 * promise receives the failure. Exceptions thrown while preparing the handshake are passed
 * to {@code handleException}.
 *
 * @param context               the context the WebSocket is created with
 * @param requestURI            the request URI; a non-absolute URI is resolved against the server address
 * @param headers               extra handshake headers, may be {@code null}
 * @param vers                  the WebSocket version, {@code null} meaning V13
 * @param subProtocols          the requested sub-protocols, may be {@code null}
 * @param maxWebSocketFrameSize the maximum frame size given to the handshaker
 * @param promise               handed the outcome of the handshake
 */
synchronized void toWebSocket(
  ContextInternal context,
  String requestURI,
  MultiMap headers,
  WebsocketVersion vers,
  List<String> subProtocols,
  int maxWebSocketFrameSize,
  Promise<WebSocket> promise) {
  try {
    URI wsuri = new URI(requestURI);
    if (!wsuri.isAbsolute()) {
      wsuri = new URI((ssl ? "https:" : "http:") + "//" + server.host() + ":" + server.port() + requestURI);
    }
    WebSocketVersion version =
      WebSocketVersion.valueOf((vers == null ?
        WebSocketVersion.V13 : vers).toString());
    HttpHeaders nettyHeaders;
    if (headers != null) {
      nettyHeaders = new DefaultHttpHeaders();
      for (Map.Entry<String, String> entry: headers) {
        nettyHeaders.add(entry.getKey(), entry.getValue());
      }
    } else {
      nettyHeaders = null;
    }
    ChannelPipeline p = chctx.channel().pipeline();
    ArrayList<WebSocketClientExtensionHandshaker> extensionHandshakers = initializeWebSocketExtensionHandshakers(client.getOptions());
    if (!extensionHandshakers.isEmpty()) {
      p.addBefore("handler", "webSocketsExtensionsHandler", new WebSocketClientExtensionHandler(
        extensionHandshakers.toArray(new WebSocketClientExtensionHandshaker[0])));
    }
    String subp = null;
    if (subProtocols != null) {
      subp = String.join(",", subProtocols);
    }
    WebSocketClientHandshaker handshaker = WebSocketHandshakeInboundHandler.newHandshaker(
      wsuri,
      version,
      subp,
      !extensionHandshakers.isEmpty(),
      nettyHeaders,
      maxWebSocketFrameSize,
      !options.isSendUnmaskedFrames());
    WebSocketHandshakeInboundHandler handshakeInboundHandler = new WebSocketHandshakeInboundHandler(handshaker, ar -> {
      AsyncResult<WebSocket> wsRes = ar.map(v -> {
        WebSocketImpl w = new WebSocketImpl(
          context,
          Http1xClientConnection.this,
          version != WebSocketVersion.V00,
          options.getWebSocketClosingTimeout(),
          options.getMaxWebSocketFrameSize(),
          options.getMaxWebSocketMessageSize());
        w.subProtocol(handshaker.actualSubprotocol());
        return w;
      });
      if (ar.failed()) {
        close();
      } else {
        webSocket = (WebSocketImpl) wsRes.result();
        webSocket.registerHandler(vertx.eventBus());
        // Logging and metrics need the WebSocket, which only exists on this success path.
        log.debug("WebSocket handshake complete");
        HttpClientMetrics httpMetrics = client.metrics();
        if (httpMetrics != null) {
          webSocket.setMetric(httpMetrics.connected(webSocket));
        }
      }
      getContext().emit(wsRes, res -> {
        // The handshake response headers are only set on the WebSocket while the promise is being handled.
        if (res.succeeded()) {
          webSocket.headers(ar.result());
        }
        promise.handle(res);
        if (res.succeeded()) {
          webSocket.headers(null);
        }
      });
    });
    p.addBefore("handler", "handshakeCompleter", handshakeInboundHandler);
    handshaker.handshake(chctx.channel());
  } catch (Exception e) {
    handleException(e);
  }
}
/**
 * Builds the list of WebSocket compression extension handshakers enabled by the options.
 *
 * @param options the client options to read the compression settings from
 * @return the handshakers to offer: deflate-frame first, then per-message deflate; empty when neither is enabled
 */
ArrayList<WebSocketClientExtensionHandshaker> initializeWebSocketExtensionHandshakers(HttpClientOptions options) {
  ArrayList<WebSocketClientExtensionHandshaker> enabled = new ArrayList<>();
  if (options.getTryWebSocketDeflateFrameCompression()) {
    WebSocketClientExtensionHandshaker deflateFrame =
      new DeflateFrameClientExtensionHandshaker(options.getWebSocketCompressionLevel(), false);
    enabled.add(deflateFrame);
  }
  if (options.getTryUsePerMessageWebSocketCompression()) {
    WebSocketClientExtensionHandshaker perMessageDeflate = new PerMessageDeflateClientExtensionHandshaker(
      options.getWebSocketCompressionLevel(),
      ZlibCodecFactory.isSupportingWindowSizeAndMemLevel(),
      PerMessageDeflateServerExtensionHandshaker.MAX_WINDOW_SIZE,
      options.getWebSocketCompressionAllowClientNoContext(),
      options.getWebSocketCompressionRequestServerNoContext());
    enabled.add(perMessageDeflate);
  }
  return enabled;
}
/**
 * Propagates a channel writability change. The stream at the head of the request queue is
 * notified if there is one, otherwise the WebSocket; with neither, nothing happens.
 */
@Override
public void handleInterestedOpsChanged() {
  boolean writable = !isNotWritable();
  ContextInternal target;
  Handler<Boolean> callback;
  synchronized (this) {
    Stream head = requests.peek();
    if (head != null) {
      target = head.context;
      callback = head::handleWritabilityChanged;
    } else {
      WebSocketImpl ws = webSocket;
      if (ws == null) {
        return;
      }
      target = ws.context;
      callback = ws::handleWritabilityChanged;
    }
  }
  target.execute(writable, callback);
}
/**
 * @return every stream present in the request or response queue, without duplicates,
 *         request-queue entries first
 */
private Iterable<Stream> pendingStreams() {
  Set<Stream> pending = new LinkedHashSet<>(requests);
  pending.addAll(responses);
  return pending;
}
/**
 * Called when the connection is closed. Cancels a pending shutdown timer, marks the
 * connection closed, reports the disconnection to metrics, then notifies the WebSocket
 * (if any) and every pending stream.
 */
protected void handleClosed() {
super.handleClosed();
// A shutdown timer may still be pending; it is no longer needed.
long timerID = shutdownTimerID;
if (timerID != -1) {
shutdownTimerID = -1L;
vertx.cancelTimer(timerID);
}
closed = true;
if (metrics != null) {
HttpClientMetrics met = client.metrics();
met.endpointDisconnected(metrics);
}
WebSocketImpl ws;
VertxTracer tracer = context.tracer();
Iterable<Stream> streams;
// Take a snapshot under the monitor; the callbacks below run outside of it.
synchronized (this) {
ws = webSocket;
streams = pendingStreams();
}
if (ws != null) {
ws.handleConnectionClosed();
}
for (Stream stream : streams) {
if (metrics != null) {
metrics.requestReset(stream.metric);
}
Object trace = stream.trace;
// Report the closed connection as the failure of any traced request.
if (tracer != null && trace != null) {
tracer.receiveResponse(stream.context, null, trace, ConnectionBase.CLOSED_EXCEPTION, TagExtractor.empty());
}
stream.context.execute(null, v -> stream.handleClosed());
}
}
/**
 * Handles an idle event. The event is ignored while the connection has no WebSocket and no
 * stream in either queue; otherwise it is passed to the base class.
 */
protected void handleIdle() {
  boolean unused;
  synchronized (this) {
    unused = webSocket == null && responses.isEmpty() && requests.isEmpty();
  }
  if (unused) {
    return;
  }
  super.handleIdle();
}
/**
 * Reports a connection-level exception to the base class, then to the WebSocket (if any)
 * and to every pending stream.
 *
 * @param e the exception
 */
@Override
protected void handleException(Throwable e) {
  super.handleException(e);
  WebSocketImpl currentWebSocket;
  Iterable<Stream> pending;
  synchronized (this) {
    pending = pendingStreams();
    currentWebSocket = webSocket;
  }
  if (currentWebSocket != null) {
    currentWebSocket.handleException(e);
  }
  pending.forEach(stream -> stream.handleException(e));
}
/**
 * Creates a stream on this connection, running on the event loop of the given context.
 * <p>
 * The stream is appended to the request queue. The handler is notified once the stream's
 * promise completes, which happens immediately when the stream is the only one queued. If
 * the connection is already closed the handler is failed instead.
 *
 * @param context the context of the new stream
 * @param handler notified with the stream or the failure
 */
@Override
public void createStream(ContextInternal context, Handler<AsyncResult<HttpClientStream>> handler) {
  EventLoop loop = context.nettyEventLoop();
  if (!loop.inEventLoop()) {
    loop.execute(() -> createStream(context, handler));
    return;
  }
  StreamImpl created = null;
  synchronized (this) {
    if (!closed) {
      created = new StreamImpl(context, this, seq++);
      requests.add(created);
      if (requests.size() == 1) {
        created.promise.complete(created);
      }
    }
  }
  if (created == null) {
    handler.handle(Future.failedFuture(CLOSED_EXCEPTION));
  } else {
    created.promise.future().onComplete(handler);
  }
}
/**
 * @return {@code true} when no expiration is set (timestamp 0) or the expiration timestamp
 *         has not passed yet
 */
@Override
public boolean isValid() {
  long deadline = expirationTimestamp;
  if (deadline == 0) {
    return true;
  }
  return System.currentTimeMillis() <= deadline;
}
/**
 * Makes the connection available for reuse, or finishes a shutdown.
 * <p>
 * During a shutdown the connection is closed as soon as both queues are empty. Otherwise,
 * unless the connection is in connect mode, the expiration timestamp is refreshed from the
 * keep-alive timeout (0 meaning no expiration) and the listener is told the connection was
 * recycled.
 */
private void recycle() {
  if (shutdown) {
    if (requests.isEmpty() && responses.isEmpty()) {
      close();
    }
  } else if (!isConnect) {
    // Multiply as long: the timeout is an int and may come from a server header, so an int
    // multiplication could overflow and yield a timestamp in the past.
    expirationTimestamp = keepAliveTimeout == 0 ? 0L : System.currentTimeMillis() + keepAliveTimeout * 1000L;
    listener.onRecycle();
  }
}
/**
 * Shuts the connection down, notifying the handler with the outcome.
 *
 * @param timeout the timeout in milliseconds, passed to the internal shutdown
 * @param handler notified when the shutdown completes or fails
 */
@Override
public void shutdown(long timeout, Handler<AsyncResult<Void>> handler) {
  PromiseInternal<Void> promise = vertx.promise(handler);
  shutdown(timeout, promise);
}
/**
 * Shuts the connection down.
 *
 * @param timeoutMs the timeout in milliseconds, passed to the internal shutdown
 * @return a future reflecting the outcome of the shutdown
 */
@Override
public Future<Void> shutdown(long timeoutMs) {
  PromiseInternal<Void> done = vertx.promise();
  shutdown(timeoutMs, done);
  Future<Void> outcome = done.future();
  return outcome;
}
/**
 * Timer callback of a shutdown: clears the timer id and closes the connection, both under
 * the connection monitor.
 */
private void shutdownNow() {
  synchronized (this) {
    this.shutdownTimerID = -1L;
    close();
  }
}
/**
 * Performs the shutdown. A second call fails the promise with "Already shutdown".
 * <p>
 * The promise is completed from the connection's close future. The listener is told the
 * connection is evicted. If the connection is still open, a positive timeout schedules a
 * forced close after that delay, while a non-positive timeout sets the {@code close} flag.
 * A lifecycle check then runs.
 *
 * @param timeoutMs the timeout in milliseconds
 * @param promise   completed when the connection is closed, or failed when already shut down
 */
private void shutdown(long timeoutMs, PromiseInternal<Void> promise) {
synchronized (this) {
if (shutdown) {
promise.fail("Already shutdown");
return;
}
shutdown = true;
closeFuture().onComplete(promise);
}
// The listener is notified between the two synchronized sections, i.e. without holding the monitor.
listener.onEvict();
synchronized (this) {
if (!closed) {
if (timeoutMs > 0L) {
shutdownTimerID = context.setTimer(timeoutMs, id -> shutdownNow());
} else {
close = true;
}
}
}
checkLifecycle();
}
}