package io.vertx.core.http.impl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpStatusClass;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2Headers;
import io.vertx.codegen.annotations.Nullable;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.StreamResetException;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.ConnectionBase;
import io.vertx.core.spi.metrics.Metrics;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import static io.vertx.core.spi.metrics.Metrics.METRICS_ENABLED;
public class Http2ServerResponseImpl implements HttpServerResponse {
private static final Logger log = LoggerFactory.getLogger(Http2ServerResponseImpl.class);
private final Http2ServerStream stream;
private final ChannelHandlerContext ctx;
private final Http2ServerConnection conn;
private final boolean head;
private final boolean push;
private final String host;
private Http2Headers = new DefaultHttp2Headers();
private Http2HeadersAdaptor ;
private Http2Headers trailers;
private Http2HeadersAdaptor trailedMap;
private boolean chunked;
private boolean headWritten;
private boolean ended;
private boolean closed;
private HttpResponseStatus status = HttpResponseStatus.OK;
private String statusMessage;
private Handler<Void> drainHandler;
private Handler<Throwable> exceptionHandler;
private Handler<Void> headersEndHandler;
private Handler<Void> bodyEndHandler;
private Handler<Void> closeHandler;
private Handler<Void> endHandler;
private NetSocket netSocket;
/**
 * Creates the response side of an HTTP/2 server stream.
 *
 * @param conn            the connection owning the stream; also used as the monitor guarding this object's state
 * @param stream          the stream the response frames are written to
 * @param method          the request method, used to detect {@code HEAD} requests
 * @param push            whether this response belongs to a pushed stream
 * @param contentEncoding value for the {@code content-encoding} header, or {@code null} to leave it unset
 * @param host            the host reused when a push is requested without an explicit host
 */
public Http2ServerResponseImpl(Http2ServerConnection conn,
                               Http2ServerStream stream,
                               HttpMethod method,
                               boolean push,
                               String contentEncoding,
                               String host) {
  this.conn = conn;
  this.ctx = conn.handlerContext;
  this.stream = stream;
  this.push = push;
  this.host = host;
  this.head = HttpMethod.HEAD == method;
  if (contentEncoding == null) {
    return;
  }
  putHeader(HttpHeaderNames.CONTENT_ENCODING, contentEncoding);
}
/**
 * @return {@code true} when this response was created for a pushed stream
 */
boolean isPush() {
  return this.push;
}
/**
 * Reports a stream reset to the exception handler as a {@link StreamResetException}.
 *
 * @param code the reset error code
 */
void handleReset(long code) {
  StreamResetException reset = new StreamResetException(code);
  handleException(reset);
}
/**
 * Passes {@code cause} to the registered exception handler, if there is one.
 * The handler reference is read under the connection monitor and invoked outside of it.
 *
 * @param cause the failure to report
 */
void handleException(Throwable cause) {
  final Handler<Throwable> target;
  synchronized (conn) {
    target = exceptionHandler;
  }
  if (target == null) {
    return;
  }
  target.handle(cause);
}
/**
 * Marks the response closed and notifies the registered handlers.
 * <p>
 * When the response had not been ended yet, the exception handler receives
 * {@link ConnectionBase#CLOSED_EXCEPTION} and the end handler is called; the close handler is
 * called in every case. Handlers are captured under the connection monitor and invoked outside of it.
 */
void handleClose() {
  Handler<Throwable> onFailure = null;
  Handler<Void> onEnd = null;
  final Handler<Void> onClose;
  synchronized (conn) {
    if (!ended) {
      // Closed before end(): treat as a failed response.
      onFailure = this.exceptionHandler;
      onEnd = this.endHandler;
    }
    closed = true;
    onClose = this.closeHandler;
  }
  if (onFailure != null) {
    onFailure.handle(ConnectionBase.CLOSED_EXCEPTION);
  }
  if (onEnd != null) {
    onEnd.handle(null);
  }
  if (onClose != null) {
    onClose.handle(null);
  }
}
/**
 * Guards operations that are only legal before the response head is sent.
 *
 * @throws IllegalStateException if the head has already been written
 */
private void checkHeadWritten() {
  if (!headWritten) {
    return;
  }
  throw new IllegalStateException("Header already sent");
}
/**
 * Sets (or clears, with {@code null}) the exception handler.
 *
 * @throws IllegalStateException if a non-null handler is set on an ended or closed response
 */
@Override
public HttpServerResponse exceptionHandler(Handler<Throwable> handler) {
  synchronized (conn) {
    // Clearing the handler is always allowed; installing one requires a live response.
    if (handler != null) {
      checkValid();
    }
    this.exceptionHandler = handler;
  }
  return this;
}
/**
 * @return the numeric code of the current response status
 */
@Override
public int getStatusCode() {
  final int code;
  synchronized (conn) {
    code = status.code();
  }
  return code;
}
/**
 * Sets the response status code.
 *
 * @param statusCode the status code, must not be negative
 * @throws IllegalArgumentException if {@code statusCode} is negative
 * @throws IllegalStateException    if the head has already been written
 */
@Override
public HttpServerResponse setStatusCode(int statusCode) {
  if (statusCode < 0) {
    throw new IllegalArgumentException("code: " + statusCode + " (expected: 0+)");
  }
  synchronized (conn) {
    checkHeadWritten();
    status = HttpResponseStatus.valueOf(statusCode);
  }
  return this;
}
/**
 * @return the explicitly set status message, or the reason phrase of the current status when none was set
 */
@Override
public String getStatusMessage() {
  synchronized (conn) {
    return statusMessage != null ? statusMessage : status.reasonPhrase();
  }
}
/**
 * Stores a custom status message, returned later by {@code getStatusMessage()}.
 *
 * @throws IllegalStateException if the head has already been written
 */
@Override
public HttpServerResponse setStatusMessage(String statusMessage) {
  synchronized (conn) {
    checkHeadWritten();
    this.statusMessage = statusMessage;
  }
  return this;
}
/**
 * Records the chunked flag reported by {@code isChunked()}.
 * <p>
 * The previous code stored {@code true} regardless of the argument, so {@code setChunked(false)}
 * turned the flag on. The argument is now honoured.
 *
 * @param chunked the value to record
 * @throws IllegalStateException if the head has already been written
 */
@Override
public HttpServerResponse setChunked(boolean chunked) {
  synchronized (conn) {
    checkHeadWritten();
    this.chunked = chunked;
    return this;
  }
}
/**
 * @return the chunked flag recorded on this response
 */
@Override
public boolean isChunked() {
  final boolean flag;
  synchronized (conn) {
    flag = chunked;
  }
  return flag;
}
@Override
public MultiMap () {
synchronized (conn) {
if (headersMap == null) {
headersMap = new Http2HeadersAdaptor(headers);
}
return headersMap;
}
}
@Override
public HttpServerResponse (String name, String value) {
synchronized (conn) {
checkHeadWritten();
headers().set(name, value);
return this;
}
}
@Override
public HttpServerResponse (CharSequence name, CharSequence value) {
synchronized (conn) {
checkHeadWritten();
headers().set(name, value);
return this;
}
}
@Override
public HttpServerResponse (String name, Iterable<String> values) {
synchronized (conn) {
checkHeadWritten();
headers().set(name, values);
return this;
}
}
@Override
public HttpServerResponse (CharSequence name, Iterable<CharSequence> values) {
synchronized (conn) {
checkHeadWritten();
headers().set(name, values);
return this;
}
}
/**
 * Returns the trailer map, allocating the trailer block and its {@link MultiMap} view on first use.
 *
 * @return the mutable trailer map
 */
@Override
public MultiMap trailers() {
  synchronized (conn) {
    Http2HeadersAdaptor view = trailedMap;
    if (view == null) {
      trailers = new DefaultHttp2Headers();
      view = new Http2HeadersAdaptor(trailers);
      trailedMap = view;
    }
    return view;
  }
}
/**
 * Sets a trailer through the trailer map.
 *
 * @throws IllegalStateException if the response is already ended or closed
 */
@Override
public HttpServerResponse putTrailer(String name, String value) {
  synchronized (conn) {
    checkValid();
    trailers().set(name, value);
  }
  return this;
}
/**
 * Sets a trailer through the trailer map ({@link CharSequence} variant).
 *
 * @throws IllegalStateException if the response is already ended or closed
 */
@Override
public HttpServerResponse putTrailer(CharSequence name, CharSequence value) {
  synchronized (conn) {
    checkValid();
    trailers().set(name, value);
  }
  return this;
}
/**
 * Sets a multi-valued trailer through the trailer map.
 *
 * @throws IllegalStateException if the response is already ended or closed
 */
@Override
public HttpServerResponse putTrailer(String name, Iterable<String> values) {
  synchronized (conn) {
    checkValid();
    trailers().set(name, values);
  }
  return this;
}
/**
 * Sets a multi-valued trailer through the trailer map ({@link CharSequence} variant).
 *
 * @throws IllegalStateException if the response is already ended or closed
 */
@Override
public HttpServerResponse putTrailer(CharSequence name, Iterable<CharSequence> value) {
  synchronized (conn) {
    checkValid();
    trailers().set(name, value);
  }
  return this;
}
/**
 * Sets (or clears, with {@code null}) the handler invoked when the response is closed.
 *
 * @throws IllegalStateException if a non-null handler is set on an ended or closed response
 */
@Override
public HttpServerResponse closeHandler(Handler<Void> handler) {
  synchronized (conn) {
    if (handler != null) {
      checkValid();
    }
    this.closeHandler = handler;
  }
  return this;
}
/**
 * Sets (or clears, with {@code null}) the handler invoked when the response ends.
 *
 * @throws IllegalStateException if a non-null handler is set on an ended or closed response
 */
@Override
public HttpServerResponse endHandler(@Nullable Handler<Void> handler) {
  synchronized (conn) {
    if (handler != null) {
      checkValid();
    }
    this.endHandler = handler;
  }
  return this;
}
/**
 * Writes an interim header block carrying status {@code 100} and flushes it.
 * The stream is left open for the final response.
 *
 * @throws IllegalStateException if the final response head has already been written
 */
@Override
public HttpServerResponse writeContinue() {
  synchronized (conn) {
    checkHeadWritten();
    Http2Headers interim = new DefaultHttp2Headers().status("100");
    stream.writeHeaders(interim, false, null);
    ctx.flush();
  }
  return this;
}
/**
 * Writes a body chunk without ending the response and without a completion handler.
 */
@Override
public HttpServerResponse write(Buffer chunk) {
  write(chunk.getByteBuf(), false, null);
  return this;
}
/**
 * Writes a body chunk without ending the response; {@code handler} is handed to the underlying write.
 */
@Override
public HttpServerResponse write(Buffer chunk, Handler<AsyncResult<Void>> handler) {
  write(chunk.getByteBuf(), false, handler);
  return this;
}
/**
 * Encodes {@code chunk} with the charset named by {@code enc} and writes it as a body chunk.
 */
@Override
public HttpServerResponse write(String chunk, String enc) {
  Buffer encoded = Buffer.buffer(chunk, enc);
  return write(encoded.getByteBuf());
}
/**
 * Encodes {@code chunk} with the charset named by {@code enc} and writes it as a body chunk;
 * {@code handler} is handed to the underlying write.
 */
@Override
public HttpServerResponse write(String chunk, String enc, Handler<AsyncResult<Void>> handler) {
  Buffer encoded = Buffer.buffer(chunk, enc);
  write(encoded.getByteBuf(), false, handler);
  return this;
}
/**
 * Converts {@code chunk} to a buffer and writes it as a body chunk.
 */
@Override
public HttpServerResponse write(String chunk) {
  Buffer encoded = Buffer.buffer(chunk);
  return write(encoded.getByteBuf());
}
/**
 * Converts {@code chunk} to a buffer and writes it as a body chunk; {@code handler} is handed to
 * the underlying write.
 */
@Override
public HttpServerResponse write(String chunk, Handler<AsyncResult<Void>> handler) {
  Buffer encoded = Buffer.buffer(chunk);
  write(encoded.getByteBuf(), false, handler);
  return this;
}
/**
 * Writes a raw chunk without ending the response and without a completion handler.
 *
 * @return this response, for chaining
 */
private Http2ServerResponseImpl write(ByteBuf chunk) {
  final Handler<AsyncResult<Void>> noHandler = null;
  write(chunk, false, noHandler);
  return this;
}
/**
 * Ends the response with {@code chunk} as the final body content.
 */
@Override
public void end(String chunk) {
  Buffer body = Buffer.buffer(chunk);
  end(body);
}
/**
 * Ends the response with {@code chunk} as the final body content, forwarding {@code handler}
 * to the buffer based overload.
 */
@Override
public void end(String chunk, Handler<AsyncResult<Void>> handler) {
  Buffer body = Buffer.buffer(chunk);
  end(body, handler);
}
/**
 * Ends the response with {@code chunk}, encoded using the charset named by {@code enc}.
 */
@Override
public void end(String chunk, String enc) {
  Buffer body = Buffer.buffer(chunk, enc);
  end(body);
}
/**
 * Ends the response with {@code chunk}, encoded using the charset named by {@code enc}.
 * <p>
 * The previous code called the handler-less overload, so {@code handler} was silently dropped
 * and the caller was never told about completion. It is now forwarded like in the sibling
 * {@code end(String, Handler)} overload.
 *
 * @param chunk   the final body content
 * @param enc     the charset name used to encode {@code chunk}
 * @param handler completion handler, may be {@code null}
 */
@Override
public void end(String chunk, String enc, Handler<AsyncResult<Void>> handler) {
  end(Buffer.buffer(chunk, enc), handler);
}
/**
 * Ends the response with {@code chunk} as the final body content and no completion handler.
 */
@Override
public void end(Buffer chunk) {
  ByteBuf payload = chunk.getByteBuf();
  end(payload, null);
}
/**
 * Ends the response with {@code chunk} as the final body content; {@code handler} is handed to
 * the underlying write.
 */
@Override
public void end(Buffer chunk, Handler<AsyncResult<Void>> handler) {
  ByteBuf payload = chunk.getByteBuf();
  end(payload, handler);
}
/**
 * Ends the response without a body chunk and without a completion handler.
 */
@Override
public void end() {
  write((ByteBuf) null, true, null);
}
/**
 * Ends the response without a body chunk; {@code handler} is handed to the underlying write.
 */
@Override
public void end(Handler<AsyncResult<Void>> handler) {
  write((ByteBuf) null, true, handler);
}
/**
 * Returns the {@link NetSocket} view of this stream, creating it on first use.
 * <p>
 * On first use the status is forced to {@code 200 OK}, the response head is sent and flushed,
 * and the connection converts the stream into a socket. Later calls return the cached socket.
 *
 * @throws IllegalStateException if the response is ended or closed, or if the head had already
 *                               been sent before the socket was first requested
 */
NetSocket netSocket() {
  checkValid();
  final NetSocket socket;
  synchronized (conn) {
    if (netSocket == null) {
      status = HttpResponseStatus.OK;
      boolean headersSent = checkSendHeaders(false);
      if (!headersSent) {
        throw new IllegalStateException("Response for CONNECT already sent");
      }
      ctx.flush();
      netSocket = conn.toNetSocket(stream);
    }
    socket = netSocket;
  }
  return socket;
}
/**
 * Ends the response with an optional raw chunk.
 *
 * @param chunk   the final body bytes, or {@code null} for none
 * @param handler completion handler handed to the underlying write, may be {@code null}
 */
private void end(ByteBuf chunk, Handler<AsyncResult<Void>> handler) {
  final boolean endOfStream = true;
  write(chunk, endOfStream, handler);
}
private void () {
if (head || status == HttpResponseStatus.NOT_MODIFIED) {
headers.remove(HttpHeaders.TRANSFER_ENCODING);
} else if (status == HttpResponseStatus.RESET_CONTENT) {
headers.remove(HttpHeaders.TRANSFER_ENCODING);
headers.set(HttpHeaders.CONTENT_LENGTH, "0");
} else if (status.codeClass() == HttpStatusClass.INFORMATIONAL || status == HttpResponseStatus.NO_CONTENT) {
headers.remove(HttpHeaders.TRANSFER_ENCODING);
headers.remove(HttpHeaders.CONTENT_LENGTH);
}
}
private boolean (boolean end) {
if (!headWritten) {
if (headersEndHandler != null) {
headersEndHandler.handle(null);
}
sanitizeHeaders();
headWritten = true;
headers.status(Integer.toString(status.code()));
stream.writeHead(headers, end, null);
if (end) {
ctx.flush();
}
return true;
} else {
return false;
}
}
/**
 * Core write path: sends the head if needed, then an optional data frame and, when ending,
 * the optional trailers, and finally notifies the body-end and end handlers.
 *
 * @param chunk   the bytes to write, or {@code null} for no body (replaced by an empty buffer)
 * @param end     whether this call ends the response
 * @param handler completion handler passed to the data write, may be {@code null}
 * @throws IllegalStateException if the response has already been ended
 */
void write(ByteBuf chunk, boolean end, Handler<AsyncResult<Void>> handler) {
Handler<Void> bodyEndHandler;
Handler<Void> endHandler;
synchronized (conn) {
if (ended) {
throw new IllegalStateException("Response has already been written");
}
ended |= end;
boolean hasBody = false;
if (chunk != null) {
hasBody = true;
} else {
chunk = Unpooled.EMPTY_BUFFER;
}
if (end) {
// Ending before the head went out: the whole body is known, so a content-length can be
// derived unless the caller set one or the response is for HEAD / 304.
if (!headWritten && !head && status != HttpResponseStatus.NOT_MODIFIED && !headers.contains(HttpHeaderNames.CONTENT_LENGTH)) {
headers().set(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(chunk.readableBytes()));
}
}
// The header frame ends the stream only when nothing follows it (no body, no trailers).
boolean sent = checkSendHeaders(end && !hasBody && trailers == null);
// A data frame is needed for a real body, or to carry the end flag when the head had
// already been sent earlier.
// NOTE(review): when the head is sent by this call and there is no body, no data frame is
// written and `handler` is not passed to any call in this method — confirm whether the
// completion handler should be notified on that path.
if (hasBody || (!sent && end)) {
stream.writeData(chunk, end && trailers == null, handler);
}
if (end && trailers != null) {
// Trailers carry the end-of-stream flag when present.
stream.writeHeaders(trailers, true, null);
}
bodyEndHandler = this.bodyEndHandler;
endHandler = this.endHandler;
}
// Handlers run outside the connection monitor.
if (end) {
if (bodyEndHandler != null) {
bodyEndHandler.handle(null);
}
if (endHandler != null) {
endHandler.handle(null);
}
}
}
/**
 * Writes a custom HTTP/2 frame on this stream and flushes it, sending the response head first
 * if it has not been sent yet.
 *
 * @param type    the frame type
 * @param flags   the frame flags
 * @param payload the frame payload
 * @throws IllegalStateException if the response is already ended or closed
 */
@Override
public HttpServerResponse writeCustomFrame(int type, int flags, Buffer payload) {
  synchronized (conn) {
    checkValid();
    checkSendHeaders(false);
    ByteBuf frameBody = payload.getByteBuf();
    stream.writeFrame(type, flags, frameBody);
    ctx.flush();
  }
  return this;
}
/**
 * Guards operations that require a response that is neither ended nor closed.
 * The ended check takes precedence over the closed check.
 *
 * @throws IllegalStateException if the response has been ended or closed
 */
private void checkValid() {
  final String problem;
  if (ended) {
    problem = "Response has already been written";
  } else if (closed) {
    problem = "Response is closed";
  } else {
    return;
  }
  throw new IllegalStateException(problem);
}
/**
 * Invokes the drain handler when the response is still open, the write queue is no longer
 * full and a drain handler is registered.
 */
void writabilityChanged() {
  if (ended) {
    return;
  }
  if (writeQueueFull()) {
    return;
  }
  if (drainHandler == null) {
    return;
  }
  drainHandler.handle(null);
}
/**
 * @return {@code true} when the underlying stream reports it is not writable
 * @throws IllegalStateException if the response is already ended or closed
 */
@Override
public boolean writeQueueFull() {
  synchronized (conn) {
    checkValid();
    boolean full = stream.isNotWritable();
    return full;
  }
}
/**
 * Validates the response state only; {@code maxSize} is not used by this implementation.
 *
 * @throws IllegalStateException if the response is already ended or closed
 */
@Override
public HttpServerResponse setWriteQueueMaxSize(int maxSize) {
  // Only the validity check is performed under the connection monitor.
  synchronized (conn) {
    checkValid();
  }
  return this;
}
/**
 * Sets (or clears, with {@code null}) the handler invoked when the stream becomes writable again.
 *
 * @throws IllegalStateException if a non-null handler is set on an ended or closed response
 */
@Override
public HttpServerResponse drainHandler(Handler<Void> handler) {
  synchronized (conn) {
    if (handler != null) {
      checkValid();
    }
    this.drainHandler = handler;
  }
  return this;
}
/**
 * Sends a file region as the response body without a result handler.
 */
@Override
public HttpServerResponse sendFile(String filename, long offset, long length) {
  final Handler<AsyncResult<Void>> noResultHandler = null;
  return sendFile(filename, offset, length, noResultHandler);
}
/**
 * Sends a region of a file as the response body and ends the response once it has been transferred.
 * <p>
 * {@code content-length} and {@code content-type} are filled in when the caller has not set
 * them. A missing file or a failure to open it is reported to {@code resultHandler}, or logged
 * when no handler was given, and nothing is written.
 * <p>
 * Changes compared to the previous code:
 * <ul>
 *   <li>a failed transfer is reported to {@code resultHandler} as a failure; it used to be
 *       reported as success;</li>
 *   <li>a failed transfer without a result handler is logged instead of being dropped;</li>
 *   <li>the opened file is closed when the file channel cannot be registered, because in that
 *       case it is never handed to the channel.</li>
 * </ul>
 *
 * @param filename      the file to send, resolved through the Vert.x instance of the stream
 * @param offset        the offset of the first byte to send
 * @param length        the maximum number of bytes to send
 * @param resultHandler notified on the caller's context with the outcome, may be {@code null}
 * @throws IllegalStateException if the response is already ended or closed
 */
@Override
public HttpServerResponse sendFile(String filename, long offset, long length, Handler<AsyncResult<Void>> resultHandler) {
  synchronized (conn) {
    checkValid();
    Context resultCtx = resultHandler != null ? stream.vertx.getOrCreateContext() : null;
    File file = stream.vertx.resolveFile(filename);
    if (!file.exists()) {
      if (resultHandler != null) {
        resultCtx.runOnContext((v) -> resultHandler.handle(Future.failedFuture(new FileNotFoundException())));
      } else {
        log.error("File not found: " + filename);
      }
      return this;
    }
    RandomAccessFile raf;
    try {
      raf = new RandomAccessFile(file, "r");
    } catch (IOException e) {
      if (resultHandler != null) {
        resultCtx.runOnContext((v) -> resultHandler.handle(Future.failedFuture(e)));
      } else {
        log.error("Failed to send file", e);
      }
      return this;
    }
    long contentLength = Math.min(length, file.length() - offset);
    if (headers.get(HttpHeaderNames.CONTENT_LENGTH) == null) {
      putHeader(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(contentLength));
    }
    if (headers.get(HttpHeaderNames.CONTENT_TYPE) == null) {
      String contentType = MimeMapping.getMimeTypeForFilename(filename);
      if (contentType != null) {
        putHeader(HttpHeaderNames.CONTENT_TYPE, contentType);
      }
    }
    checkSendHeaders(false);
    Promise<Long> result = Promise.promise();
    result.future().setHandler(ar -> {
      final Future<Void> outcome;
      if (ar.succeeded()) {
        end();
        outcome = Future.succeededFuture();
      } else {
        // Propagate the real cause instead of reporting success.
        outcome = Future.failedFuture(ar.cause());
      }
      if (resultHandler != null) {
        resultCtx.runOnContext(v -> resultHandler.handle(outcome));
      } else if (ar.failed()) {
        log.error("Failed to send file", ar.cause());
      }
    });
    FileStreamChannel fileChannel = new FileStreamChannel(result, stream, offset, contentLength);
    drainHandler(fileChannel.drainHandler);
    ctx.channel()
      .eventLoop()
      .register(fileChannel)
      .addListener((ChannelFutureListener) future -> {
        if (future.isSuccess()) {
          fileChannel.pipeline().fireUserEventTriggered(raf);
        } else {
          // The file was never handed to the channel, so it must be released here.
          try {
            raf.close();
          } catch (IOException ignored) {
            // Best effort: the registration failure reported below is the relevant error.
          }
          result.tryFail(future.cause());
        }
      });
  }
  return this;
}
/**
 * Closes the connection this response belongs to.
 */
@Override
public void close() {
  this.conn.close();
}
/**
 * @return {@code true} once the response has been ended
 */
@Override
public boolean ended() {
  final boolean done;
  synchronized (conn) {
    done = ended;
  }
  return done;
}
/**
 * @return whether the connection reports itself as closed
 */
@Override
public boolean closed() {
  boolean connectionClosed = conn.isClosed();
  return connectionClosed;
}
/**
 * @return {@code true} once the response head has been written
 */
@Override
public boolean headWritten() {
  final boolean written;
  synchronized (conn) {
    written = headWritten;
  }
  return written;
}
/**
 * Sets (or clears, with {@code null}) the handler invoked right before the response head is written.
 */
@Override
public HttpServerResponse headersEndHandler(@Nullable Handler<Void> handler) {
  synchronized (conn) {
    this.headersEndHandler = handler;
  }
  return this;
}
/**
 * Sets (or clears, with {@code null}) the handler invoked when the response is ended by a write.
 */
@Override
public HttpServerResponse bodyEndHandler(@Nullable Handler<Void> handler) {
  synchronized (conn) {
    this.bodyEndHandler = handler;
  }
  return this;
}
/**
 * @return the byte count reported by the underlying stream
 */
@Override
public long bytesWritten() {
  long written = stream.bytesWritten();
  return written;
}
/**
 * @return the identifier of the underlying stream
 */
@Override
public int streamId() {
  int id = stream.id();
  return id;
}
/**
 * Resets the stream with the given error code and flushes the reset.
 *
 * @param code the reset error code
 * @throws IllegalStateException if the response is already ended or closed
 */
@Override
public void reset(long code) {
  checkValid();
  final long errorCode = code;
  stream.writeReset(errorCode);
  ctx.flush();
}
/**
 * Requests a push for {@code path} on {@code host} without extra request headers.
 */
@Override
public HttpServerResponse push(HttpMethod method, String host, String path, Handler<AsyncResult<HttpServerResponse>> handler) {
  final MultiMap noHeaders = null;
  return push(method, host, path, noHeaders, handler);
}
/**
 * Requests a push for {@code path} with the given request headers and no explicit host.
 */
@Override
public HttpServerResponse push(HttpMethod method, String path, MultiMap headers, Handler<AsyncResult<HttpServerResponse>> handler) {
  final String noHost = null;
  return push(method, noHost, path, headers, handler);
}
/**
 * Asks the connection to send a push promise associated with this stream.
 *
 * @param method  the method of the promised request
 * @param host    the host of the promised request, may be {@code null}
 * @param path    the path of the promised request
 * @param headers extra headers of the promised request, may be {@code null}
 * @param handler receives the pushed response or the failure
 * @throws IllegalStateException if this response is itself a push, or is already ended or closed
 */
@Override
public HttpServerResponse push(HttpMethod method, String host, String path, MultiMap headers, Handler<AsyncResult<HttpServerResponse>> handler) {
  synchronized (conn) {
    if (push) {
      throw new IllegalStateException("A push response cannot promise another push");
    }
    checkValid();
    conn.sendPush(stream.id(), host, method, headers, path, stream.priority(), handler);
  }
  return this;
}
/**
 * Requests a push for {@code path}, reusing the host this response was created with.
 */
@Override
public HttpServerResponse push(HttpMethod method, String path, Handler<AsyncResult<HttpServerResponse>> handler) {
  return push(method, this.host, path, handler);
}
/**
 * Forwards the new priority to the underlying stream.
 *
 * @param priority the priority to apply
 */
@Override
public HttpServerResponse setStreamPriority(StreamPriority priority) {
  this.stream.updatePriority(priority);
  return this;
}
}