package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.compression.ZlibCodecFactory;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFrameClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateClientExtensionHandshaker;
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateServerExtensionHandshaker;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.*;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.impl.pool.ConnectionListener;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.NetSocketImpl;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.HttpClientMetrics;
import io.vertx.core.streams.impl.InboundBuffer;
import java.net.URI;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.vertx.core.http.HttpHeaders.*;
class Http1xClientConnection extends Http1xConnectionBase<WebSocketImpl> implements HttpClientConnection {
private static final Logger log = LoggerFactory.getLogger(Http1xClientConnection.class);
private final ConnectionListener<HttpClientConnection> listener;
private final HttpClientImpl client;
private final HttpClientOptions options;
private final boolean ssl;
private final SocketAddress server;
private final Object endpointMetric;
private final HttpClientMetrics metrics;
private final HttpVersion version;
private StreamImpl requestInProgress;
private StreamImpl responseInProgress;
private boolean close;
private boolean upgraded;
private int keepAliveTimeout;
private int seq = 1;
Http1xClientConnection(ConnectionListener<HttpClientConnection> listener,
HttpVersion version,
HttpClientImpl client,
Object endpointMetric,
ChannelHandlerContext channel,
boolean ssl,
SocketAddress server,
ContextInternal context,
HttpClientMetrics metrics) {
super(client.getVertx(), channel, context);
this.listener = listener;
this.client = client;
this.options = client.getOptions();
this.ssl = ssl;
this.server = server;
this.metrics = metrics;
this.version = version;
this.endpointMetric = endpointMetric;
this.keepAliveTimeout = options.getKeepAliveTimeout();
}
Object endpointMetric() {
return endpointMetric;
}
ConnectionListener<HttpClientConnection> listener() {
return listener;
}
private synchronized NetSocket upgrade(StreamImpl stream) {
if (options.isPipelining()) {
throw new IllegalStateException("Cannot upgrade a pipe-lined request");
}
if (upgraded) {
throw new IllegalStateException("Request already upgraded to NetSocket");
}
upgraded = true;
AtomicBoolean paused = new AtomicBoolean(false);
NetSocketImpl socket = new NetSocketImpl(vertx, chctx, context, client.getSslHelper(), metrics) {
{
super.pause();
}
@Override
public synchronized NetSocket handler(Handler<Buffer> dataHandler) {
return super.handler(dataHandler);
}
@Override
public synchronized NetSocket pause() {
paused.set(true);
return super.pause();
}
@Override
public synchronized NetSocket resume() {
paused.set(false);
return super.resume();
}
@Override
public synchronized void handleMessage(Object msg) {
if (msg instanceof HttpContent) {
if (msg instanceof LastHttpContent) {
stream.endResponse((LastHttpContent) msg);
}
ReferenceCountUtil.release(msg);
return;
}
super.handleMessage(msg);
}
@Override
protected void handleClosed() {
listener.onEvict();
super.handleClosed();
}
};
socket.metric(metric());
flush();
ChannelPipeline pipeline = chctx.pipeline();
ChannelHandler inflater = pipeline.get(HttpContentDecompressor.class);
if (inflater != null) {
pipeline.remove(inflater);
}
pipeline.replace("handler", "handler", VertxHandler.create(socket));
pipeline.remove("codec");
context.runOnContext(v -> {
if (!paused.get()) {
socket.resume();
}
});
return socket;
}
private static class StreamImpl implements HttpClientStream {
private final int id;
private final Http1xClientConnection conn;
private final Promise<HttpClientStream> fut;
private final InboundBuffer<Object> queue;
private HttpClientRequestImpl request;
private Handler<Void> continueHandler;
private HttpClientResponseImpl response;
private boolean requestEnded;
private boolean responseEnded;
private boolean reset;
private StreamImpl next;
private long bytesWritten;
private long bytesRead;
private Object metric;
StreamImpl(Http1xClientConnection conn, int id, Handler<AsyncResult<HttpClientStream>> handler) {
Promise<HttpClientStream> promise = Promise.promise();
promise.future().setHandler(handler);
this.conn = conn;
this.fut = promise;
this.id = id;
this.queue = new InboundBuffer<>(conn.context, 5);
}
private void append(StreamImpl s) {
StreamImpl c = this;
while (c.next != null) {
c = c.next;
}
c.next = s;
}
@Override
public int id() {
return id;
}
@Override
public Object metric() {
return metric;
}
@Override
public HttpVersion version() {
return conn.version;
}
@Override
public HttpClientConnection connection() {
return conn;
}
@Override
public Context getContext() {
return conn.context;
}
@Override
public void writeHead(HttpMethod method, String rawMethod, String uri, MultiMap headers, String hostHeader, boolean chunked, ByteBuf buf, boolean end, StreamPriority priority, Handler<Void> contHandler, Handler<AsyncResult<Void>> handler) {
HttpRequest request = createRequest(method, rawMethod, uri, headers);
prepareRequestHeaders(request, hostHeader, chunked);
if (buf != null) {
bytesWritten += buf.readableBytes();
}
continueHandler = contHandler;
sendRequest(request, buf, end, handler);
if (conn.responseInProgress == null) {
conn.responseInProgress = this;
} else {
conn.responseInProgress.append(this);
}
next = null;
}
private HttpRequest createRequest(HttpMethod method, String rawMethod, String uri, MultiMap headers) {
DefaultHttpRequest request = new DefaultHttpRequest(HttpUtils.toNettyHttpVersion(conn.version), HttpUtils.toNettyHttpMethod(method, rawMethod), uri, false);
if (headers != null) {
for (Map.Entry<String, String> header : headers) {
request.headers().add(header.getKey(), header.getValue());
}
}
return request;
}
private void prepareRequestHeaders(HttpRequest request, String hostHeader, boolean chunked) {
HttpHeaders headers = request.headers();
headers.remove(TRANSFER_ENCODING);
if (!headers.contains(HOST)) {
request.headers().set(HOST, hostHeader);
}
if (chunked) {
HttpUtil.setTransferEncodingChunked(request, true);
}
if (conn.options.isTryUseCompression() && request.headers().get(ACCEPT_ENCODING) == null) {
request.headers().set(ACCEPT_ENCODING, DEFLATE_GZIP);
}
if (!conn.options.isKeepAlive() && conn.options.getProtocolVersion() == io.vertx.core.http.HttpVersion.HTTP_1_1) {
request.headers().set(CONNECTION, CLOSE);
} else if (conn.options.isKeepAlive() && conn.options.getProtocolVersion() == io.vertx.core.http.HttpVersion.HTTP_1_0) {
request.headers().set(CONNECTION, KEEP_ALIVE);
}
}
private void sendRequest(
HttpRequest request, ByteBuf buf, boolean end, Handler<AsyncResult<Void>> handler) {
if (end) {
if (buf != null) {
request = new AssembledFullHttpRequest(request, buf);
} else {
request = new AssembledFullHttpRequest(request);
}
} else {
if (buf != null) {
request = new AssembledHttpRequest(request, buf);
}
}
conn.writeToChannel(request, conn.toPromise(handler));
}
private boolean handleChunk(Buffer buff) {
bytesRead += buff.length();
return queue.write(buff);
}
@Override
public void writeBuffer(ByteBuf buff, boolean end, Handler<AsyncResult<Void>> handler) {
if (buff == null && !end) {
return;
}
HttpContent msg;
if (end) {
if (buff != null && buff.isReadable()) {
msg = new DefaultLastHttpContent(buff, false);
} else {
msg = LastHttpContent.EMPTY_LAST_CONTENT;
}
} else {
msg = new DefaultHttpContent(buff);
}
bytesWritten += msg.content().readableBytes();
conn.writeToChannel(msg, conn.toPromise(handler));
}
@Override
public void writeFrame(int type, int flags, ByteBuf payload) {
throw new IllegalStateException("Cannot write an HTTP/2 frame over an HTTP/1.x connection");
}
@Override
public void doSetWriteQueueMaxSize(int size) {
conn.doSetWriteQueueMaxSize(size);
}
@Override
public boolean isNotWritable() {
return conn.isNotWritable();
}
@Override
public void doPause() {
queue.pause();
}
@Override
public void doFetch(long amount) {
queue.fetch(amount);
}
@Override
public void reset(Throwable cause) {
synchronized (conn) {
if (reset) {
return;
}
reset = true;
}
handleException(cause);
synchronized (conn) {
if (conn.requestInProgress == this) {
if (request == null) {
conn.handleRequestEnd(true);
} else {
conn.close();
}
} else if (!responseEnded) {
conn.close();
} else {
}
}
}
@Override
public void beginRequest(HttpClientRequestImpl req) {
synchronized (conn) {
if (request != null) {
throw new IllegalStateException("Already writing a request");
}
if (conn.requestInProgress != this) {
throw new IllegalStateException("Connection is already writing another request");
}
request = req;
if (conn.metrics != null) {
metric = conn.metrics.requestBegin(conn.endpointMetric, conn.metric(), conn.localAddress(), conn.remoteAddress(), request);
}
}
}
public void endRequest() {
boolean doRecycle;
synchronized (conn) {
StreamImpl s = conn.requestInProgress;
if (s != this) {
throw new IllegalStateException("No write in progress");
}
if (requestEnded) {
throw new IllegalStateException("Request already sent");
}
requestEnded = true;
if (conn.metrics != null) {
conn.metrics.requestEnd(metric);
}
doRecycle = responseEnded;
}
conn.reportBytesWritten(bytesWritten);
conn.handleRequestEnd(doRecycle);
}
@Override
public NetSocket createNetSocket() {
synchronized (conn) {
if (responseEnded) {
throw new IllegalStateException("Response already ended");
}
return conn.upgrade(this);
}
}
@Override
public StreamPriority priority() {
return null;
}
@Override
public void updatePriority(StreamPriority streamPriority) {
}
private HttpClientResponseImpl beginResponse(HttpResponse resp) {
HttpVersion version;
if (resp.protocolVersion() == io.netty.handler.codec.http.HttpVersion.HTTP_1_0) {
version = io.vertx.core.http.HttpVersion.HTTP_1_0;
} else {
version = io.vertx.core.http.HttpVersion.HTTP_1_1;
}
response = new HttpClientResponseImpl(request, version, this, resp.status().code(), resp.status().reasonPhrase(), new HeadersAdaptor(resp.headers()));
if (conn.metrics != null) {
conn.metrics.responseBegin(metric, response);
}
if (resp.status().code() != 100 && request.method() != io.vertx.core.http.HttpMethod.CONNECT) {
String responseConnectionHeader = resp.headers().get(HttpHeaderNames.CONNECTION);
io.netty.handler.codec.http.HttpVersion protocolVersion = resp.protocolVersion();
String requestConnectionHeader = request.headers().get(HttpHeaderNames.CONNECTION);
if (HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(responseConnectionHeader) || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(requestConnectionHeader)) {
conn.close = true;
} else if (protocolVersion == io.netty.handler.codec.http.HttpVersion.HTTP_1_0 && !HttpHeaderValues.KEEP_ALIVE.contentEqualsIgnoreCase(responseConnectionHeader)) {
conn.close = true;
}
String keepAliveHeader = resp.headers().get(HttpHeaderNames.KEEP_ALIVE);
if (keepAliveHeader != null) {
int timeout = HttpUtils.parseKeepAliveHeaderTimeout(keepAliveHeader);
if (timeout != -1) {
conn.keepAliveTimeout = timeout;
}
}
}
queue.handler(item -> {
if (item instanceof MultiMap) {
conn.reportBytesRead(bytesRead);
response.handleEnd((MultiMap) item);
} else {
response.handleChunk((Buffer) item);
}
});
queue.drainHandler(v -> {
if (!responseEnded) {
conn.doResume();
}
});
return response;
}
private boolean endResponse(LastHttpContent trailer) {
synchronized (conn) {
if (conn.metrics != null) {
conn.metrics.responseEnd(metric, response);
}
}
queue.write(new HeadersAdaptor(trailer.trailingHeaders()));
synchronized (conn) {
responseEnded = true;
conn.close |= !conn.options.isKeepAlive();
conn.doResume();
return requestEnded;
}
}
void handleException(Throwable cause) {
HttpClientRequestImpl request;
HttpClientResponseImpl response;
Promise<HttpClientStream> fut;
boolean requestEnded;
synchronized (conn) {
request = this.request;
response = this.response;
fut = this.fut;
requestEnded = this.requestEnded;
}
if (request != null) {
if (response == null) {
request.handleException(cause);
} else {
if (!requestEnded) {
request.handleException(cause);
}
response.handleException(cause);
}
} else {
fut.tryFail(cause);
}
}
}
private void checkLifecycle() {
if (upgraded) {
} else if (close) {
close();
} else {
recycle();
}
}
private Throwable validateMessage(Object msg) {
if (msg instanceof HttpObject) {
HttpObject obj = (HttpObject) msg;
DecoderResult result = obj.decoderResult();
if (result.isFailure()) {
return result.cause();
} else if (obj instanceof HttpResponse) {
io.netty.handler.codec.http.HttpVersion version = ((HttpResponse) obj).protocolVersion();
if (version != io.netty.handler.codec.http.HttpVersion.HTTP_1_0 && version != io.netty.handler.codec.http.HttpVersion.HTTP_1_1) {
return new IllegalStateException("Unsupported HTTP version: " + version);
}
}
}
return null;
}
public void handleMessage(Object msg) {
Throwable error = validateMessage(msg);
if (error != null) {
fail(error);
} else if (msg instanceof HttpObject) {
HttpObject obj = (HttpObject) msg;
handleHttpMessage(obj);
} else if (msg instanceof WebSocketFrame) {
handleWsFrame((WebSocketFrame) msg);
} else {
throw new IllegalStateException("Invalid object " + msg);
}
}
private void handleHttpMessage(HttpObject obj) {
if (obj instanceof HttpResponse) {
handleResponseBegin((HttpResponse) obj);
} else if (obj instanceof HttpContent) {
HttpContent chunk = (HttpContent) obj;
if (chunk.content().isReadable()) {
Buffer buff = Buffer.buffer(VertxHandler.safeBuffer(chunk.content(), chctx.alloc()));
handleResponseChunk(buff);
}
if (chunk instanceof LastHttpContent) {
handleResponseEnd((LastHttpContent) chunk);
}
}
}
private void handleResponseBegin(HttpResponse resp) {
if (resp.status().code() == 100) {
Handler<Void> handler;
synchronized (this) {
StreamImpl stream = responseInProgress;
handler = stream.continueHandler;
}
if (handler != null) {
handler.handle(null);
}
} else {
StreamImpl stream;
HttpClientResponseImpl response;
HttpClientRequestImpl request;
synchronized (this) {
stream = responseInProgress;
request = stream.request;
response = stream.beginResponse(resp);
}
request.handleResponse(response);
}
}
private void handleResponseChunk(Buffer buff) {
StreamImpl resp;
synchronized (this) {
resp = responseInProgress;
}
if (resp != null) {
if (!resp.handleChunk(buff)) {
doPause();
}
}
}
private void handleResponseEnd(LastHttpContent trailer) {
StreamImpl stream;
synchronized (this) {
stream = responseInProgress;
if (stream.response == null) {
return;
}
responseInProgress = stream.next;
}
if (stream.endResponse(trailer)) {
checkLifecycle();
}
}
private void handleRequestEnd(boolean recycle) {
StreamImpl next;
synchronized (this) {
next = requestInProgress.next;
requestInProgress = next;
}
if (recycle) {
recycle();
}
if (next != null) {
next.fut.complete(next);
}
}
public HttpClientMetrics metrics() {
return metrics;
}
synchronized void toWebSocket(String requestURI,
MultiMap headers,
WebsocketVersion vers,
List<String> subProtocols,
int maxWebSocketFrameSize,
Handler<AsyncResult<WebSocket>> wsHandler) {
if (ws != null) {
throw new IllegalStateException("Already websocket");
}
try {
URI wsuri = new URI(requestURI);
if (!wsuri.isAbsolute()) {
wsuri = new URI((ssl ? "https:" : "http:") + "//" + server.host() + ":" + server.port() + requestURI);
}
WebSocketVersion version =
WebSocketVersion.valueOf((vers == null ?
WebSocketVersion.V13 : vers).toString());
HttpHeaders nettyHeaders;
if (headers != null) {
nettyHeaders = new DefaultHttpHeaders();
for (Map.Entry<String, String> entry: headers) {
nettyHeaders.add(entry.getKey(), entry.getValue());
}
} else {
nettyHeaders = null;
}
ChannelPipeline p = chctx.channel().pipeline();
ArrayList<WebSocketClientExtensionHandshaker> extensionHandshakers = initializeWebsocketExtensionHandshakers(client.getOptions());
if (!extensionHandshakers.isEmpty()) {
p.addBefore("handler", "websocketsExtensionsHandler", new WebSocketClientExtensionHandler(
extensionHandshakers.toArray(new WebSocketClientExtensionHandshaker[0])));
}
String subp = null;
if (subProtocols != null) {
subp = String.join(",", subProtocols);
}
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
wsuri,
version,
subp,
!extensionHandshakers.isEmpty(),
nettyHeaders,
maxWebSocketFrameSize,
!options.isSendUnmaskedFrames(),
false);
WebSocketHandshakeInboundHandler handshakeInboundHandler = new WebSocketHandshakeInboundHandler(handshaker, ar -> {
AsyncResult<WebSocket> wsRes = ar.map(v -> {
WebSocketImpl w = new WebSocketImpl(Http1xClientConnection.this, version != WebSocketVersion.V00,
options.getMaxWebsocketFrameSize(),
options.getMaxWebsocketMessageSize());
w.subProtocol(handshaker.actualSubprotocol());
return w;
});
if (ar.failed()) {
close();
} else {
ws = (WebSocketImpl) wsRes.result();
ws.registerHandler(vertx.eventBus());
}
getContext().executeFromIO(wsRes, res -> {
if (res.succeeded()) {
log.debug("WebSocket handshake complete");
if (metrics != null) {
ws.setMetric(metrics.connected(endpointMetric, metric(), ws));
}
}
wsHandler.handle(res);
});
});
p.addBefore("handler", "handshakeCompleter", handshakeInboundHandler);
handshaker
.handshake(chctx.channel())
.addListener(f -> {
if (!f.isSuccess()) {
wsHandler.handle(Future.failedFuture(f.cause()));
}
});
} catch (Exception e) {
handleException(e);
}
}
ArrayList<WebSocketClientExtensionHandshaker> initializeWebsocketExtensionHandshakers (HttpClientOptions options) {
ArrayList<WebSocketClientExtensionHandshaker> extensionHandshakers = new ArrayList<WebSocketClientExtensionHandshaker>();
if (options.getTryWebsocketDeflateFrameCompression()) {
extensionHandshakers.add(new DeflateFrameClientExtensionHandshaker(options.getWebsocketCompressionLevel(),
false));
}
if (options.getTryUsePerMessageWebsocketCompression()) {
extensionHandshakers.add(new PerMessageDeflateClientExtensionHandshaker(options.getWebsocketCompressionLevel(),
ZlibCodecFactory.isSupportingWindowSizeAndMemLevel(), PerMessageDeflateServerExtensionHandshaker.MAX_WINDOW_SIZE,
options.getWebsocketCompressionAllowClientNoContext(), options.getWebsocketCompressionRequestServerNoContext()));
}
return extensionHandshakers;
}
@Override
public synchronized void handleInterestedOpsChanged() {
if (!isNotWritable()) {
StreamImpl current = requestInProgress;
if (current != null) {
current.request.handleDrained();
} else if (ws != null) {
ws.handleDrained();
}
}
}
protected void handleClosed() {
super.handleClosed();
if (metrics != null) {
metrics.endpointDisconnected(endpointMetric, metric());
}
WebSocketImpl ws;
List<StreamImpl> list = Collections.emptyList();
synchronized (this) {
ws = this.ws;
for (StreamImpl r = responseInProgress;r != null;r = r.next) {
if (metrics != null) {
metrics.requestReset(r.metric);
}
if (list.isEmpty()) {
list = new ArrayList<>();
}
list.add(r);
}
}
if (ws != null) {
ws.handleClosed();
}
for (StreamImpl stream : list) {
stream.handleException(CLOSED_EXCEPTION);
}
}
protected void handleIdle() {
synchronized (this) {
if (ws == null && responseInProgress == null) {
return;
}
}
super.handleIdle();
}
@Override
protected synchronized void handleException(Throwable e) {
super.handleException(e);
if (ws != null) {
ws.handleException(e);
} else {
for (StreamImpl r = responseInProgress;r != null;r = r.next) {
r.handleException(e);
}
}
}
@Override
public void createStream(Handler<AsyncResult<HttpClientStream>> handler) {
StreamImpl stream;
synchronized (this) {
stream = new StreamImpl(this, seq++, handler);
if (requestInProgress != null) {
requestInProgress.append(stream);
return;
}
requestInProgress = stream;
}
stream.fut.complete(stream);
}
private void recycle() {
long expiration = keepAliveTimeout == 0 ? 0L : System.currentTimeMillis() + keepAliveTimeout * 1000;
listener.onRecycle(expiration);
}
}