/*
* Copyright (c) 2012, 2017 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.grizzly.http2;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.WriteHandler;
import org.glassfish.grizzly.WriteResult;
import org.glassfish.grizzly.asyncqueue.AsyncQueueRecord;
import org.glassfish.grizzly.asyncqueue.MessageCloner;
import org.glassfish.grizzly.asyncqueue.TaskQueue;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.http.HttpContent;
import org.glassfish.grizzly.http.HttpHeader;
import org.glassfish.grizzly.http.HttpPacket;
import org.glassfish.grizzly.http.HttpResponsePacket;
import org.glassfish.grizzly.http.HttpTrailer;
import org.glassfish.grizzly.http2.frames.ErrorCode;
import org.glassfish.grizzly.http2.frames.HeadersFrame;
import org.glassfish.grizzly.http2.frames.Http2Frame;
import org.glassfish.grizzly.http2.frames.PushPromiseFrame;
import org.glassfish.grizzly.http2.utils.ChunkedCompletionHandler;
import org.glassfish.grizzly.memory.Buffers;
import static org.glassfish.grizzly.http2.Termination.OUT_FIN_TERMINATION;
Default implementation of an output sink, which is associated with specific Http2Stream
. The implementation is aligned with HTTP/2 requirements with regards to message flow control. Author: Alexey Stashok
/**
* Default implementation of an output sink, which is associated
* with specific {@link Http2Stream}.
*
* The implementation is aligned with HTTP/2 requirements with regards to message
* flow control.
*
* @author Alexey Stashok
*/
class DefaultOutputSink implements StreamOutputSink {
private static final Logger LOGGER = Grizzly.logger(StreamOutputSink.class);
private static final int MAX_OUTPUT_QUEUE_SIZE = 65536;
private static final int ZERO_QUEUE_RECORD_SIZE = 1;
private static final OutputQueueRecord TERMINATING_QUEUE_RECORD =
new OutputQueueRecord(null, null, true, true);
// async output queue
final TaskQueue<OutputQueueRecord> outputQueue =
TaskQueue.createTaskQueue(new TaskQueue.MutableMaxQueueSize() {
@Override
public int getMaxQueueSize() {
return MAX_OUTPUT_QUEUE_SIZE;
}
});
// the space (in bytes) in flow control window, that still could be used.
// in other words the number of bytes, which could be sent to the peer
private final AtomicInteger availStreamWindowSize;
// true, if last output frame has been queued
private volatile boolean isLastFrameQueued;
// not null if last output frame has been sent or forcibly terminated
private Termination terminationFlag;
// associated HTTP/2 session
private final Http2Session http2Session;
// associated HTTP/2 stream
private final Http2Stream stream;
// counter for unflushed writes
private final AtomicInteger unflushedWritesCounter = new AtomicInteger();
// sync object to count/notify flush handlers
private final Object flushHandlersSync = new Object();
// flush handlers queue
private BundleQueue<CompletionHandler<Http2Stream>> flushHandlersQueue;
DefaultOutputSink(final Http2Stream stream) {
this.stream = stream;
http2Session = stream.getHttp2Session();
availStreamWindowSize = new AtomicInteger(stream.getPeerWindowSize());
}
@Override
public boolean canWrite() {
return outputQueue.size() < MAX_OUTPUT_QUEUE_SIZE;
}
@Override
public void notifyWritePossible(final WriteHandler writeHandler) {
outputQueue.notifyWritePossible(writeHandler, MAX_OUTPUT_QUEUE_SIZE);
}
private void assertReady() throws IOException {
// if the last frame (fin flag == 1) has been queued already - throw an IOException
if (isTerminated()) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.log(Level.FINE, "Terminated!!! id={0} description={1}",
new Object[]{stream.getId(), terminationFlag.getDescription()});
}
throw new IOException(terminationFlag.getDescription());
} else if (isLastFrameQueued) {
throw new IOException("Write beyond end of stream");
}
}
The method is called by HTTP2 Filter once WINDOW_UPDATE message comes for this Http2Stream
. Params: - delta – the delta.
Throws: - Http2StreamException – if an error occurs processing the window update.
/**
* The method is called by HTTP2 Filter once WINDOW_UPDATE message comes
* for this {@link Http2Stream}.
*
* @param delta the delta.
* @throws Http2StreamException if an error occurs processing the window update.
*/
@Override
public void onPeerWindowUpdate(final int delta) throws Http2StreamException {
final int currentWindow = availStreamWindowSize.get();
if (delta > 0 && currentWindow > 0 && currentWindow + delta < 0) {
throw new Http2StreamException(stream.getId(),
ErrorCode.FLOW_CONTROL_ERROR,
"Session flow-control window overflow.");
}
// update the available window size
availStreamWindowSize.addAndGet(delta);
// try to write until window limit allows
while (isWantToWrite() &&
!outputQueue.isEmpty()) {
// pick up the first output record in the queue
OutputQueueRecord outputQueueRecord = outputQueue.poll();
// if there is nothing to write - return
if (outputQueueRecord == null) {
return;
}
// if it's terminating record - processFin
if (outputQueueRecord == TERMINATING_QUEUE_RECORD) {
// if it's TERMINATING_QUEUE_RECORD - don't forget to release ATOMIC_QUEUE_RECORD_SIZE
releaseWriteQueueSpace(0, true, true);
writeEmptyFin();
return;
}
final FlushCompletionHandler completionHandler =
outputQueueRecord.chunkedCompletionHandler;
boolean isLast = outputQueueRecord.isLast;
final boolean isZeroSizeData = outputQueueRecord.isZeroSizeData;
final Source resource = outputQueueRecord.resource;
// check if output record's buffer is fitting into window size
// if not - split it into 2 parts: part to send, part to keep in the queue
final int bytesToSend = checkOutputWindow(resource.remaining());
final Buffer dataChunkToSend = resource.read(bytesToSend);
final boolean hasRemaining = resource.hasRemaining();
// if there is a chunk to store
if (hasRemaining) {
// Create output record for the chunk to be stored
outputQueueRecord.reset(resource, completionHandler, isLast);
outputQueueRecord.incChunksCounter();
// reset isLast for the current chunk
isLast = false;
} else {
outputQueueRecord.release();
outputQueueRecord = null;
}
// if there is a chunk to sent
if (dataChunkToSend != null &&
(dataChunkToSend.hasRemaining() || isLast)) {
final int dataChunkToSendSize = dataChunkToSend.remaining();
// send a http2 data frame
flushToConnectionOutputSink(null, dataChunkToSend, completionHandler,
null, isLast);
// update the available window size bytes counter
availStreamWindowSize.addAndGet(-dataChunkToSendSize);
releaseWriteQueueSpace(dataChunkToSendSize,
isZeroSizeData, outputQueueRecord == null);
outputQueue.doNotify();
} else if (isZeroSizeData && outputQueueRecord == null) {
// if it's atomic and no remainder left - don't forget to release ATOMIC_QUEUE_RECORD_SIZE
releaseWriteQueueSpace(0, true, true);
outputQueue.doNotify();
}
if (outputQueueRecord != null) {
// if there is a chunk to be stored - store it and return
outputQueue.setCurrentElement(outputQueueRecord);
break;
}
}
}
Send an HttpPacket
to the Http2Stream
. The writeDownStream(...) methods have to be synchronized with shutdown(). Params: - httpPacket –
HttpPacket
to send - completionHandler – the
CompletionHandler
, which will be notified about write progress. - messageCloner – the
MessageCloner
, which will be able to clone the message in case it can't be completely written in the current thread.
Throws: - IOException – if an error occurs with the write operation.
/**
* Send an {@link HttpPacket} to the {@link Http2Stream}.
*
* The writeDownStream(...) methods have to be synchronized with shutdown().
*
* @param httpPacket {@link HttpPacket} to send
* @param completionHandler the {@link CompletionHandler},
* which will be notified about write progress.
* @param messageCloner the {@link MessageCloner}, which will be able to
* clone the message in case it can't be completely written in the
* current thread.
* @throws IOException if an error occurs with the write operation.
*/
@SuppressWarnings("ConstantConditions")
@Override
public synchronized void writeDownStream(final HttpPacket httpPacket,
final FilterChainContext ctx,
final CompletionHandler<WriteResult> completionHandler,
final MessageCloner<Buffer> messageCloner)
throws IOException {
assert ctx != null;
assertReady();
final HttpHeader httpHeader = stream.getOutputHttpHeader();
final HttpContent httpContent = HttpContent.isContent(httpPacket) ? (HttpContent) httpPacket : null;
List<Http2Frame> headerFrames = null;
OutputQueueRecord outputQueueRecord = null;
boolean isDeflaterLocked = false;
try { // try-finally block to release deflater lock if needed
// If HTTP header hasn't been committed - commit it
if (!httpHeader.isCommitted()) {
// do we expect any HTTP payload?
final boolean isNoPayload = !httpHeader.isExpectContent() ||
(httpContent != null && httpContent.isLast() &&
!httpContent.getContent().hasRemaining());
// !!!!! LOCK the deflater
isDeflaterLocked = true;
http2Session.getDeflaterLock().lock();
final boolean logging = NetLogger.isActive();
final Map<String,String> capture = ((logging) ? new HashMap<>() : null);
headerFrames = http2Session.encodeHttpHeaderAsHeaderFrames(
ctx, httpHeader, stream.getId(), isNoPayload, null, capture);
if (logging) {
for (Http2Frame http2Frame : headerFrames) {
if (http2Frame.getType() == PushPromiseFrame.TYPE) {
NetLogger.log(NetLogger.Context.TX, http2Session, (HeadersFrame) http2Frame, capture);
break;
}
}
}
stream.onSndHeaders(isNoPayload);
// 100-Continue block
if (!httpHeader.isRequest()) {
HttpResponsePacket response = (HttpResponsePacket) httpHeader;
if (response.isAcknowledgement()) {
response.acknowledged();
response.getHeaders().clear();
unflushedWritesCounter.incrementAndGet();
flushToConnectionOutputSink(headerFrames, null,
new FlushCompletionHandler(completionHandler),
messageCloner, false);
return;
}
}
httpHeader.setCommitted(true);
if (isNoPayload || httpContent == null) {
// if we don't expect any HTTP payload, mark this frame as
// last and return
unflushedWritesCounter.incrementAndGet();
flushToConnectionOutputSink(headerFrames, null,
new FlushCompletionHandler(completionHandler),
messageCloner, isNoPayload);
return;
}
}
// if there is nothing to write - return
if (httpContent == null) {
return;
}
http2Session.handlerFilter.onHttpContentEncoded(httpContent, ctx);
Buffer dataToSend = null;
boolean isLast = httpContent.isLast();
final boolean isTrailer = HttpTrailer.isTrailer(httpContent);
Buffer data = httpContent.getContent();
final int dataSize = data.remaining();
if (isLast && dataSize == 0) {
if (isTrailer) {
// !!!!! LOCK the deflater
isDeflaterLocked = true;
sendTrailers(completionHandler, messageCloner, (HttpTrailer) httpContent);
}
close();
return;
}
unflushedWritesCounter.incrementAndGet();
final FlushCompletionHandler flushCompletionHandler =
new FlushCompletionHandler(completionHandler);
boolean isDataCloned = false;
final boolean isZeroSizeData = (dataSize == 0);
final int spaceToReserve = isZeroSizeData ? ZERO_QUEUE_RECORD_SIZE : dataSize;
// Check if output queue is not empty - add new element
if (reserveWriteQueueSpace(spaceToReserve) > spaceToReserve) {
// if the queue is not empty - the headers should have been sent
assert headerFrames == null;
if (messageCloner != null) {
data = messageCloner.clone(
http2Session.getConnection(), data);
isDataCloned = true;
}
outputQueueRecord = new OutputQueueRecord(
Source.factory(stream)
.createBufferSource(data),
flushCompletionHandler, isLast, isZeroSizeData);
outputQueue.offer(outputQueueRecord);
// check if our element wasn't forgotten (async)
if (outputQueue.size() != spaceToReserve ||
!outputQueue.remove(outputQueueRecord)) {
// if not - return
return;
}
outputQueueRecord = null;
}
// our element is first in the output queue
final int remaining = data.remaining();
// check if output record's buffer is fitting into window size
// if not - split it into 2 parts: part to send, part to keep in the queue
final int fitWindowLen = checkOutputWindow(remaining);
// if there is a chunk to store
if (fitWindowLen < remaining) {
if (!isDataCloned && messageCloner != null) {
data = messageCloner.clone(
http2Session.getConnection(), data);
isDataCloned = true;
}
final Buffer dataChunkToStore = splitOutputBufferIfNeeded(
data, fitWindowLen);
// Create output record for the chunk to be stored
outputQueueRecord = new OutputQueueRecord(
Source.factory(stream)
.createBufferSource(dataChunkToStore),
flushCompletionHandler,
isLast, isZeroSizeData);
// reset completion handler and isLast for the current chunk
isLast = false;
}
// if there is a chunk to send
if (data != null &&
(data.hasRemaining() || isLast)) {
final int dataChunkToSendSize = data.remaining();
// update the available window size bytes counter
availStreamWindowSize.addAndGet(-dataChunkToSendSize);
releaseWriteQueueSpace(dataChunkToSendSize,
isZeroSizeData, outputQueueRecord == null);
dataToSend = data;
}
// if there's anything to send - send it
if (headerFrames != null || dataToSend != null) {
// if another part of data is stored in the queue -
// we have to increase CompletionHandler counter to avoid
// premature notification
if (outputQueueRecord != null) {
outputQueueRecord.incChunksCounter();
}
flushToConnectionOutputSink(headerFrames, dataToSend, flushCompletionHandler,
isDataCloned ? null : messageCloner,
isLast && !isTrailer);
}
if (isLast) {
if (isTrailer) {
// !!!!! LOCK the deflater
isDeflaterLocked = true;
sendTrailers(completionHandler, messageCloner, (HttpTrailer) httpContent);
}
close();
return;
}
} finally {
if (isDeflaterLocked) {
http2Session.getDeflaterLock().unlock();
}
}
if (outputQueueRecord == null) {
return;
}
addOutputQueueRecord(outputQueueRecord);
}
Flush Http2Stream
output and notify CompletionHandler
once all output data has been flushed. Params: - completionHandler –
CompletionHandler
to be notified
/**
* Flush {@link Http2Stream} output and notify {@link CompletionHandler} once
* all output data has been flushed.
*
* @param completionHandler {@link CompletionHandler} to be notified
*/
@Override
public void flush(
final CompletionHandler<Http2Stream> completionHandler) {
// check if there are pending unflushed data
if (unflushedWritesCounter.get() > 0) {
// if yes - synchronize do disallow decrease counter from other thread (increasing is ok)
synchronized (flushHandlersSync) {
// double check the pending flushes counter
final int counterNow = unflushedWritesCounter.get();
if (counterNow > 0) {
// if there are pending flushes
if (flushHandlersQueue == null) {
// create a flush handlers queue
flushHandlersQueue =
new BundleQueue<>();
}
// add the handler to the queue
flushHandlersQueue.add(counterNow, completionHandler);
return;
}
}
}
// if there are no pending flushes - notify the handler
completionHandler.completed(stream);
}
The method is responsible for checking the current output window size.
The returned integer value is the size of the data, which could be
sent now.
Params: - size – check the provided size against the window size limit.
Returns: the amount of data that may be written.
/**
* The method is responsible for checking the current output window size.
* The returned integer value is the size of the data, which could be
* sent now.
*
* @param size check the provided size against the window size limit.
*
* @return the amount of data that may be written.
*/
private int checkOutputWindow(final long size) {
// take a snapshot of the current output window state and check if we
// can fit "size" into window.
// Make sure we return positive value or zero, because availStreamWindowSize could be negative.
return Math.max(0, Math.min(availStreamWindowSize.get(), (int) size));
}
private Buffer splitOutputBufferIfNeeded(final Buffer buffer,
final int length) {
if (length == buffer.remaining()) {
return null;
}
return buffer.split(buffer.position() + length);
}
private void flushToConnectionOutputSink(
final List<Http2Frame> headerFrames,
final Buffer data,
final CompletionHandler<WriteResult> completionHandler,
final MessageCloner<Buffer> messageCloner,
final boolean isLast) {
http2Session.getOutputSink().writeDataDownStream(stream, headerFrames,
data, completionHandler, messageCloner, isLast);
if (isLast) {
terminate(OUT_FIN_TERMINATION);
}
}
Closes the output sink by adding last DataFrame with the FIN flag to a queue.
If the output sink is already closed - method does nothing.
/**
* Closes the output sink by adding last DataFrame with the FIN flag to a queue.
* If the output sink is already closed - method does nothing.
*/
@Override
public synchronized void close() {
if (!isClosed()) {
isLastFrameQueued = true;
if (outputQueue.isEmpty()) {
writeEmptyFin();
return;
}
outputQueue.reserveSpace(ZERO_QUEUE_RECORD_SIZE);
outputQueue.offer(TERMINATING_QUEUE_RECORD);
if (outputQueue.size() == ZERO_QUEUE_RECORD_SIZE &&
outputQueue.remove(TERMINATING_QUEUE_RECORD)) {
writeEmptyFin();
}
}
}
Unlike close()
this method forces the output sink termination by setting termination flag and canceling all the pending writes. /**
* Unlike {@link #close()} this method forces the output sink termination
* by setting termination flag and canceling all the pending writes.
*/
@Override
public synchronized void terminate(final Termination terminationFlag) {
if (!isTerminated()) {
this.terminationFlag = terminationFlag;
outputQueue.onClose();
// NOTIFY STREAM
stream.onOutputClosed();
}
}
@Override
public boolean isClosed() {
return isLastFrameQueued || isTerminated();
}
Returns: the number of writes (not bytes), that haven't reached network layer
/**
* @return the number of writes (not bytes), that haven't reached network layer
*/
@Override
public int getUnflushedWritesCount() {
return unflushedWritesCounter.get();
}
private boolean isTerminated() {
return terminationFlag != null;
}
private void writeEmptyFin() {
if (!isTerminated()) {
unflushedWritesCounter.incrementAndGet();
flushToConnectionOutputSink(null, Buffers.EMPTY_BUFFER,
new FlushCompletionHandler(null), null, true);
}
}
private boolean isWantToWrite() {
// update the available window size
final int availableWindowSizeBytesNow = availStreamWindowSize.get();
// get the current peer's window size limit
final int windowSizeLimit = stream.getPeerWindowSize();
return availableWindowSizeBytesNow >= (windowSizeLimit / 4);
}
private void addOutputQueueRecord(OutputQueueRecord outputQueueRecord)
throws Http2StreamException {
do { // Make sure current outputQueueRecord is not forgotten
// set the outputQueueRecord as the current
outputQueue.setCurrentElement(outputQueueRecord);
// check if situation hasn't changed and we can't send the data chunk now
if (isWantToWrite() &&
outputQueue.compareAndSetCurrentElement(outputQueueRecord, null)) {
// if we can send the output record now - do that
final FlushCompletionHandler chunkedCompletionHandler =
outputQueueRecord.chunkedCompletionHandler;
boolean isLast = outputQueueRecord.isLast;
final boolean isZeroSizeData = outputQueueRecord.isZeroSizeData;
final Source currentResource = outputQueueRecord.resource;
final int fitWindowLen = checkOutputWindow(currentResource.remaining());
final Buffer dataChunkToSend = currentResource.read(fitWindowLen);
// if there is a chunk to store
if (currentResource.hasRemaining()) {
// Create output record for the chunk to be stored
outputQueueRecord.reset(currentResource,
chunkedCompletionHandler,
isLast);
outputQueueRecord.incChunksCounter();
// reset isLast for the current chunk
isLast = false;
} else {
outputQueueRecord.release();
outputQueueRecord = null;
}
// if there is a chunk to send
if (dataChunkToSend != null &&
(dataChunkToSend.hasRemaining() || isLast)) {
final int dataChunkToSendSize = dataChunkToSend.remaining();
flushToConnectionOutputSink(null, dataChunkToSend,
chunkedCompletionHandler, null, isLast);
// update the available window size bytes counter
availStreamWindowSize.addAndGet(-dataChunkToSendSize);
releaseWriteQueueSpace(dataChunkToSendSize, isZeroSizeData,
outputQueueRecord == null);
} else if (isZeroSizeData && outputQueueRecord == null) {
// if it's atomic and no remainder left - don't forget to release ATOMIC_QUEUE_RECORD_SIZE
releaseWriteQueueSpace(0, true, true);
} else if (dataChunkToSend != null && !dataChunkToSend.hasRemaining()) {
// current window won't allow the data to be sent. Will be written once the
// window changes.
if (outputQueueRecord != null) {
reserveWriteQueueSpace(outputQueueRecord.resource.remaining());
outputQueue.offer(outputQueueRecord);
}
break;
}
} else {
break; // will be (or already) written asynchronously
}
} while (outputQueueRecord != null);
}
private int reserveWriteQueueSpace(final int spaceToReserve) {
return outputQueue.reserveSpace(spaceToReserve);
}
private void releaseWriteQueueSpace(final int justSentBytes, final boolean isAtomic,
final boolean isEndOfChunk) {
if (isEndOfChunk) {
outputQueue.releaseSpace(isAtomic ? ZERO_QUEUE_RECORD_SIZE : justSentBytes);
} else if (!isAtomic) {
outputQueue.releaseSpace(justSentBytes);
}
}
private void sendTrailers(final CompletionHandler<WriteResult> completionHandler,
final MessageCloner<Buffer> messageCloner,
final HttpTrailer httpContent)
throws IOException {
http2Session.getDeflaterLock().lock();
final boolean logging = NetLogger.isActive();
final Map<String,String> capture = ((logging) ? new HashMap<>() : null);
List<Http2Frame> trailerFrames =
http2Session.encodeTrailersAsHeaderFrames(stream.getId(),
new ArrayList<>(4),
httpContent.getHeaders(), capture);
if (logging) {
for (Http2Frame http2Frame : trailerFrames) {
if (http2Frame.getType() == PushPromiseFrame.TYPE) {
NetLogger.log(NetLogger.Context.TX, http2Session, (HeadersFrame) http2Frame, capture);
break;
}
}
}
unflushedWritesCounter.incrementAndGet();
flushToConnectionOutputSink(trailerFrames, null,
new FlushCompletionHandler(completionHandler),
messageCloner, true);
}
private static class OutputQueueRecord extends AsyncQueueRecord<WriteResult> {
private Source resource;
private FlushCompletionHandler chunkedCompletionHandler;
private boolean isLast;
private final boolean isZeroSizeData;
public OutputQueueRecord(final Source resource,
final FlushCompletionHandler completionHandler,
final boolean isLast, final boolean isZeroSizeData) {
super(null, null, null);
this.resource = resource;
this.chunkedCompletionHandler = completionHandler;
this.isLast = isLast;
this.isZeroSizeData = isZeroSizeData;
}
private void incChunksCounter() {
if (chunkedCompletionHandler != null) {
chunkedCompletionHandler.incChunks();
}
}
private void reset(final Source resource,
final FlushCompletionHandler completionHandler,
final boolean last) {
this.resource = resource;
this.chunkedCompletionHandler = completionHandler;
this.isLast = last;
}
public void release() {
if (resource != null) {
resource.release();
resource = null;
}
}
@Override
public void notifyFailure(final Throwable e) {
final CompletionHandler chLocal = chunkedCompletionHandler;
chunkedCompletionHandler = null;
try {
if (chLocal != null) {
chLocal.failed(e);
}
} finally {
release();
}
}
@Override
public void recycle() {
}
@Override
public WriteResult getCurrentResult() {
return null;
}
}
Flush CompletionHandler
, which will be passed on each Http2Stream
write to make sure the data reached the wires. Usually FlushCompletionHandler is also used as a wrapper for custom CompletionHandler
provided by users. /**
* Flush {@link CompletionHandler}, which will be passed on each
* {@link Http2Stream} write to make sure the data reached the wires.
*
* Usually <tt>FlushCompletionHandler</tt> is also used as a wrapper for
* custom {@link CompletionHandler} provided by users.
*/
private final class FlushCompletionHandler extends ChunkedCompletionHandler {
public FlushCompletionHandler(
final CompletionHandler<WriteResult> parentCompletionHandler) {
super(parentCompletionHandler);
}
@Override
protected void done0() {
synchronized (flushHandlersSync) { // synchronize with flush()
unflushedWritesCounter.decrementAndGet();
if (flushHandlersQueue == null ||
!flushHandlersQueue.nextBundle()) {
return;
}
}
boolean hasNext;
CompletionHandler<Http2Stream> handler;
do {
synchronized (flushHandlersSync) {
handler = flushHandlersQueue.next();
hasNext = flushHandlersQueue.hasNext();
}
try {
handler.completed(stream);
} catch (Exception ignored) {
}
} while (hasNext);
}
}}