package org.glassfish.grizzly.http2;
import static org.glassfish.grizzly.http2.Termination.OUT_FIN_TERMINATION;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.WriteHandler;
import org.glassfish.grizzly.WriteResult;
import org.glassfish.grizzly.asyncqueue.AsyncQueueRecord;
import org.glassfish.grizzly.asyncqueue.MessageCloner;
import org.glassfish.grizzly.asyncqueue.TaskQueue;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.http.HttpContent;
import org.glassfish.grizzly.http.HttpHeader;
import org.glassfish.grizzly.http.HttpPacket;
import org.glassfish.grizzly.http.HttpResponsePacket;
import org.glassfish.grizzly.http.HttpTrailer;
import org.glassfish.grizzly.http2.frames.ErrorCode;
import org.glassfish.grizzly.http2.frames.HeadersFrame;
import org.glassfish.grizzly.http2.frames.Http2Frame;
import org.glassfish.grizzly.http2.frames.PushPromiseFrame;
import org.glassfish.grizzly.http2.utils.ChunkedCompletionHandler;
import org.glassfish.grizzly.memory.Buffers;
class DefaultOutputSink implements StreamOutputSink {
// Logger used by this sink.
private static final Logger LOGGER = Grizzly.logger(DefaultOutputSink.class);
// Limit of the output queue; also the threshold used by canWrite() and notifyWritePossible().
private static final int MAX_OUTPUT_QUEUE_SIZE = 65536;
// Queue space reserved/released for a record whose payload is empty.
private static final int ZERO_QUEUE_RECORD_SIZE = 1;
// Sentinel record offered by close(); when it is dequeued an empty FIN frame is written.
private static final OutputQueueRecord TERMINATING_QUEUE_RECORD = new OutputQueueRecord(null, null, true, true);
// Queue of records that could not be written immediately.
final TaskQueue<OutputQueueRecord> outputQueue = TaskQueue.createTaskQueue(new TaskQueue.MutableMaxQueueSize() {
@Override
public int getMaxQueueSize() {
return MAX_OUTPUT_QUEUE_SIZE;
}
});
// Remaining send window of this stream: initialised from stream.getPeerWindowSize(),
// increased in onPeerWindowUpdate() and decreased by the size of every data chunk sent.
private final AtomicInteger availStreamWindowSize;
// Set by close(); once true, assertReady() rejects further writes.
private volatile boolean isLastFrameQueued;
// Null until terminate() is called; afterwards holds the termination reason.
private Termination terminationFlag;
// Session obtained from the stream in the constructor.
private final Http2Session http2Session;
// Stream this sink writes for.
private final Http2Stream stream;
// Incremented before a write is handed to the session sink, decremented in FlushCompletionHandler.done0().
private final AtomicInteger unflushedWritesCounter = new AtomicInteger();
// Monitor guarding flushHandlersQueue and the counter decrement in done0().
private final Object flushHandlersSync = new Object();
// Lazily created in flush(); holds handlers waiting for pending writes to complete.
private BundleQueue<CompletionHandler<Http2Stream>> flushHandlersQueue;
DefaultOutputSink(final Http2Stream stream) {
this.stream = stream;
http2Session = stream.getHttp2Session();
availStreamWindowSize = new AtomicInteger(stream.getPeerWindowSize());
}
/**
 * @return {@code true} while the output queue size is below {@code MAX_OUTPUT_QUEUE_SIZE}
 */
@Override
public boolean canWrite() {
    final int queuedNow = outputQueue.size();
    return queuedNow < MAX_OUTPUT_QUEUE_SIZE;
}
/**
 * Registers the handler with the output queue, using {@code MAX_OUTPUT_QUEUE_SIZE}
 * as the size argument.
 *
 * @param writeHandler the handler passed on to the output queue
 */
@Override
public void notifyWritePossible(final WriteHandler writeHandler) {
outputQueue.notifyWritePossible(writeHandler, MAX_OUTPUT_QUEUE_SIZE);
}
/**
 * Verifies that this sink still accepts writes.
 *
 * @throws IOException if the sink has been terminated (the message is the termination
 *         description) or if the last frame has already been queued
 */
private void assertReady() throws IOException {
    if (!isTerminated()) {
        if (isLastFrameQueued) {
            throw new IOException("Write beyond end of stream");
        }
        return;
    }

    // Terminated: report the reason, logging it first when FINE is enabled.
    if (LOGGER.isLoggable(FINE)) {
        LOGGER.log(FINE, "Terminated!!! id={0} description={1}", new Object[] { stream.getId(), terminationFlag.getDescription() });
    }
    throw new IOException(terminationFlag.getDescription());
}
/**
 * Applies a window update to this stream's send window and then writes queued
 * records for as long as the window permits.
 *
 * @param delta the amount added to the available window size
 * @throws Http2StreamException with {@code FLOW_CONTROL_ERROR} if a positive delta
 *         would overflow a positive window
 */
@Override
public void onPeerWindowUpdate(final int delta) throws Http2StreamException {
final int currentWindow = availStreamWindowSize.get();
// A positive window plus a positive delta that turns negative means int overflow.
if (delta > 0 && currentWindow > 0 && currentWindow + delta < 0) {
throw new Http2StreamException(stream.getId(), ErrorCode.FLOW_CONTROL_ERROR, "Session flow-control window overflow.");
}
availStreamWindowSize.addAndGet(delta);
// Keep going while the window is large enough (see isWantToWrite) and something is queued,
// or while the head of the queue is a trailer record.
while ((isWantToWrite() && !outputQueue.isEmpty())
|| (outputQueue.peek() != null && outputQueue.peek().trailer != null)) {
OutputQueueRecord outputQueueRecord = outputQueue.poll();
if (outputQueueRecord == null) {
return;
}
// Sentinel queued by close(): release its reserved unit and write the empty FIN.
if (outputQueueRecord == TERMINATING_QUEUE_RECORD) {
releaseWriteQueueSpace(0, true, true);
writeEmptyFin();
return;
}
final FlushCompletionHandler completionHandler = outputQueueRecord.chunkedCompletionHandler;
boolean isLast = outputQueueRecord.isLast;
final boolean isZeroSizeData = outputQueueRecord.isZeroSizeData;
final Source resource = outputQueueRecord.resource;
final HttpTrailer currentTrailer = outputQueueRecord.trailer;
// Trailer records are sent as trailers; processing stops afterwards.
if (currentTrailer != null) {
outputQueueRecord = null;
sendTrailers(completionHandler, currentTrailer);
return;
}
// Read no more than the window currently allows.
final int bytesToSend = checkOutputWindow(resource.remaining());
final Buffer dataChunkToSend = resource.read(bytesToSend);
final boolean hasRemaining = resource.hasRemaining();
if (hasRemaining) {
// A remainder stays in the record: count one more chunk on the completion handler
// and make sure the chunk sent now is not flagged as last.
outputQueueRecord.reset(resource, completionHandler, isLast);
outputQueueRecord.incChunksCounter();
isLast = false;
} else {
outputQueueRecord.release();
outputQueueRecord = null;
}
if (dataChunkToSend != null && (dataChunkToSend.hasRemaining() || isLast)) {
final int dataChunkToSendSize = dataChunkToSend.remaining();
flushToConnectionOutputSink(dataChunkToSend, completionHandler, isLast);
// Account for the bytes just sent in the window and in the queue space.
availStreamWindowSize.addAndGet(-dataChunkToSendSize);
releaseWriteQueueSpace(dataChunkToSendSize, isZeroSizeData, outputQueueRecord == null);
outputQueue.doNotify();
} else if (isZeroSizeData && outputQueueRecord == null) {
// Nothing was sent, but a fully consumed zero-size record still holds its reserved unit.
releaseWriteQueueSpace(0, true, true);
outputQueue.doNotify();
}
// A record with a remainder becomes the queue's current element and the loop ends.
if (outputQueueRecord != null) {
outputQueue.setCurrentElement(outputQueueRecord);
break;
}
}
}
/**
 * Writes the packet on this stream; any part that cannot be sent right away is
 * handed to {@code addOutputQueueRecord}.
 *
 * @param httpPacket the packet to write
 * @param ctx the filter chain context, must not be {@code null}
 * @param completionHandler handler notified about the write result, may be {@code null}
 * @param messageCloner cloner applied to data that has to be kept, may be {@code null}
 * @throws IOException if the sink does not accept writes any more or the write fails
 */
@Override
public synchronized void writeDownStream(final HttpPacket httpPacket, final FilterChainContext ctx,
        final CompletionHandler<WriteResult> completionHandler, final MessageCloner<Buffer> messageCloner)
        throws IOException {
    assert ctx != null;
    assertReady();

    final OutputQueueRecord leftover = writeDownStream0(httpPacket, ctx, completionHandler, messageCloner);
    if (leftover == null) {
        return;
    }
    addOutputQueueRecord(leftover);
}
/**
 * Commits the HTTP header if it has not been committed yet and sends as much of the
 * packet's payload as the stream window currently allows.
 *
 * @param httpPacket the packet to write; only {@link HttpContent} carries payload
 * @param ctx the filter chain context
 * @param completionHandler handler notified about the write result, may be {@code null}
 * @param messageCloner cloner applied to data that has to be kept, may be {@code null}
 * @return a record holding the part of the payload that did not fit into the window,
 *         or {@code null} if nothing is left for the caller to queue
 * @throws IOException if encoding or writing fails
 */
private OutputQueueRecord writeDownStream0(final HttpPacket httpPacket,
        final FilterChainContext ctx,
        final CompletionHandler<WriteResult> completionHandler,
        final MessageCloner<Buffer> messageCloner)
        throws IOException {
    final HttpHeader httpHeader = stream.getOutputHttpHeader();
    final HttpContent httpContent = HttpContent.isContent(httpPacket) ? (HttpContent) httpPacket : null;
    List<Http2Frame> headerFrames = null;
    boolean sendTrailers = false;
    boolean lockedByMe = false;
    try {
        boolean isLast = httpContent != null && httpContent.isLast();
        final boolean isTrailer = HttpTrailer.isTrailer(httpContent);

        // Commit the header first if that has not happened yet.
        if (!httpHeader.isCommitted()) {
            final boolean dontSendPayload = !httpHeader.isExpectContent()
                    || (httpContent != null && httpContent.isLast() && !httpContent.getContent().hasRemaining());
            LOGGER.finest(() -> "Header not committed yet; dontSendPayload=" + dontSendPayload);

            // The lock is released in the finally block below.
            http2Session.getDeflaterLock().lock();
            lockedByMe = true;

            final boolean logging = NetLogger.isActive();
            final Map<String, String> capture = logging ? new HashMap<>() : null;
            headerFrames = http2Session.encodeHttpHeaderAsHeaderFrames(
                    ctx, httpHeader, stream.getId(), dontSendPayload, null, capture);
            if (logging) {
                for (Http2Frame http2Frame : headerFrames) {
                    // Select the frame by its actual class: the previous check compared the
                    // type against PushPromiseFrame.TYPE and then cast to HeadersFrame, which
                    // could only skip the logging or fail with a ClassCastException.
                    if (http2Frame instanceof HeadersFrame) {
                        NetLogger.log(NetLogger.Context.TX, http2Session, (HeadersFrame) http2Frame, capture);
                        break;
                    }
                }
            }
            stream.onSndHeaders(dontSendPayload);

            // Acknowledgement responses are flushed on their own and do not commit the header.
            if (!httpHeader.isRequest()) {
                HttpResponsePacket response = (HttpResponsePacket) httpHeader;
                if (response.isAcknowledgement()) {
                    response.acknowledged();
                    response.getHeaders().clear();
                    unflushedWritesCounter.incrementAndGet();
                    flushToConnectionOutputSink(headerFrames, completionHandler, messageCloner, false);
                    LOGGER.finest("Acknowledgement has been sent.");
                    return null;
                }
            }

            httpHeader.setCommitted(true);
            if (dontSendPayload || httpContent == null) {
                // Headers only; FIN is sent together with them when no payload is expected.
                unflushedWritesCounter.incrementAndGet();
                flushToConnectionOutputSink(headerFrames, completionHandler, messageCloner, dontSendPayload);
                sendTrailers = false;
                LOGGER.finest(() -> "Nothing to send; dontSendPayload=" + dontSendPayload);
                return null;
            }
        }

        if (httpContent == null) {
            sendTrailers = false;
            LOGGER.finest("Nothing to send, httpContent is null.");
            return null;
        }

        http2Session.handlerFilter.onHttpContentEncoded(httpContent, ctx);
        Buffer data = httpContent.getContent();
        final int dataSize = data.remaining();

        unflushedWritesCounter.incrementAndGet();
        final FlushCompletionHandler flushCompletionHandler = new FlushCompletionHandler(completionHandler);

        // An empty payload still occupies ZERO_QUEUE_RECORD_SIZE in the queue accounting.
        final boolean isZeroSizeData = dataSize == 0;
        final int spaceToReserve = isZeroSizeData ? ZERO_QUEUE_RECORD_SIZE : dataSize;
        boolean isDataCloned = false;

        final int spaceReserved = reserveWriteQueueSpace(spaceToReserve);
        LOGGER.finest(() -> "Bytes reserved: " + spaceReserved + ", was requested: " + spaceToReserve);

        // More space reserved than requested means the queue already held something.
        if (spaceReserved > spaceToReserve) {
            assert headerFrames == null;
            if (messageCloner != null) {
                data = messageCloner.clone(http2Session.getConnection(), data);
                isDataCloned = true;
            }
            final OutputQueueRecord record = createOutputQueueRecord(data, httpContent, flushCompletionHandler,
                    isLast, isZeroSizeData);
            outputQueue.offer(record);

            // Other records are still ahead of this one: leave it queued.
            if (outputQueue.size() != spaceToReserve) {
                sendTrailers = isLast && isTrailer;
                return null;
            }
            // The queue drained in the meantime; continue here only if the record can be taken back.
            if (!outputQueue.remove(record)) {
                sendTrailers = false;
                LOGGER.finest("The record has been already processed.");
                return null;
            }
        }

        final int remaining = data.remaining();
        final int fitWindowLen = checkOutputWindow(remaining);
        LOGGER.finest(() -> "Remaining: " + remaining + ", fitWindowLen: " + fitWindowLen);

        final OutputQueueRecord outputQueueRecord;
        if (fitWindowLen >= remaining) {
            // Everything fits into the window.
            outputQueueRecord = null;
        } else {
            // Split: the head is sent now, the tail is kept in a record for later.
            if (!isDataCloned && messageCloner != null) {
                data = messageCloner.clone(http2Session.getConnection(), data);
                isDataCloned = true;
            }
            final Buffer dataChunkToStore = splitOutputBufferIfNeeded(data, fitWindowLen);
            outputQueueRecord = createOutputQueueRecord(dataChunkToStore, null, flushCompletionHandler, isLast,
                    isZeroSizeData);
            // The chunk sent now is not the last one.
            isLast = false;
        }

        final Buffer dataToSend = prepareDataToSend(outputQueueRecord == null, isLast, data, isZeroSizeData);
        if (headerFrames != null || dataToSend != null) {
            // Another chunk will follow: count it so the completion handler is not notified early.
            if (outputQueueRecord != null) {
                outputQueueRecord.incChunksCounter();
            }
            flushToConnectionOutputSink(headerFrames, dataToSend, flushCompletionHandler,
                    isDataCloned ? null : messageCloner, isLast && !isTrailer);
        }

        LOGGER.finest("isLast=" + isLast + "; isTrailer=" + isTrailer);
        sendTrailers = isLast && isTrailer;
        return isLast ? null : outputQueueRecord;
    } finally {
        LOGGER.finest("sendTrailers=" + sendTrailers);
        if (sendTrailers) {
            sendTrailers(completionHandler, (HttpTrailer) httpContent);
        }
        if (lockedByMe) {
            http2Session.getDeflaterLock().unlock();
        }
    }
}
/**
 * Wraps the data into a buffer source and creates the matching queue record: a trailer
 * record when {@code httpContent} is an {@link HttpTrailer}, a plain data record otherwise.
 *
 * @param data the data to wrap
 * @param httpContent the originating content, may be {@code null}
 * @param flushCompletionHandler completion handler stored in the record
 * @param isLast last-chunk flag for a plain data record
 * @param isZeroSizeData zero-size flag for a plain data record
 * @return the new record
 */
private OutputQueueRecord createOutputQueueRecord(final Buffer data, final HttpContent httpContent,
        final FlushCompletionHandler flushCompletionHandler, boolean isLast, final boolean isZeroSizeData) {
    final Source source = Source.factory(stream).createBufferSource(data);
    final boolean carriesTrailer = httpContent instanceof HttpTrailer;
    if (!carriesTrailer) {
        return new OutputQueueRecord(source, flushCompletionHandler, isLast, isZeroSizeData);
    }
    final HttpTrailer trailer = (HttpTrailer) httpContent;
    return new OutputQueueRecord(source, flushCompletionHandler, trailer, false);
}
/**
 * Decides whether the buffer is to be sent now and, if so, charges its size against
 * the stream window and releases the matching queue space.
 *
 * @param isRecordNull {@code true} if no remainder record exists for this write
 * @param isLast {@code true} if this chunk is the last one
 * @param data the buffer to send, may be {@code null}
 * @param isZeroSizeData {@code true} if the original payload was empty
 * @return {@code data} if it has to be sent, {@code null} otherwise
 */
private Buffer prepareDataToSend(final boolean isRecordNull, final boolean isLast, final Buffer data,
        final boolean isZeroSizeData) {
    if (data == null) {
        return null;
    }
    final boolean worthSending = data.hasRemaining() || isLast;
    if (!worthSending) {
        return null;
    }
    final int chunkSize = data.remaining();
    availStreamWindowSize.addAndGet(-chunkSize);
    releaseWriteQueueSpace(chunkSize, isZeroSizeData, isRecordNull);
    return data;
}
/**
 * Notifies the handler once the writes pending at the time of the call have completed;
 * if none are pending the handler is completed immediately.
 *
 * @param completionHandler the handler to notify with this sink's stream
 */
@Override
public void flush(final CompletionHandler<Http2Stream> completionHandler) {
    final boolean mayHavePendingWrites = unflushedWritesCounter.get() > 0;
    if (mayHavePendingWrites) {
        synchronized (flushHandlersSync) {
            // Re-read under the monitor: done0() decrements while holding it.
            final int pendingWrites = unflushedWritesCounter.get();
            if (pendingWrites > 0) {
                if (flushHandlersQueue == null) {
                    flushHandlersQueue = new BundleQueue<>();
                }
                flushHandlersQueue.add(pendingWrites, completionHandler);
                return;
            }
        }
    }

    completionHandler.completed(stream);
}
/**
 * Computes how many of {@code size} bytes fit into the currently available stream window.
 *
 * <p>The comparison is done in {@code long} arithmetic, so a {@code size} above
 * {@link Integer#MAX_VALUE} is limited by the window instead of being truncated
 * by a narrowing cast first.
 *
 * @param size the number of bytes the caller would like to send
 * @return a value between {@code 0} and the available window size, never more than {@code size}
 */
private int checkOutputWindow(final long size) {
    final long window = availStreamWindowSize.get();
    // The result is bounded by the int-valued window (or clamped to 0), so the cast is safe.
    return (int) Math.max(0L, Math.min(window, size));
}
/**
 * Splits the buffer {@code length} bytes after its current position.
 *
 * @param buffer the buffer to split
 * @param length the number of bytes to keep in front of the split point
 * @return the result of {@code buffer.split(...)}, or {@code null} if {@code length}
 *         equals the buffer's remaining size
 */
private Buffer splitOutputBufferIfNeeded(final Buffer buffer, final int length) {
    if (buffer.remaining() != length) {
        final int splitPosition = buffer.position() + length;
        return buffer.split(splitPosition);
    }
    return null;
}
/**
 * Sends header frames without payload, wrapping the given handler into a
 * {@code FlushCompletionHandler}.
 *
 * @param headerFrames the frames to send
 * @param completionHandler the handler to wrap, may be {@code null}
 * @param messageCloner cloner passed on to the session sink, may be {@code null}
 * @param sendFIN whether the write ends the stream
 */
private void flushToConnectionOutputSink(final List<Http2Frame> headerFrames,
        final CompletionHandler<WriteResult> completionHandler,
        final MessageCloner<Buffer> messageCloner,
        final boolean sendFIN) {
    flushToConnectionOutputSink(headerFrames, null, new FlushCompletionHandler(completionHandler),
            messageCloner, sendFIN);
}
/**
 * Sends a data buffer without header frames and without a message cloner.
 *
 * @param data the data to send
 * @param flushCompletionHandler the completion handler for the write
 * @param sendFIN whether the write ends the stream
 */
private void flushToConnectionOutputSink(
final Buffer data,
final FlushCompletionHandler flushCompletionHandler,
final boolean sendFIN) {
flushToConnectionOutputSink(null, data, flushCompletionHandler, null, sendFIN);
}
/**
 * Hands header frames and/or data to the session's output sink and, if the write
 * carries FIN, terminates this sink with {@code OUT_FIN_TERMINATION}.
 *
 * @param headerFrames the header frames to send, may be {@code null}
 * @param data the data to send, may be {@code null}
 * @param completionHandler the completion handler for the write
 * @param messageCloner cloner passed on to the session sink, may be {@code null}
 * @param sendFIN whether the write ends the stream
 */
private void flushToConnectionOutputSink(
final List<Http2Frame> headerFrames,
final Buffer data,
final CompletionHandler<WriteResult> completionHandler,
final MessageCloner<Buffer> messageCloner,
final boolean sendFIN) {
http2Session.getOutputSink().writeDataDownStream(
stream, headerFrames, data, completionHandler, messageCloner, sendFIN);
// The terminating write has been handed over; mark the output side as finished.
if (sendFIN) {
terminate(OUT_FIN_TERMINATION);
}
}
/**
 * Closes the output: marks the last frame as queued and writes an empty FIN, either
 * immediately or, if records are still queued, after them via the terminating record.
 */
@Override
public synchronized void close() {
    if (isClosed()) {
        return;
    }
    isLastFrameQueued = true;

    if (!outputQueue.isEmpty()) {
        // Queue the sentinel behind the pending records.
        outputQueue.reserveSpace(ZERO_QUEUE_RECORD_SIZE);
        outputQueue.offer(TERMINATING_QUEUE_RECORD);

        // If the queue drained meanwhile and the sentinel can be taken back, the FIN is
        // written here; otherwise it is written when the sentinel gets dequeued.
        final boolean onlySentinelQueued = outputQueue.size() == ZERO_QUEUE_RECORD_SIZE;
        if (!onlySentinelQueued || !outputQueue.remove(TERMINATING_QUEUE_RECORD)) {
            return;
        }
    }

    writeEmptyFin();
}
/**
 * Terminates this sink with the given reason; later calls have no effect.
 *
 * @param terminationFlag the termination reason to record
 */
@Override
public synchronized void terminate(final Termination terminationFlag) {
    if (isTerminated()) {
        return;
    }
    this.terminationFlag = terminationFlag;
    outputQueue.onClose();
    stream.onOutputClosed();
}
/**
 * @return {@code true} if the last frame has been queued or the sink has been terminated
 */
@Override
public boolean isClosed() {
    if (isLastFrameQueued) {
        return true;
    }
    return isTerminated();
}
/**
 * @return the current value of the unflushed writes counter
 */
@Override
public int getUnflushedWritesCount() {
return unflushedWritesCounter.get();
}
/**
 * @return {@code true} once a termination reason has been recorded by {@code terminate}
 */
private boolean isTerminated() {
return terminationFlag != null;
}
/**
 * Writes an empty buffer with FIN set, unless the sink is already terminated.
 */
private void writeEmptyFin() {
    if (isTerminated()) {
        return;
    }
    unflushedWritesCounter.incrementAndGet();
    final FlushCompletionHandler finHandler = new FlushCompletionHandler(null);
    flushToConnectionOutputSink(Buffers.EMPTY_BUFFER, finHandler, true);
}
/**
 * @return {@code true} if the available window is at least a quarter of the stream's
 *         peer window size
 */
private boolean isWantToWrite() {
    return availStreamWindowSize.get() >= stream.getPeerWindowSize() / 4;
}
/**
 * Registers the record as the queue's current element and tries to send it right away;
 * whatever cannot be sent stays with the output queue.
 *
 * @param outputQueueRecord the record holding data that has not been sent yet
 * @throws Http2StreamException declared for the operations performed on the record's source
 */
private void addOutputQueueRecord(OutputQueueRecord outputQueueRecord) throws Http2StreamException {
do {
// Publish the record as the current element of the queue.
outputQueue.setCurrentElement(outputQueueRecord);
// Stop if the window is too small, or if the record is no longer the current element
// (presumably it has been picked up elsewhere in the meantime).
if (!isWantToWrite() || !outputQueue.compareAndSetCurrentElement(outputQueueRecord, null)) {
break;
}
final FlushCompletionHandler chunkedCompletionHandler = outputQueueRecord.chunkedCompletionHandler;
final HttpTrailer currentTrailer = outputQueueRecord.trailer;
// Trailer records are sent as trailers and end the processing.
if (currentTrailer != null) {
sendTrailers(chunkedCompletionHandler, currentTrailer);
return;
}
boolean isLast = outputQueueRecord.isLast;
final boolean isZeroSizeData = outputQueueRecord.isZeroSizeData;
final Source currentResource = outputQueueRecord.resource;
// Read no more than the window currently allows.
final int fitWindowLen = checkOutputWindow(currentResource.remaining());
final Buffer dataChunkToSend = currentResource.read(fitWindowLen);
if (currentResource.hasRemaining()) {
// A remainder stays in the record: count one more chunk on the completion handler
// and make sure the chunk sent now is not flagged as last.
outputQueueRecord.reset(currentResource, chunkedCompletionHandler, isLast);
outputQueueRecord.incChunksCounter();
isLast = false;
} else {
outputQueueRecord.release();
outputQueueRecord = null;
}
if (dataChunkToSend != null && (dataChunkToSend.hasRemaining() || isLast)) {
final int dataChunkToSendSize = dataChunkToSend.remaining();
flushToConnectionOutputSink(dataChunkToSend, chunkedCompletionHandler, isLast);
// Account for the bytes just sent in the window and in the queue space.
availStreamWindowSize.addAndGet(-dataChunkToSendSize);
releaseWriteQueueSpace(dataChunkToSendSize, isZeroSizeData, outputQueueRecord == null);
} else if (isZeroSizeData && outputQueueRecord == null) {
// Nothing was sent, but a fully consumed zero-size record still holds its reserved unit.
releaseWriteQueueSpace(0, true, true);
} else if (dataChunkToSend != null && !dataChunkToSend.hasRemaining()) {
// An empty, non-final chunk was read: put the remainder back into the queue
// (reserving space for it) and stop.
if (outputQueueRecord != null) {
reserveWriteQueueSpace(outputQueueRecord.resource.remaining());
outputQueue.offer(outputQueueRecord);
}
break;
}
} while (outputQueueRecord != null);
}
/**
 * Reserves space in the output queue.
 *
 * @param spaceToReserve the amount of space to reserve
 * @return the value returned by {@code outputQueue.reserveSpace}
 */
private int reserveWriteQueueSpace(final int spaceToReserve) {
return outputQueue.reserveSpace(spaceToReserve);
}
/**
 * Releases output queue space after a chunk has been processed.
 *
 * <p>An atomic (zero-size) record releases {@code ZERO_QUEUE_RECORD_SIZE} once, at the
 * end of its chunk; any other record releases the number of bytes just sent.
 *
 * @param justSentBytes the number of bytes just sent
 * @param isAtomic {@code true} for a zero-size record
 * @param isEndOfChunk {@code true} if nothing of the record remains
 */
private void releaseWriteQueueSpace(final int justSentBytes, final boolean isAtomic, final boolean isEndOfChunk) {
    if (isAtomic) {
        if (isEndOfChunk) {
            outputQueue.releaseSpace(ZERO_QUEUE_RECORD_SIZE);
        }
        return;
    }
    outputQueue.releaseSpace(justSentBytes);
}
/**
 * Encodes the trailer headers and sends them as the final frames of the stream.
 *
 * <p>An {@link IOException} is logged at WARNING level and not propagated; in every
 * case {@code close()} is called before the deflater lock is released.
 *
 * @param completionHandler handler notified about the write result, may be {@code null}
 * @param httpContent the trailer whose headers are sent
 */
private void sendTrailers(final CompletionHandler<WriteResult> completionHandler, final HttpTrailer httpContent) {
    http2Session.getDeflaterLock().lock();
    try {
        final boolean logging = NetLogger.isActive();
        final Map<String, String> capture = logging ? new HashMap<>() : null;
        final List<Http2Frame> trailerFrames =
                http2Session.encodeTrailersAsHeaderFrames(stream.getId(),
                        new ArrayList<>(4),
                        httpContent.getHeaders(), capture);
        if (logging) {
            for (Http2Frame http2Frame : trailerFrames) {
                // Select the frame by its actual class: the previous check compared the
                // type against PushPromiseFrame.TYPE and then cast to HeadersFrame, which
                // could only skip the logging or fail with a ClassCastException.
                if (http2Frame instanceof HeadersFrame) {
                    NetLogger.log(NetLogger.Context.TX, http2Session, (HeadersFrame) http2Frame, capture);
                    break;
                }
            }
        }
        // Count the write before handing it over, as every other write path does; otherwise
        // its completion (or a concurrent flush()) could observe an under-counted value.
        unflushedWritesCounter.incrementAndGet();
        flushToConnectionOutputSink(trailerFrames, completionHandler, null, true);
    } catch (IOException ex) {
        LOGGER.log(WARNING, "Error sending trailers.", ex);
    } finally {
        // No-op after a successful FIN; closes the stream if sending the trailers failed.
        close();
        LOGGER.finest("Sending trailers finished, unlocking the deflater lock ...");
        http2Session.getDeflaterLock().unlock();
    }
}
/**
 * Record kept in the output queue: a data source (or trailer) together with the
 * completion handler to notify and the flags describing the chunk.
 */
private static class OutputQueueRecord extends AsyncQueueRecord<WriteResult> {
    private final HttpTrailer trailer;
    private final boolean isZeroSizeData;
    private Source resource;
    private FlushCompletionHandler chunkedCompletionHandler;
    private boolean isLast;

    /**
     * Creates a data record without a trailer.
     */
    public OutputQueueRecord(final Source resource, final FlushCompletionHandler completionHandler,
            final boolean isLast, final boolean isZeroSizeData) {
        this(resource, completionHandler, null, isLast, isZeroSizeData);
    }

    /**
     * Creates a trailer record; such a record is always marked as last.
     */
    public OutputQueueRecord(final Source resource,
            final FlushCompletionHandler completionHandler,
            final HttpTrailer trailer, final boolean isZeroDataSize) {
        this(resource, completionHandler, trailer, true, isZeroDataSize);
    }

    // Common initialisation shared by both public constructors.
    private OutputQueueRecord(final Source resource, final FlushCompletionHandler completionHandler,
            final HttpTrailer trailer, final boolean isLast, final boolean isZeroSizeData) {
        super(null, null, null);
        this.resource = resource;
        this.chunkedCompletionHandler = completionHandler;
        this.isLast = isLast;
        this.trailer = trailer;
        this.isZeroSizeData = isZeroSizeData;
    }

    /** Announces one more chunk to the completion handler, if there is one. */
    private void incChunksCounter() {
        if (chunkedCompletionHandler == null) {
            return;
        }
        chunkedCompletionHandler.incChunks();
    }

    /** Replaces source, completion handler and last-chunk flag of this record. */
    private void reset(final Source resource, final FlushCompletionHandler completionHandler, final boolean last) {
        this.isLast = last;
        this.chunkedCompletionHandler = completionHandler;
        this.resource = resource;
    }

    /** Releases the source once; further calls do nothing. */
    public void release() {
        final Source held = resource;
        if (held == null) {
            return;
        }
        held.release();
        resource = null;
    }

    /**
     * Reports the failure to the completion handler (at most once) and releases the source.
     */
    @Override
    public void notifyFailure(final Throwable e) {
        final FlushCompletionHandler handler = chunkedCompletionHandler;
        chunkedCompletionHandler = null;
        try {
            if (handler != null) {
                handler.failed(e);
            }
        } finally {
            release();
        }
    }

    /** Does nothing. */
    @Override
    public void recycle() {
    }

    /** Always returns {@code null}. */
    @Override
    public WriteResult getCurrentResult() {
        return null;
    }
}
/**
 * Completion handler used for every write of this sink: when a write is done it
 * decrements the unflushed writes counter and notifies the flush handlers whose
 * bundle has become due.
 */
private final class FlushCompletionHandler extends ChunkedCompletionHandler {
    public FlushCompletionHandler(final CompletionHandler<WriteResult> parentCompletionHandler) {
        super(parentCompletionHandler);
    }

    @Override
    protected void done0() {
        // Same monitor as flush(), so a handler cannot be registered between the
        // decrement and the bundle check.
        synchronized (flushHandlersSync) {
            unflushedWritesCounter.decrementAndGet();
            if (flushHandlersQueue == null || !flushHandlersQueue.nextBundle()) {
                return;
            }
        }

        boolean hasNext;
        CompletionHandler<Http2Stream> handler;
        do {
            synchronized (flushHandlersSync) {
                handler = flushHandlersQueue.next();
                hasNext = flushHandlersQueue.hasNext();
            }
            // Handlers are invoked outside of the monitor.
            try {
                handler.completed(stream);
            } catch (Exception e) {
                // A failing handler must not keep the remaining handlers from being
                // notified, but the failure should not vanish without a trace either.
                if (LOGGER.isLoggable(FINE)) {
                    LOGGER.log(FINE, "Flush completion handler failed.", e);
                }
            }
        } while (hasNext);
    }
}
}