package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.*;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.impl.headers.VertxHttpHeaders;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.spi.metrics.Metrics;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
public class HttpServerResponseImpl implements HttpServerResponse {
private static final Buffer EMPTY_BUFFER = Buffer.buffer(Unpooled.EMPTY_BUFFER);
private static final Logger log = LoggerFactory.getLogger(HttpServerResponseImpl.class);
private final VertxInternal vertx;
private final Http1xServerConnection conn;
private HttpResponseStatus status;
private final HttpVersion version;
private final boolean keepAlive;
private final boolean head;
private final Object requestMetric;
private boolean headWritten;
private boolean written;
private Handler<Void> drainHandler;
private Handler<Throwable> exceptionHandler;
private Handler<Void> closeHandler;
private Handler<Void> endHandler;
private Handler<Void> headersEndHandler;
private Handler<Void> bodyEndHandler;
private boolean closed;
private final VertxHttpHeaders ;
private MultiMap trailers;
private io.netty.handler.codec.http.HttpHeaders = EmptyHttpHeaders.INSTANCE;
private String statusMessage;
private long bytesWritten;
private NetSocket netSocket;
HttpServerResponseImpl(final VertxInternal vertx, Http1xServerConnection conn, HttpRequest request, Object requestMetric) {
this.vertx = vertx;
this.conn = conn;
this.version = request.protocolVersion();
this.headers = new VertxHttpHeaders();
this.status = HttpResponseStatus.OK;
this.requestMetric = requestMetric;
this.keepAlive = (version == HttpVersion.HTTP_1_1 && !request.headers().contains(io.vertx.core.http.HttpHeaders.CONNECTION, HttpHeaders.CLOSE, true))
|| (version == HttpVersion.HTTP_1_0 && request.headers().contains(io.vertx.core.http.HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE, true));
this.head = request.method() == io.netty.handler.codec.http.HttpMethod.HEAD;
}
@Override
public MultiMap () {
return headers;
}
@Override
public MultiMap trailers() {
if (trailers == null) {
VertxHttpHeaders v = new VertxHttpHeaders();
trailers = v;
trailingHeaders = v;
}
return trailers;
}
@Override
public int getStatusCode() {
return status.code();
}
@Override
public HttpServerResponse setStatusCode(int statusCode) {
status = statusMessage != null ? new HttpResponseStatus(statusCode, statusMessage) : HttpResponseStatus.valueOf(statusCode);
return this;
}
@Override
public String getStatusMessage() {
return status.reasonPhrase();
}
@Override
public HttpServerResponse setStatusMessage(String statusMessage) {
synchronized (conn) {
this.statusMessage = statusMessage;
this.status = new HttpResponseStatus(status.code(), statusMessage);
return this;
}
}
@Override
public HttpServerResponseImpl setChunked(boolean chunked) {
synchronized (conn) {
checkValid();
if (version != HttpVersion.HTTP_1_0) {
headers.set(HttpHeaders.TRANSFER_ENCODING, chunked ? "chunked" : null);
}
return this;
}
}
@Override
public boolean isChunked() {
synchronized (conn) {
return headers.contains(HttpHeaders.TRANSFER_ENCODING, HttpHeaders.CHUNKED, true);
}
}
@Override
public HttpServerResponseImpl (String key, String value) {
synchronized (conn) {
checkValid();
headers.set(key, value);
return this;
}
}
@Override
public HttpServerResponseImpl (String key, Iterable<String> values) {
synchronized (conn) {
checkValid();
headers.set(key, values);
return this;
}
}
@Override
public HttpServerResponseImpl putTrailer(String key, String value) {
synchronized (conn) {
checkValid();
trailers().set(key, value);
return this;
}
}
@Override
public HttpServerResponseImpl putTrailer(String key, Iterable<String> values) {
synchronized (conn) {
checkValid();
trailers().set(key, values);
return this;
}
}
@Override
public HttpServerResponse (CharSequence name, CharSequence value) {
synchronized (conn) {
checkValid();
headers.set(name, value);
return this;
}
}
@Override
public HttpServerResponse (CharSequence name, Iterable<CharSequence> values) {
synchronized (conn) {
checkValid();
headers.set(name, values);
return this;
}
}
@Override
public HttpServerResponse putTrailer(CharSequence name, CharSequence value) {
synchronized (conn) {
checkValid();
trailers().set(name, value);
return this;
}
}
@Override
public HttpServerResponse putTrailer(CharSequence name, Iterable<CharSequence> value) {
synchronized (conn) {
checkValid();
trailers().set(name, value);
return this;
}
}
@Override
public HttpServerResponse setWriteQueueMaxSize(int size) {
synchronized (conn) {
checkValid();
conn.doSetWriteQueueMaxSize(size);
return this;
}
}
@Override
public boolean writeQueueFull() {
synchronized (conn) {
checkValid();
return conn.isNotWritable();
}
}
@Override
public HttpServerResponse drainHandler(Handler<Void> handler) {
synchronized (conn) {
if (handler != null) {
checkValid();
}
drainHandler = handler;
conn.getContext().runOnContext(v -> conn.handleInterestedOpsChanged());
return this;
}
}
@Override
public HttpServerResponse exceptionHandler(Handler<Throwable> handler) {
synchronized (conn) {
if (handler != null) {
checkValid();
}
exceptionHandler = handler;
return this;
}
}
@Override
public HttpServerResponse closeHandler(Handler<Void> handler) {
synchronized (conn) {
if (handler != null) {
checkValid();
}
closeHandler = handler;
return this;
}
}
@Override
public HttpServerResponse endHandler(@Nullable Handler<Void> handler) {
synchronized (conn) {
if (handler != null) {
checkValid();
}
endHandler = handler;
return this;
}
}
@Override
public HttpServerResponseImpl write(Buffer chunk) {
ByteBuf buf = chunk.getByteBuf();
return write(buf, conn.voidPromise);
}
@Override
public HttpServerResponse write(Buffer chunk, Handler<AsyncResult<Void>> handler) {
return write(chunk.getByteBuf(), conn.toPromise(handler));
}
@Override
public HttpServerResponseImpl write(String chunk, String enc) {
return write(Buffer.buffer(chunk, enc).getByteBuf(), conn.voidPromise);
}
@Override
public HttpServerResponse write(String chunk, String enc, Handler<AsyncResult<Void>> handler) {
return write(Buffer.buffer(chunk, enc).getByteBuf(), conn.toPromise(handler));
}
@Override
public HttpServerResponseImpl write(String chunk) {
return write(Buffer.buffer(chunk).getByteBuf(), conn.voidPromise);
}
@Override
public HttpServerResponse write(String chunk, Handler<AsyncResult<Void>> handler) {
return write(Buffer.buffer(chunk).getByteBuf(), conn.toPromise(handler));
}
@Override
public HttpServerResponse writeContinue() {
conn.write100Continue();
return this;
}
@Override
public void end(String chunk) {
end(Buffer.buffer(chunk));
}
@Override
public void end(String chunk, Handler<AsyncResult<Void>> handler) {
end(Buffer.buffer(chunk), handler);
}
@Override
public void end(String chunk, String enc) {
end(Buffer.buffer(chunk, enc));
}
@Override
public void end(String chunk, String enc, Handler<AsyncResult<Void>> handler) {
end(Buffer.buffer(chunk, enc), handler);
}
@Override
public void end(Buffer chunk) {
end(chunk, conn.voidPromise);
}
@Override
public void end(Buffer chunk, Handler<AsyncResult<Void>> handler) {
end(chunk, conn.toPromise(handler));
}
private void end(Buffer chunk, ChannelPromise promise) {
synchronized (conn) {
if (written) {
throw new IllegalStateException("Response has already been written");
}
ByteBuf data = chunk.getByteBuf();
bytesWritten += data.readableBytes();
HttpObject msg;
if (!headWritten) {
prepareHeaders(bytesWritten);
msg = new AssembledFullHttpResponse(head, version, status, headers, data, trailingHeaders);
} else {
msg = new AssembledLastHttpContent(data, trailingHeaders);
}
conn.writeToChannel(msg, promise);
if (!keepAlive) {
closeConnAfterWrite();
closed = true;
}
written = true;
conn.responseComplete();
if (bodyEndHandler != null) {
bodyEndHandler.handle(null);
}
if (!closed && endHandler != null) {
endHandler.handle(null);
}
}
}
@Override
public void close() {
synchronized (conn) {
if (!closed) {
if (headWritten) {
closeConnAfterWrite();
} else {
conn.close();
}
closed = true;
}
}
}
@Override
public void end() {
end(EMPTY_BUFFER);
}
@Override
public void end(Handler<AsyncResult<Void>> handler) {
end(EMPTY_BUFFER, handler);
}
@Override
public HttpServerResponseImpl sendFile(String filename, long offset, long length) {
doSendFile(filename, offset, length, null);
return this;
}
@Override
public HttpServerResponse sendFile(String filename, long start, long end, Handler<AsyncResult<Void>> resultHandler) {
doSendFile(filename, start, end, resultHandler);
return this;
}
@Override
public boolean ended() {
synchronized (conn) {
return written;
}
}
@Override
public boolean closed() {
synchronized (conn) {
return closed;
}
}
@Override
public boolean headWritten() {
synchronized (conn) {
return headWritten;
}
}
@Override
public long bytesWritten() {
synchronized (conn) {
return bytesWritten;
}
}
@Override
public HttpServerResponse headersEndHandler(Handler<Void> handler) {
synchronized (conn) {
this.headersEndHandler = handler;
return this;
}
}
@Override
public HttpServerResponse bodyEndHandler(Handler<Void> handler) {
synchronized (conn) {
this.bodyEndHandler = handler;
return this;
}
}
private void doSendFile(String filename, long offset, long length, Handler<AsyncResult<Void>> resultHandler) {
synchronized (conn) {
if (headWritten) {
throw new IllegalStateException("Head already written");
}
checkValid();
File file = vertx.resolveFile(filename);
if (!file.exists()) {
if (resultHandler != null) {
ContextInternal ctx = vertx.getOrCreateContext();
ctx.runOnContext((v) -> resultHandler.handle(Future.failedFuture(new FileNotFoundException())));
} else {
log.error("File not found: " + filename);
}
return;
}
long contentLength = Math.min(length, file.length() - offset);
bytesWritten = contentLength;
if (!headers.contains(HttpHeaders.CONTENT_TYPE)) {
String contentType = MimeMapping.getMimeTypeForFilename(filename);
if (contentType != null) {
headers.set(HttpHeaders.CONTENT_TYPE, contentType);
}
}
prepareHeaders(bytesWritten);
ChannelFuture channelFuture;
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(file, "r");
conn.writeToChannel(new AssembledHttpResponse(head, version, status, headers));
channelFuture = conn.sendFile(raf, Math.min(offset, file.length()), contentLength);
} catch (IOException e) {
try {
if (raf != null) {
raf.close();
}
} catch (IOException ignore) {
}
if (resultHandler != null) {
ContextInternal ctx = vertx.getOrCreateContext();
ctx.runOnContext((v) -> resultHandler.handle(Future.failedFuture(e)));
} else {
log.error("Failed to send file", e);
}
return;
}
written = true;
ContextInternal ctx = vertx.getOrCreateContext();
channelFuture.addListener(future -> {
if (future.isSuccess()) {
ChannelPromise pr = conn.channelHandlerContext().newPromise();
conn.writeToChannel(LastHttpContent.EMPTY_LAST_CONTENT, pr);
if (!keepAlive) {
pr.addListener(a -> {
closeConnAfterWrite();
});
}
}
if (resultHandler != null) {
AsyncResult<Void> res;
if (future.isSuccess()) {
res = Future.succeededFuture();
} else {
res = Future.failedFuture(future.cause());
}
ctx.executeFromIO(v -> resultHandler.handle(res));
}
Handler<Void> handler;
synchronized (conn) {
handler = bodyEndHandler;
}
if (handler != null) {
ctx.executeFromIO(v -> {
handler.handle(null);
});
}
conn.responseComplete();
});
}
}
private void closeConnAfterWrite() {
ChannelPromise channelFuture = conn.channelFuture();
conn.writeToChannel(Unpooled.EMPTY_BUFFER, channelFuture);
channelFuture.addListener(fut -> conn.close());
}
void handleDrained() {
synchronized (conn) {
if (drainHandler != null) {
drainHandler.handle(null);
}
}
}
void handleException(Throwable t) {
if (t == Http1xServerConnection.CLOSED_EXCEPTION) {
handleClosed();
} else {
Handler<Throwable> handler;
synchronized (conn) {
handler = exceptionHandler;
}
if (handler != null) {
handler.handle(t);
}
}
}
private void handleClosed() {
Handler<Void> closedHandler;
Handler<Void> endHandler;
Handler<Throwable> exceptionHandler;
synchronized (conn) {
if (closed) {
return;
}
closed = true;
exceptionHandler = written ? null : this.exceptionHandler;
endHandler = this.written ? null : this.endHandler;
closedHandler = this.closeHandler;
}
if (exceptionHandler != null) {
exceptionHandler.handle(ConnectionBase.CLOSED_EXCEPTION);
}
if (endHandler != null) {
endHandler.handle(null);
}
if (closedHandler != null) {
closedHandler.handle(null);
}
}
private void checkValid() {
if (written) {
throw new IllegalStateException("Response has already been written");
}
if (closed) {
throw new IllegalStateException("Response is closed");
}
}
private void (long contentLength) {
if (version == HttpVersion.HTTP_1_0 && keepAlive) {
headers.set(HttpHeaders.CONNECTION, HttpHeaders.KEEP_ALIVE);
} else if (version == HttpVersion.HTTP_1_1 && !keepAlive) {
headers.set(HttpHeaders.CONNECTION, HttpHeaders.CLOSE);
}
if (head || status == HttpResponseStatus.NOT_MODIFIED) {
headers.remove(HttpHeaders.TRANSFER_ENCODING);
} else {
if (!headers.contains(HttpHeaders.TRANSFER_ENCODING) && !headers.contains(HttpHeaders.CONTENT_LENGTH) && contentLength >= 0) {
String value = contentLength == 0 ? "0" : String.valueOf(contentLength);
headers.set(HttpHeaders.CONTENT_LENGTH, value);
}
}
if (headersEndHandler != null) {
headersEndHandler.handle(null);
}
if (Metrics.METRICS_ENABLED) {
reportResponseBegin();
}
headWritten = true;
}
private void reportResponseBegin() {
if (conn.metrics != null) {
conn.metrics.responseBegin(requestMetric, this);
}
}
private HttpServerResponseImpl write(ByteBuf chunk, ChannelPromise promise) {
synchronized (conn) {
if (written) {
throw new IllegalStateException("Response has already been written");
}
else if (!headWritten && !headers.contains(HttpHeaders.TRANSFER_ENCODING) && !headers.contains(HttpHeaders.CONTENT_LENGTH)) {
if (version != HttpVersion.HTTP_1_0) {
throw new IllegalStateException("You must set the Content-Length header to be the total size of the message "
+ "body BEFORE sending any data if you are not using HTTP chunked encoding.");
}
}
bytesWritten += chunk.readableBytes();
HttpObject msg;
if (!headWritten) {
prepareHeaders(-1);
msg = new AssembledHttpResponse(head, version, status, headers, chunk);
} else {
msg = new DefaultHttpContent(chunk);
}
conn.writeToChannel(msg, promise);
return this;
}
}
NetSocket netSocket(boolean isConnect) {
checkValid();
if (netSocket == null) {
if (isConnect) {
if (headWritten) {
throw new IllegalStateException("Response for CONNECT already sent");
}
status = HttpResponseStatus.OK;
prepareHeaders(-1);
conn.writeToChannel(new AssembledHttpResponse(head, version, status, headers));
}
written = true;
netSocket = conn.createNetSocket();
}
return netSocket;
}
@Override
public int streamId() {
return -1;
}
@Override
public void reset(long code) {
}
@Override
public HttpServerResponse push(HttpMethod method, String path, MultiMap headers, Handler<AsyncResult<HttpServerResponse>> handler) {
return push(method, null, path, headers, handler);
}
@Override
public HttpServerResponse push(io.vertx.core.http.HttpMethod method, String host, String path, Handler<AsyncResult<HttpServerResponse>> handler) {
return push(method, path, handler);
}
@Override
public HttpServerResponse push(HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler) {
return push(method, path, null, null, handler);
}
@Override
public HttpServerResponse push(HttpMethod method, String host, String path, MultiMap headers, Handler<AsyncResult<HttpServerResponse>> handler) {
handler.handle(Future.failedFuture("Push promise is only supported with HTTP2"));
return this;
}
@Override
public HttpServerResponse writeCustomFrame(int type, int flags, Buffer payload) {
return this;
}
}