package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpConnection;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;
import io.vertx.core.net.impl.SSLHelper;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.*;
import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static io.netty.handler.codec.http.HttpResponseStatus.CONTINUE;
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static io.netty.handler.codec.http.HttpResponseStatus.UPGRADE_REQUIRED;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED;
public class Http1xServerConnection extends Http1xConnectionBase<ServerWebSocketImpl> implements HttpServerConnection {
private static final Logger log = LoggerFactory.getLogger(Http1xServerConnection.class);
private final String serverOrigin;
private final SSLHelper sslHelper;
private boolean requestFailed;
private long bytesRead;
private long bytesWritten;
private HttpServerRequestImpl requestInProgress;
private HttpServerRequestImpl responseInProgress;
private boolean channelPaused;
private Handler<HttpServerRequest> requestHandler;
final HttpServerMetrics metrics;
final boolean handle100ContinueAutomatically;
final HttpServerOptions options;
public Http1xServerConnection(VertxInternal vertx,
SSLHelper sslHelper,
HttpServerOptions options,
ChannelHandlerContext channel,
ContextInternal context,
String serverOrigin,
HttpServerMetrics metrics) {
super(vertx, channel, context);
this.serverOrigin = serverOrigin;
this.options = options;
this.sslHelper = sslHelper;
this.metrics = metrics;
this.handle100ContinueAutomatically = options.isHandle100ContinueAutomatically();
}
@Override
public HttpServerConnection handler(Handler<HttpServerRequest> handler) {
requestHandler = handler;
return this;
}
@Override
public HttpServerMetrics metrics() {
return metrics;
}
public void handleMessage(Object msg) {
if (msg instanceof HttpRequest) {
DefaultHttpRequest request = (DefaultHttpRequest) msg;
if (request.decoderResult() != DecoderResult.SUCCESS) {
handleError(request);
return;
}
HttpServerRequestImpl req = new HttpServerRequestImpl(this, request);
synchronized (this) {
requestInProgress = req;
if (responseInProgress != null) {
req.pause();
responseInProgress.enqueue(req);
return;
}
responseInProgress = requestInProgress;
}
req.handleBegin(requestHandler);
} else if (msg == LastHttpContent.EMPTY_LAST_CONTENT) {
handleEnd();
} else if (msg instanceof HttpContent) {
handleContent(msg);
} else if (msg instanceof WebSocketFrame) {
handleWsFrame((WebSocketFrame) msg);
}
}
private void handleContent(Object msg) {
HttpContent content = (HttpContent) msg;
if (content.decoderResult() != DecoderResult.SUCCESS) {
handleError(content);
return;
}
Buffer buffer = Buffer.buffer(VertxHandler.safeBuffer(content.content(), chctx.alloc()));
HttpServerRequestImpl request;
synchronized (this) {
if (METRICS_ENABLED) {
reportBytesRead(buffer);
}
request = requestInProgress;
}
request.handleContent(buffer);
if (content instanceof LastHttpContent) {
handleEnd();
}
}
private void handleEnd() {
HttpServerRequestImpl request;
synchronized (this) {
if (METRICS_ENABLED) {
reportRequestComplete();
}
request = requestInProgress;
requestInProgress = null;
}
request.handleEnd();
}
synchronized void responseComplete() {
if (METRICS_ENABLED) {
reportResponseComplete();
}
HttpServerRequestImpl request = responseInProgress;
responseInProgress = null;
HttpServerRequestImpl next = request.next();
if (next != null) {
handleNext(next);
}
}
private void handleNext(HttpServerRequestImpl next) {
responseInProgress = next;
getContext().runOnContext(v -> {
next.resume();
next.handleBegin(requestHandler);
});
}
@Override
public void doPause() {
if (!channelPaused) {
channelPaused = true;
super.doPause();
}
}
@Override
public void doResume() {
if (channelPaused) {
channelPaused = false;
super.doResume();
}
}
private void reportBytesRead(Buffer buffer) {
if (metrics != null) {
bytesRead += buffer.length();
}
}
private void reportBytesWritten(Object msg) {
if (metrics != null) {
long bytes = getBytes(msg);
if (bytes == -1) {
log.warn("Metrics could not be updated to include bytes written because of unknown object " + msg.getClass() + " being written.");
} else {
bytesWritten += bytes;
}
}
}
private void reportRequestComplete() {
if (metrics != null) {
reportBytesRead(bytesRead);
bytesRead = 0;
}
}
private void reportResponseComplete() {
if (metrics != null) {
reportBytesWritten(bytesWritten);
if (requestFailed) {
metrics.requestReset(responseInProgress.metric());
requestFailed = false;
} else {
metrics.responseEnd(responseInProgress.metric(), responseInProgress.response());
}
bytesWritten = 0;
}
}
String getServerOrigin() {
return serverOrigin;
}
Vertx vertx() {
return vertx;
}
@Override
public void writeToChannel(Object msg, ChannelPromise promise) {
if (METRICS_ENABLED) {
reportBytesWritten(msg);
}
super.writeToChannel(msg, promise);
}
ServerWebSocketImpl createWebSocket(HttpServerRequestImpl request) {
if (ws != null) {
return ws;
}
if (!(request.nettyRequest() instanceof FullHttpRequest)) {
throw new IllegalStateException();
}
WebSocketServerHandshaker handshaker = createHandshaker(request);
if (handshaker == null) {
return null;
}
ws = new ServerWebSocketImpl(this, handshaker.version() != WebSocketVersion.V00,
request, handshaker, options.getMaxWebsocketFrameSize(), options.getMaxWebsocketMessageSize());
if (METRICS_ENABLED && metrics != null) {
ws.setMetric(metrics.connected(metric(), request.metric(), ws));
}
return ws;
}
private WebSocketServerHandshaker createHandshaker(HttpServerRequestImpl request) {
Channel ch = channel();
String connectionHeader = request.getHeader(io.vertx.core.http.HttpHeaders.CONNECTION);
if (connectionHeader == null || !connectionHeader.toLowerCase().contains("upgrade")) {
request.response()
.setStatusCode(BAD_REQUEST.code())
.end("\"Connection\" header must be \"Upgrade\".");
return null;
}
if (request.method() != io.vertx.core.http.HttpMethod.GET) {
request.response()
.setStatusCode(METHOD_NOT_ALLOWED.code())
.end();
return null;
}
String wsURL;
try {
wsURL = HttpUtils.getWebSocketLocation(request, isSsl());
} catch (Exception e) {
request.response()
.setStatusCode(BAD_REQUEST.code())
.end("Invalid request URI");
return null;
}
WebSocketServerHandshakerFactory factory =
new WebSocketServerHandshakerFactory(wsURL,
options.getWebsocketSubProtocols(),
options.getPerMessageWebsocketCompressionSupported() || options.getPerFrameWebsocketCompressionSupported(),
options.getMaxWebsocketFrameSize(), options.isAcceptUnmaskedFrames());
WebSocketServerHandshaker shake = factory.newHandshaker(request.nettyRequest());
if (shake == null) {
request.response()
.putHeader(HttpHeaderNames.SEC_WEBSOCKET_VERSION, WebSocketVersion.V13.toHttpHeaderValue())
.setStatusCode(UPGRADE_REQUIRED.code())
.end();
}
return shake;
}
NetSocket createNetSocket() {
Map<Channel, NetSocketImpl> connectionMap = new HashMap<>(1);
NetSocketImpl socket = new NetSocketImpl(vertx, chctx, context, sslHelper, metrics) {
@Override
protected void handleClosed() {
if (metrics != null) {
metrics.responseEnd(responseInProgress.metric(), responseInProgress.response());
}
connectionMap.remove(chctx.channel());
super.handleClosed();
}
@Override
public synchronized void handleMessage(Object msg) {
if (msg instanceof HttpContent) {
ReferenceCountUtil.release(msg);
return;
}
super.handleMessage(msg);
}
};
socket.metric(metric());
connectionMap.put(chctx.channel(), socket);
flush();
ChannelPipeline pipeline = chctx.pipeline();
ChannelHandler compressor = pipeline.get(HttpChunkContentCompressor.class);
if (compressor != null) {
pipeline.remove(compressor);
}
pipeline.remove("httpDecoder");
if (pipeline.get("chunkedWriter") != null) {
pipeline.remove("chunkedWriter");
}
chctx.pipeline().replace("handler", "handler", VertxHandler.create(socket));
chctx.pipeline().remove("httpEncoder");
return socket;
}
@Override
public synchronized void handleInterestedOpsChanged() {
if (!isNotWritable()) {
if (responseInProgress != null) {
responseInProgress.response().handleDrained();
} else if (ws != null) {
ws.handleDrained();
}
}
}
void write100Continue() {
chctx.writeAndFlush(new DefaultFullHttpResponse(HTTP_1_1, CONTINUE));
}
protected void handleClosed() {
HttpServerRequestImpl responseInProgress;
HttpServerRequestImpl requestInProgress;
ServerWebSocketImpl ws;
synchronized (this) {
ws = this.ws;
requestInProgress = this.requestInProgress;
responseInProgress = this.responseInProgress;
if (METRICS_ENABLED && metrics != null && ws != null) {
metrics.disconnected(ws.getMetric());
ws.setMetric(null);
}
}
if (requestInProgress != null) {
requestInProgress.handleException(CLOSED_EXCEPTION);
}
if (responseInProgress != null && responseInProgress != requestInProgress) {
responseInProgress.handleException(CLOSED_EXCEPTION);
}
if (ws != null) {
ws.handleClosed();
}
super.handleClosed();
}
@Override
protected void handleException(Throwable t) {
super.handleException(t);
HttpServerRequestImpl responseInProgress;
HttpServerRequestImpl requestInProgress;
ServerWebSocketImpl ws;
synchronized (this) {
ws = this.ws;
requestInProgress = this.requestInProgress;
responseInProgress = this.responseInProgress;
if (METRICS_ENABLED && metrics != null) {
requestFailed = true;
}
}
if (requestInProgress != null) {
requestInProgress.handleException(t);
}
if (responseInProgress != null && responseInProgress != requestInProgress) {
responseInProgress.handleException(t);
}
if (ws != null) {
ws.handleException(t);
}
}
protected void addFuture(Handler<AsyncResult<Void>> completionHandler, ChannelFuture future) {
super.addFuture(completionHandler, future);
}
@Override
protected boolean supportsFileRegion() {
return super.supportsFileRegion() && chctx.pipeline().get(HttpChunkContentCompressor.class) == null;
}
protected ChannelFuture sendFile(RandomAccessFile file, long offset, long length) throws IOException {
return super.sendFile(file, offset, length);
}
private void handleError(HttpObject obj) {
DecoderResult result = obj.decoderResult();
Throwable cause = result.cause();
if (cause instanceof TooLongFrameException) {
String causeMsg = cause.getMessage();
HttpVersion version;
if (obj instanceof HttpRequest) {
version = ((HttpRequest) obj).protocolVersion();
} else if (requestInProgress != null) {
version = requestInProgress.version() == io.vertx.core.http.HttpVersion.HTTP_1_0 ? HttpVersion.HTTP_1_0 : HttpVersion.HTTP_1_1;
} else {
version = HttpVersion.HTTP_1_1;
}
HttpResponseStatus status = causeMsg.startsWith("An HTTP line is larger than") ? HttpResponseStatus.REQUEST_URI_TOO_LONG : HttpResponseStatus.BAD_REQUEST;
DefaultFullHttpResponse resp = new DefaultFullHttpResponse(version, status);
ChannelPromise fut = chctx.newPromise();
writeToChannel(resp, fut);
fut.addListener(res -> {
fail(result.cause());
});
} else {
fail(result.cause());
}
}
private long getBytes(Object obj) {
if (obj == null) return 0;
if (obj instanceof Buffer) {
return ((Buffer) obj).length();
} else if (obj instanceof ByteBuf) {
return ((ByteBuf) obj).readableBytes();
} else if (obj instanceof HttpContent) {
return ((HttpContent) obj).content().readableBytes();
} else if (obj instanceof WebSocketFrame) {
return ((WebSocketFrame) obj).content().readableBytes();
} else if (obj instanceof FileRegion) {
return ((FileRegion) obj).count();
} else if (obj instanceof ChunkedFile) {
ChunkedFile file = (ChunkedFile) obj;
return file.endOffset() - file.startOffset();
} else {
return -1;
}
}
}