package org.glassfish.grizzly.http.io;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.FileTransfer;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.IOEvent;
import org.glassfish.grizzly.WriteHandler;
import org.glassfish.grizzly.WriteResult;
import org.glassfish.grizzly.asyncqueue.AsyncQueueWriter;
import org.glassfish.grizzly.asyncqueue.MessageCloner;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.http.HttpContent;
import org.glassfish.grizzly.http.HttpContent.Builder;
import org.glassfish.grizzly.http.HttpContext;
import org.glassfish.grizzly.http.HttpHeader;
import org.glassfish.grizzly.http.HttpServerFilter;
import org.glassfish.grizzly.http.HttpTrailer;
import org.glassfish.grizzly.http.Protocol;
import org.glassfish.grizzly.http.util.MimeType;
import org.glassfish.grizzly.http.util.Header;
import org.glassfish.grizzly.http.util.HeaderValue;
import org.glassfish.grizzly.impl.FutureImpl;
import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.CompositeBuffer;
import org.glassfish.grizzly.memory.MemoryManager;
import org.glassfish.grizzly.threadpool.Threads;
import org.glassfish.grizzly.utils.Charsets;
import org.glassfish.grizzly.utils.Exceptions;
import org.glassfish.grizzly.utils.Futures;
import static org.glassfish.grizzly.Writer.Reentrant;
public class OutputBuffer {
protected static final Logger LOGGER = Grizzly.logger(OutputBuffer.class);
private static final int DEFAULT_BUFFER_SIZE =
Integer.getInteger(OutputBuffer.class.getName() + ".default-buffer-size",
1024 * 8);
private static final int MAX_CHAR_BUFFER_SIZE =
Integer.getInteger(OutputBuffer.class.getName() + ".max-char-buffer-size",
1024 * 64 + 1);
private static final int MIN_BUFFER_SIZE = 512;
private final static boolean IS_BLOCKING =
Boolean.getBoolean(OutputBuffer.class.getName() + ".isBlocking");
private FilterChainContext ctx;
private CompositeBuffer compositeBuffer;
private Buffer currentBuffer;
private final TemporaryHeapBuffer temporaryWriteBuffer =
new TemporaryHeapBuffer();
private final ByteArrayCloner cloner = new ByteArrayCloner(temporaryWriteBuffer);
private final List<LifeCycleListener> lifeCycleListeners =
new ArrayList<>(2);
private boolean committed;
private boolean ;
private boolean finished;
private boolean closed;
private CharsetEncoder encoder;
private final Map<String, CharsetEncoder> encoders =
new HashMap<>();
private char[] charsArray;
private int charsArrayLength;
private CharBuffer charsBuffer;
private MemoryManager memoryManager;
private Connection connection;
private WriteHandler handler;
private InternalWriteHandler asyncWriteHandler;
private boolean fileTransferRequested;
private int bufferSize = DEFAULT_BUFFER_SIZE;
protected boolean sendfileEnabled;
private HttpHeader ;
private Runnable writePossibleRunnable;
private Builder builder;
private boolean isNonBlockingWriteGuaranteed;
private boolean isLastWriteNonBlocking;
private HttpContext httpContext;
private Supplier<Map<String,String>> ;
public void (final HttpHeader outputHeader,
final boolean sendfileEnabled,
final FilterChainContext ctx) {
this.outputHeader = outputHeader;
if (builder == null) {
this.builder = outputHeader.httpContentBuilder();
} else {
builder.httpHeader(outputHeader);
}
this.sendfileEnabled = sendfileEnabled;
this.ctx = ctx;
httpContext = outputHeader.getProcessingState().getHttpContext();
connection = ctx.getConnection();
memoryManager = ctx.getMemoryManager();
}
/**
 * Reports whether asynchronous writing is enabled.
 *
 * @return always {@code true}
 * @deprecated the result is constant; the flag can no longer be changed.
 */
@Deprecated
@SuppressWarnings("UnusedDeclaration")
public boolean isAsyncEnabled() {
    return true;
}
/**
 * Does nothing; the argument is ignored.
 *
 * @param asyncEnabled ignored
 * @deprecated this setter has no effect.
 */
@Deprecated
@SuppressWarnings("UnusedDeclaration")
public void setAsyncEnabled(boolean asyncEnabled) {
    // intentionally empty
}
/**
 * Resolves the character encoder up front, so that it is already selected
 * (and cached) before the first characters are written.
 */
public void prepareCharacterEncoder() {
    getEncoder();
}
/**
 * @return the size currently used when allocating byte and char buffers
 */
public int getBufferSize() {
    return this.bufferSize;
}
/**
 * Adds a listener to be notified when the response is committed.
 *
 * @param listener the listener to add
 */
@SuppressWarnings("UnusedDeclaration")
public void registerLifeCycleListener(final LifeCycleListener listener) {
    this.lifeCycleListeners.add(listener);
}
/**
 * Removes a previously registered commit listener.
 *
 * @param listener the listener to remove
 * @return {@code true} if the listener was registered and has been removed
 */
@SuppressWarnings("UnusedDeclaration")
public boolean removeLifeCycleListener(final LifeCycleListener listener) {
    final boolean wasRegistered = this.lifeCycleListeners.remove(listener);
    return wasRegistered;
}
/**
 * Changes the buffer size used for subsequent allocations.
 *
 * <p>Ignored once the response is committed. The requested size is raised to
 * {@code MIN_BUFFER_SIZE} if smaller. The byte buffer size is only updated
 * while no byte buffer is allocated; an existing, smaller char array is
 * replaced by a larger one with its buffered chars preserved.</p>
 *
 * @param bufferSize requested buffer size
 */
public void setBufferSize(final int bufferSize) {
    if (!committed) {
        final int effectiveSize = Math.max(bufferSize, MIN_BUFFER_SIZE);

        if (currentBuffer == null) {
            this.bufferSize = effectiveSize;
        }

        final boolean charArrayTooSmall =
                charsArray != null && charsArray.length < effectiveSize;
        if (charArrayTooSmall) {
            // Grow the char storage, carrying over whatever is already buffered.
            final char[] grown = new char[effectiveSize];
            System.arraycopy(charsArray, 0, grown, 0, charsArrayLength);
            charsBuffer = CharBuffer.wrap(grown);
            charsArray = grown;
        }
    }
}
/**
 * Discards everything buffered so far: drops the composite buffer, clears
 * the current byte buffer, empties the char buffer and forgets the encoder.
 *
 * @throws IllegalStateException if the response has already been committed
 */
public void reset() {
    if (committed) {
        throw new IllegalStateException("Cannot reset the response as it has already been committed.");
    }

    // Byte side.
    compositeBuffer = null;
    if (currentBuffer != null) {
        currentBuffer.clear();
    }

    // Char side.
    charsArrayLength = 0;
    encoder = null;
}
/**
 * @return {@code true} once {@code close()} has been called on this buffer
 */
@SuppressWarnings("UnusedDeclaration")
public boolean isClosed() {
    return this.closed;
}
/**
 * Computes how much data is currently buffered and not yet flushed.
 *
 * <p>The result is the remaining bytes of the composite buffer, plus the bytes
 * written into the current buffer, plus two bytes for every buffered char.</p>
 *
 * @return the buffered data size
 */
@SuppressWarnings("UnusedDeclaration")
public int getBufferedDataSize() {
    final int compositeBytes = compositeBuffer == null ? 0 : compositeBuffer.remaining();
    final int currentBytes = currentBuffer == null ? 0 : currentBuffer.position();
    final int charBytes = charsArrayLength << 1;
    return compositeBytes + currentBytes + charBytes;
}
/**
 * Returns this instance to its pristine state so it can serve another
 * request: disposes the byte buffers, resets the content builder, clears all
 * per-request references and flags, and removes all commit listeners.
 *
 * <p>The char array is kept for reuse unless its length has reached
 * {@code MAX_CHAR_BUFFER_SIZE}, in which case it is released.</p>
 */
public void recycle() {
    outputHeader = null;
    builder.reset();

    // Release byte buffers.
    if (compositeBuffer != null) {
        compositeBuffer.dispose();
        compositeBuffer = null;
    }
    if (currentBuffer != null) {
        currentBuffer.dispose();
        currentBuffer = null;
    }
    temporaryWriteBuffer.recycle();

    // Keep a reasonably sized char array around; let go of oversized ones.
    if (charsArray != null) {
        charsArrayLength = 0;
        if (charsArray.length >= MAX_CHAR_BUFFER_SIZE) {
            charsBuffer = null;
            charsArray = null;
        } else {
            charsBuffer.clear();
        }
    }

    // Per-request configuration and collaborators.
    bufferSize = DEFAULT_BUFFER_SIZE;
    fileTransferRequested = false;
    encoder = null;
    ctx = null;
    httpContext = null;
    connection = null;
    memoryManager = null;
    handler = null;
    isNonBlockingWriteGuaranteed = false;
    isLastWriteNonBlocking = false;
    asyncWriteHandler = null;
    trailersSupplier = null;

    // Lifecycle flags.
    committed = false;
    finished = false;
    closed = false;
    headersWritten = false;

    lifeCycleListeners.clear();
}
/**
 * Completes the response: detaches any pending asynchronous write handler,
 * closes this buffer if still open (an {@link IOException} from that close is
 * ignored), and notifies downstream filters that the response is complete.
 * Subsequent calls do nothing.
 *
 * @throws IOException declared for callers; not thrown by the close attempt made here
 */
public void endRequest() throws IOException {
    if (finished) {
        return;
    }

    final InternalWriteHandler pendingHandler = asyncWriteHandler;
    if (pendingHandler != null) {
        asyncWriteHandler = null;
        pendingHandler.detach();
    }

    if (!closed) {
        try {
            close();
        } catch (IOException ignored) {
            // best effort: the request is ending regardless
        }
    }

    if (ctx != null) {
        ctx.notifyDownstream(HttpServerFilter.RESPONSE_COMPLETE_EVENT);
    }

    finished = true;
}
/**
 * Writes the current output header to the filter chain on its own, without
 * any payload.
 *
 * @throws IOException declared for callers of this write
 */
public void acknowledge() throws IOException {
    this.ctx.write(this.outputHeader, IS_BLOCKING);
}
/**
 * Buffers a single character. When the char buffer is full it is first
 * encoded into the byte buffers. Ignored if this buffer is closed.
 *
 * @param c the character to write; only the low 16 bits are used
 * @throws IOException if the connection is not open or flushing fails
 */
public void writeChar(int c) throws IOException {
    connection.assertOpen();
    if (!closed) {
        updateNonBlockingStatus();
        checkCharBuffer();

        final boolean charBufferFull = charsArrayLength == charsArray.length;
        if (charBufferFull) {
            flushCharsToBuf(true);
        }

        charsArray[charsArrayLength++] = (char) c;
    }
}
/**
 * Writes a range of characters.
 *
 * <p>Three cases, depending on how the data compares to the free space in the
 * char buffer: it fits and is simply copied; it overflows by less than the
 * free space, so the buffer is topped up, flushed and the rest is copied; or
 * it is larger, in which case the buffered chars are flushed and the input is
 * encoded directly from the caller's array.</p>
 *
 * @param cbuf source characters
 * @param off  index of the first character to write
 * @param len  number of characters to write
 * @throws IOException if the connection is not open or flushing fails
 */
public void write(char[] cbuf, int off, int len) throws IOException {
    connection.assertOpen();
    if (closed || len == 0) {
        return;
    }
    updateNonBlockingStatus();
    if (writingBytes()) {
        flushBinaryBuffers(false);
    }
    checkCharBuffer();

    final int free = charsArray.length - charsArrayLength;
    if (len <= free) {
        System.arraycopy(cbuf, off, charsArray, charsArrayLength, len);
        charsArrayLength += len;
        return;
    }

    final int spill = len - free;
    if (spill < free) {
        // Fill the buffer to the brim, flush it, then keep the remainder buffered.
        System.arraycopy(cbuf, off, charsArray, charsArrayLength, free);
        charsArrayLength += free;
        flushCharsToBuf(true);
        System.arraycopy(cbuf, off + free, charsArray, 0, spill);
        charsArrayLength = spill;
        return;
    }

    // Too large to be worth copying: encode straight from the caller's array.
    flushCharsToBuf(false);
    flushCharsToBuf(CharBuffer.wrap(cbuf, off, len), true);
}
/**
 * Writes a whole character array.
 *
 * @param cbuf the characters to write
 * @throws IOException if the connection is not open or flushing fails
 */
public void write(final char[] cbuf) throws IOException {
    final int count = cbuf.length;
    write(cbuf, 0, count);
}
/**
 * Writes a whole string.
 *
 * @param str the string to write
 * @throws IOException if the connection is not open or flushing fails
 */
public void write(final String str) throws IOException {
    final int count = str.length();
    write(str, 0, count);
}
/**
 * Writes a substring.
 *
 * <p>If the range fits into the free part of the char buffer it is copied
 * there. Otherwise it is copied in chunks, encoding the char buffer into the
 * byte buffers each time it fills up; the final chunk stays buffered, and any
 * finished byte buffers are then flushed if needed.</p>
 *
 * @param str source string
 * @param off index of the first character to write
 * @param len number of characters to write
 * @throws IOException if the connection is not open or flushing fails
 */
public void write(final String str, final int off, final int len) throws IOException {
    connection.assertOpen();
    if (closed || len == 0) {
        return;
    }
    updateNonBlockingStatus();
    if (writingBytes()) {
        flushBinaryBuffers(false);
    }
    checkCharBuffer();

    // Fast path: everything fits into the free space.
    if (len <= charsArray.length - charsArrayLength) {
        str.getChars(off, off + len, charsArray, charsArrayLength);
        charsArrayLength += len;
        return;
    }

    int srcPos = off;
    int left = len;
    while (true) {
        final int chunk = Math.min(left, charsArray.length - charsArrayLength);
        str.getChars(srcPos, srcPos + chunk, charsArray, charsArrayLength);
        charsArrayLength += chunk;
        srcPos += chunk;
        left -= chunk;
        if (left <= 0) {
            break;
        }
        // More to come: make room by encoding what has been gathered so far.
        flushCharsToBuf(false);
    }
    flushBinaryBuffersIfNeeded();
}
/**
 * Writes a single byte. Buffered chars are encoded first so output order is
 * kept. When the current byte buffer is full it is either flushed (if payload
 * chunks may be written) or moved to the composite buffer, and a fresh
 * buffer is allocated. Ignored if this buffer is closed.
 *
 * @param b the byte to write; only the low 8 bits are used
 * @throws IOException if the connection is not open or flushing fails
 */
public void writeByte(final int b) throws IOException {
    connection.assertOpen();
    if (closed) {
        return;
    }
    updateNonBlockingStatus();
    if (writingChars()) {
        flushCharsToBuf(false);
    }

    checkCurrentBuffer();
    final boolean bufferFull = !currentBuffer.hasRemaining();
    if (bufferFull) {
        if (!canWritePayloadChunk()) {
            // Cannot send yet: park the full buffer and continue with a new one.
            finishCurrentBuffer();
            checkCurrentBuffer();
        } else {
            doCommit();
            flushBinaryBuffers(false);
            checkCurrentBuffer();
            blockAfterWriteIfNeeded();
        }
    }
    currentBuffer.put((byte) b);
}
/**
 * Writes a whole byte array.
 *
 * @param b the bytes to write
 * @throws IOException if the connection is not open or flushing fails
 */
public void write(final byte[] b) throws IOException {
    final int count = b.length;
    write(b, 0, count);
}
/**
 * Transfers an entire file, from offset 0 to {@code file.length()}.
 *
 * @param file    the file to send; must not be {@code null}
 * @param handler completion handler passed on to the transfer; may be {@code null}
 * @throws IllegalArgumentException if {@code file} is {@code null}
 */
public void sendfile(final File file, final CompletionHandler<WriteResult> handler) {
    if (file != null) {
        sendfile(file, 0, file.length(), handler);
        return;
    }
    throw new IllegalArgumentException("Argument 'file' cannot be null");
}
/**
 * Transfers a region of a file as the response body.
 *
 * <p>Any buffered output is discarded via {@code reset()}, the content
 * length is set to the transfer size, a content type is derived from the file
 * name if none is set, and the content encoding is forced to identity. The
 * headers are then flushed and the transfer is written to the filter chain.
 * If the flush fails, the failure is reported to {@code handler} when one is
 * given and logged otherwise, and the transfer is not started.</p>
 *
 * @param file    the file to send
 * @param offset  start position within the file
 * @param length  number of bytes to send
 * @param handler completion handler; may be {@code null}
 * @throws IllegalStateException if sendfile is disabled, a transfer was
 *         already requested, or the response is already committed
 */
public void sendfile(final File file,
        final long offset,
        final long length,
        final CompletionHandler<WriteResult> handler) {
    if (!sendfileEnabled) {
        throw new IllegalStateException("sendfile support isn't available.");
    }
    if (fileTransferRequested) {
        throw new IllegalStateException("Only one file transfer allowed per request");
    }

    reset();

    final FileTransfer transfer = new FileTransfer(file, offset, length);
    fileTransferRequested = true;

    outputHeader.setContentLengthLong(transfer.remaining());
    if (outputHeader.getContentType() == null) {
        outputHeader.setContentType(MimeType.getByFilename(file.getName()));
    }
    outputHeader.setHeader(Header.ContentEncoding, HeaderValue.IDENTITY);

    try {
        flush();
    } catch (IOException e) {
        if (handler != null) {
            handler.failed(e);
        } else if (LOGGER.isLoggable(Level.SEVERE)) {
            final String message = String.format("Failed to transfer file %s. Cause: %s.",
                    file.getAbsolutePath(),
                    e.getMessage());
            LOGGER.log(Level.SEVERE, message, e);
        }
        return;
    }

    ctx.write(transfer, handler);
}
/**
 * Writes a range of bytes.
 *
 * <p>Small writes that fit into the current buffer are copied there.
 * Otherwise, if payload chunks may be written, the caller's array is wrapped
 * by the temporary buffer and flushed right away together with anything
 * already buffered. If not, the bytes are copied into a newly allocated
 * buffer that is appended to the composite buffer.</p>
 *
 * @param b   source bytes
 * @param off index of the first byte to write
 * @param len number of bytes to write
 * @throws IOException if the connection is not open or flushing fails
 */
public void write(final byte[] b, final int off, final int len) throws IOException {
    connection.assertOpen();
    if (closed || len == 0) {
        return;
    }
    updateNonBlockingStatus();
    if (writingChars()) {
        flushCharsToBuf(false);
    }

    final boolean fitsCurrentBuffer = bufferSize >= len
            && (currentBuffer == null || currentBuffer.remaining() >= len);
    if (fitsCurrentBuffer) {
        checkCurrentBuffer();
        assert currentBuffer != null;
        currentBuffer.put(b, off, len);
        return;
    }

    if (!canWritePayloadChunk()) {
        // Cannot send yet: keep a private copy of the caller's bytes.
        finishCurrentBuffer();
        final Buffer copy = memoryManager.allocate(len);
        copy.put(b, off, len);
        copy.flip();
        checkCompositeBuffer();
        compositeBuffer.append(copy);
        return;
    }

    // Send directly from the caller's array; the cloner is supplied alongside it.
    temporaryWriteBuffer.reset(b, off, len);
    finishCurrentBuffer();
    doCommit();
    if (compositeBuffer == null) {
        flushBuffer(temporaryWriteBuffer, false, cloner);
    } else {
        compositeBuffer.append(temporaryWriteBuffer);
        flushBuffer(compositeBuffer, false, cloner);
        compositeBuffer = null;
    }
    blockAfterWriteIfNeeded();
}
/**
 * Closes this buffer: commits the response if necessary and flushes all
 * buffered data as the last content. If nothing was flushed, the headers are
 * force-written when the response was committed by this call or is chunked.
 * Subsequent calls do nothing.
 *
 * @throws IOException if the connection is not open or flushing fails
 */
public void close() throws IOException {
    if (closed) {
        return;
    }
    closed = true;
    connection.assertOpen();

    final boolean committedNow = doCommit();
    final boolean flushed = flushAllBuffers(true);
    if (!flushed && (committedNow || outputHeader.isChunked())) {
        forceCommitHeaders(true);
    }
    blockAfterWriteIfNeeded();
}
/**
 * Commits the response if necessary and flushes all buffered data. If nothing
 * was flushed and the response was committed by this very call, the headers
 * are written on their own.
 *
 * @throws IOException if the connection is not open or flushing fails
 */
public void flush() throws IOException {
    connection.assertOpen();

    final boolean committedNow = doCommit();
    final boolean flushed = flushAllBuffers(false);
    if (!flushed && committedNow) {
        forceCommitHeaders(false);
    }
    blockAfterWriteIfNeeded();
}
/**
 * Writes the content of a {@link ByteBuffer}. The buffer is wrapped (with
 * disposal of the wrapper disallowed) and handed to {@code writeBuffer}.
 *
 * @param byteBuffer the data to write
 * @throws IOException if the connection is not open or flushing fails
 */
@SuppressWarnings({"unchecked", "UnusedDeclaration"})
public void writeByteBuffer(final ByteBuffer byteBuffer) throws IOException {
    final Buffer wrapped = Buffers.wrap(memoryManager, byteBuffer);
    wrapped.allowBufferDispose(false);
    writeBuffer(wrapped);
}
/**
 * Appends a buffer to the pending output. The partially filled current buffer
 * is queued first to keep ordering. A flush is triggered when payload chunks
 * may be written and the queued data exceeds the buffer size.
 *
 * @param buffer the buffer to write
 * @throws IOException if the connection is not open or flushing fails
 */
public void writeBuffer(final Buffer buffer) throws IOException {
    connection.assertOpen();
    updateNonBlockingStatus();

    finishCurrentBuffer();
    checkCompositeBuffer();
    compositeBuffer.append(buffer);

    final boolean overThreshold =
            canWritePayloadChunk() && compositeBuffer.remaining() > bufferSize;
    if (overThreshold) {
        flush();
    }
}
/**
 * Delegates to {@link #canWrite()}; the argument is ignored.
 *
 * @param length ignored
 * @return the result of {@link #canWrite()}
 * @deprecated use {@link #canWrite()}.
 */
@SuppressWarnings("UnusedDeclaration")
@Deprecated
public boolean canWriteChar(final int length) {
    final boolean writable = canWrite();
    return writable;
}
/**
 * Delegates to {@link #canWrite()}; the argument is ignored.
 *
 * @param length ignored
 * @return the result of {@link #canWrite()}
 * @deprecated use {@link #canWrite()}.
 */
@Deprecated
@SuppressWarnings("UnusedParameters")
public boolean canWrite(@SuppressWarnings("UnusedParameters") final int length) {
    final boolean writable = canWrite();
    return writable;
}
/**
 * Tells whether the next write can proceed.
 *
 * <p>Always {@code true} in blocking mode or when a non-blocking write has
 * already been guaranteed. Otherwise the output sink is asked; a positive
 * answer is remembered as a guarantee for the next write.</p>
 *
 * @return {@code true} if a write can be performed now
 */
public boolean canWrite() {
    if (IS_BLOCKING || isNonBlockingWriteGuaranteed) {
        return true;
    }
    final boolean sinkReady = httpContext.getOutputSink().canWrite();
    if (sinkReady) {
        isNonBlockingWriteGuaranteed = true;
    }
    return sinkReady;
}
/**
 * Delegates to {@link #notifyCanWrite(WriteHandler)}; {@code length} is ignored.
 *
 * @param handler the handler to notify
 * @param length  ignored
 * @deprecated use {@link #notifyCanWrite(WriteHandler)}.
 */
@Deprecated
@SuppressWarnings("UnusedParameters")
public void notifyCanWrite(final WriteHandler handler,
        @SuppressWarnings("UnusedParameters") final int length) {
    this.notifyCanWrite(handler);
}
/**
 * Registers a handler to be called when writing becomes possible.
 *
 * <p>If the underlying closeable is no longer open, the handler's
 * {@code onError} is invoked immediately with the connection's close cause.
 * If a write is possible right now the handler is notified at once, either
 * inline or, when the write re-entrancy limit has been reached, via the
 * connection's event thread. Otherwise an internal handler is registered with
 * the output sink.</p>
 *
 * @param handler the handler to notify
 * @throws IllegalStateException if a previously registered handler has not
 *         been notified yet
 */
public void notifyCanWrite(final WriteHandler handler) {
    if (this.handler != null) {
        throw new IllegalStateException("Illegal attempt to set a new handler before the existing handler has been notified.");
    }
    if (!httpContext.getCloseable().isOpen()) {
        handler.onError(connection.getCloseReason().getCause());
        return;
    }

    this.handler = handler;

    if (isNonBlockingWriteGuaranteed || canWrite()) {
        final Reentrant reentrant = Reentrant.getWriteReentrant();
        if (reentrant.isMaxReentrantsReached()) {
            notifyWritePossibleAsync();
        } else {
            notifyWritePossible();
        }
        return;
    }

    assert !IS_BLOCKING;

    if (asyncWriteHandler == null) {
        asyncWriteHandler = new InternalWriteHandler(this);
    }
    try {
        httpContext.getOutputSink().notifyCanWrite(asyncWriteHandler);
    } catch (Exception ignored) {
        // Deliberately swallowed. NOTE(review): presumably such failures have
        // already been reported through the handler's onError - confirm.
    }
}
/**
 * Sets the supplier consulted for trailer fields when the last content is
 * flushed.
 *
 * @param trailersSupplier supplier of trailer name/value pairs; may be {@code null}
 */
public void setTrailers(Supplier<Map<String,String>> trailersSupplier) {
    this.trailersSupplier = trailersSupplier;
}
/**
 * @return the trailer supplier set via {@code setTrailers}, or {@code null}
 */
public Supplier<Map<String, String>> getTrailers() {
    return this.trailersSupplier;
}
/**
 * Picks the executor on which write notifications should run.
 *
 * @return the transport's worker thread pool when the current thread is a
 *         service thread and that pool exists and is not shut down;
 *         {@code null} otherwise (meaning: run on the current thread)
 */
protected Executor getThreadPool() {
    if (Threads.isService()) {
        final ExecutorService workers = connection.getTransport().getWorkerThreadPool();
        if (workers != null && !workers.isShutdown()) {
            return workers;
        }
    }
    return null;
}
/**
 * Schedules {@code notifyWritePossible()} to run through the connection's
 * event thread for the WRITE event. The task object is created once and reused.
 */
@SuppressWarnings("unchecked")
private void notifyWritePossibleAsync() {
    if (writePossibleRunnable == null) {
        writePossibleRunnable = () -> notifyWritePossible();
    }
    connection.executeInEventThread(IOEvent.WRITE, writePossibleRunnable);
}
/**
 * Notifies the registered handler, if any, that a write is possible.
 *
 * <p>The handler is unregistered before it is called, the write re-entrancy
 * counter is incremented for the duration of the call, and the next write is
 * marked as guaranteed non-blocking. Anything thrown by the handler is
 * passed to its own {@code onError}.</p>
 */
private void notifyWritePossible() {
    final WriteHandler target = handler;
    if (target == null) {
        return;
    }

    final Reentrant reentrant = Reentrant.getWriteReentrant();
    try {
        handler = null;
        reentrant.inc();
        isNonBlockingWriteGuaranteed = true;
        target.onWritePossible();
    } catch (Throwable t) {
        target.onError(t);
    } finally {
        reentrant.dec();
    }
}
/**
 * @return {@code true} if part of the payload may be sent before the whole
 *         body is known, i.e. chunking is allowed or a content length is set
 */
private boolean canWritePayloadChunk() {
    if (outputHeader.isChunkingAllowed()) {
        return true;
    }
    return outputHeader.getContentLength() != -1;
}
/**
 * Blocks the calling thread until the output sink can accept more data.
 *
 * <p>Returns immediately in blocking mode, when the current or the previous
 * write was guaranteed non-blocking, or when the sink reports that it can
 * write. Otherwise a handler is registered with the sink and the thread waits
 * for its notification, bounded by the connection's write timeout when that
 * timeout is non-negative.</p>
 *
 * <p>On any failure the HTTP context is closed before the exception is
 * rethrown. If the wait is interrupted, the thread's interrupt status is
 * restored so callers further up the stack can observe it.</p>
 *
 * @throws IOException if the wait fails, times out or is interrupted
 */
private void blockAfterWriteIfNeeded()
        throws IOException {
    if (IS_BLOCKING || isNonBlockingWriteGuaranteed || isLastWriteNonBlocking) {
        return;
    }
    if (httpContext.getOutputSink().canWrite()) {
        return;
    }

    final FutureImpl<Boolean> future = Futures.createSafeFuture();
    httpContext.getOutputSink().notifyCanWrite(new WriteHandler() {
        @Override
        public void onWritePossible() throws Exception {
            future.result(Boolean.TRUE);
        }

        @Override
        public void onError(Throwable t) {
            future.failure(Exceptions.makeIOException(t));
        }
    });

    try {
        final long writeTimeout =
                connection.getWriteTimeout(TimeUnit.MILLISECONDS);
        if (writeTimeout >= 0) {
            future.get(writeTimeout, TimeUnit.MILLISECONDS);
        } else {
            future.get();
        }
    } catch (ExecutionException e) {
        httpContext.close();
        throw Exceptions.makeIOException(e.getCause());
    } catch (TimeoutException e) {
        httpContext.close();
        // Keep the original exception as the cause for diagnostics.
        throw new IOException("Write timeout exceeded when trying to flush the data", e);
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the flag; restore it.
        Thread.currentThread().interrupt();
        httpContext.close();
        throw Exceptions.makeIOException(e);
    } catch (Exception e) {
        httpContext.close();
        throw Exceptions.makeIOException(e);
    }
}
/**
 * Encodes any buffered chars into the byte buffers and then flushes the byte
 * buffers.
 *
 * @param isLast whether this flush carries the last content of the message
 * @return the result of {@code flushBinaryBuffers(isLast)}
 * @throws IOException if encoding or writing fails
 */
private boolean flushAllBuffers(final boolean isLast) throws IOException {
    final boolean hasBufferedChars = charsArrayLength > 0;
    if (hasBufferedChars) {
        flushCharsToBuf(false);
    }
    return flushBinaryBuffers(isLast);
}
/**
 * Flushes the buffered bytes to the filter chain.
 *
 * <p>When neither chunking nor a content length is available, nothing can be
 * sent before the last flush; on the last flush the content length is set to
 * the amount of buffered data. The composite buffer is preferred (after the
 * current buffer has been queued into it); otherwise the current buffer is
 * used if it holds any data.</p>
 *
 * @param isLast whether this flush carries the last content of the message
 * @return {@code true} if something was written to the filter chain
 * @throws IOException if writing fails
 */
private boolean flushBinaryBuffers(final boolean isLast)
        throws IOException {
    if (!canWritePayloadChunk()) {
        if (!isLast) {
            return false;
        }
        outputHeader.setContentLength(getBufferedDataSize());
    }

    Buffer pending = null;
    if (compositeBuffer != null && compositeBuffer.hasRemaining()) {
        finishCurrentBuffer();
        pending = compositeBuffer;
        compositeBuffer = null;
    } else if (currentBuffer != null && currentBuffer.position() > 0) {
        currentBuffer.trim();
        pending = currentBuffer;
        currentBuffer = null;
    }

    if (pending == null && !isLast) {
        return false;
    }

    doCommit();
    flushBuffer(pending, isLast, null);
    return true;
}
/**
 * Wraps a buffer into HTTP content and writes it to the filter chain.
 *
 * <p>For the last content of a chunked or HTTP/2 message with a trailer
 * supplier set, the headers are force-written first and an
 * {@link HttpTrailer} carrying the supplied trailer fields is built instead
 * of plain content.</p>
 *
 * @param bufferToFlush the payload; may be {@code null}
 * @param isLast        whether this is the last content of the message
 * @param messageCloner cloner passed on to the write call; may be {@code null}
 * @throws IOException if writing fails
 */
private void flushBuffer(final Buffer bufferToFlush,
        final boolean isLast, final MessageCloner<Buffer> messageCloner)
        throws IOException {
    final boolean sendTrailers = isLast
            && trailersSupplier != null
            && (outputHeader.isChunked() || outputHeader.getProtocol().equals(Protocol.HTTP_2_0));

    final HttpContent content;
    if (!sendTrailers) {
        content = builder.content(bufferToFlush).last(isLast).build();
    } else {
        forceCommitHeaders(false);
        HttpTrailer.Builder trailerBuilder =
                outputHeader.httpTrailerBuilder().content(bufferToFlush).last(true);
        final Map<String, String> fields = trailersSupplier.get();
        if (fields != null && !fields.isEmpty()) {
            for (Map.Entry<String, String> field : fields.entrySet()) {
                trailerBuilder.header(field.getKey(), field.getValue());
            }
        }
        content = trailerBuilder.build();
    }

    ctx.write(null, content, null, messageCloner, IS_BLOCKING);
}
/**
 * Lazily allocates the char array (of {@code bufferSize} chars) and its
 * {@link CharBuffer} wrapper.
 */
private void checkCharBuffer() {
    if (charsArray != null) {
        return;
    }
    charsArray = new char[bufferSize];
    charsBuffer = CharBuffer.wrap(charsArray);
}
/**
 * @return {@code true} if there are buffered chars that have not been encoded yet
 */
private boolean writingChars() {
    if (charsArray == null) {
        return false;
    }
    return charsArrayLength > 0;
}
/**
 * @return {@code true} if the current byte buffer exists and holds data
 */
private boolean writingBytes() {
    if (currentBuffer == null) {
        return false;
    }
    return currentBuffer.position() != 0;
}
/**
 * Lazily allocates the current byte buffer ({@code bufferSize} bytes) and
 * marks it as disposable.
 */
private void checkCurrentBuffer() {
    if (currentBuffer != null) {
        return;
    }
    currentBuffer = memoryManager.allocate(bufferSize);
    currentBuffer.allowBufferDispose(true);
}
/**
 * Moves the current byte buffer, if it holds any data, to the end of the
 * composite buffer (trimmed to its content) and forgets it, so that the next
 * write allocates a fresh one.
 */
private void finishCurrentBuffer() {
    final boolean hasData = currentBuffer != null && currentBuffer.position() > 0;
    if (!hasData) {
        return;
    }
    currentBuffer.trim();
    checkCompositeBuffer();
    compositeBuffer.append(currentBuffer);
    currentBuffer = null;
}
/**
 * Returns the encoder for the response's character encoding, resolving it on
 * first use.
 *
 * <p>The encoding name comes from the output header, falling back to the
 * default HTTP character encoding. Encoders are cached per encoding name; a
 * cached encoder is reset before reuse, a new one is configured to replace
 * malformed input and unmappable characters.</p>
 *
 * @return the encoder to use
 */
private CharsetEncoder getEncoder() {
    if (encoder != null) {
        return encoder;
    }

    String charsetName = outputHeader.getCharacterEncoding();
    if (charsetName == null) {
        charsetName = org.glassfish.grizzly.http.util.Constants.DEFAULT_HTTP_CHARACTER_ENCODING;
    }

    encoder = encoders.get(charsetName);
    if (encoder != null) {
        encoder.reset();
        return encoder;
    }

    final Charset charset = Charsets.lookupCharset(charsetName);
    encoder = charset.newEncoder();
    encoder.onMalformedInput(CodingErrorAction.REPLACE);
    encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
    encoders.put(charsetName, encoder);
    return encoder;
}
/**
 * Commits the response if that has not happened yet: notifies the commit
 * listeners, sets the committed flag and marks the header collection.
 *
 * @return {@code true} if this call performed the commit, {@code false} if
 *         the response was already committed
 * @throws IOException if a listener fails
 */
private boolean doCommit() throws IOException {
    if (committed) {
        return false;
    }
    notifyCommit();
    committed = true;
    outputHeader.getHeaders().mark();
    return true;
}
private void (final boolean isLast) throws IOException {
if (!headersWritten) {
if (isLast) {
if (outputHeader != null) {
builder.last(true).content(null);
ctx.write(builder.build(), IS_BLOCKING);
}
} else {
ctx.write(outputHeader, IS_BLOCKING);
}
}
headersWritten = true;
}
/**
 * Lazily creates the composite buffer, allowing disposal of both the
 * composite itself and the buffers it contains.
 */
private void checkCompositeBuffer() {
    if (compositeBuffer != null) {
        return;
    }
    final CompositeBuffer created = CompositeBuffer.newBuffer(memoryManager);
    created.allowBufferDispose(true);
    created.allowInternalBuffersDispose(true);
    compositeBuffer = created;
}
/**
 * Encodes the chars buffered so far into the byte buffers and empties the
 * char buffer. The char buffer is emptied even if encoding fails.
 *
 * @param canFlushToNet whether finished byte buffers may be flushed afterwards
 * @throws IOException if encoding or flushing fails
 */
private void flushCharsToBuf(final boolean canFlushToNet) throws IOException {
    // Expose exactly the buffered chars to the encoder.
    charsBuffer.limit(charsArrayLength);
    try {
        flushCharsToBuf(charsBuffer, canFlushToNet);
    } finally {
        charsArrayLength = 0;
        charsBuffer.clear();
    }
}
/**
 * Encodes the remaining chars of {@code charBuf} into the byte buffers using
 * the current encoder.
 *
 * <p>Encoding targets the current byte buffer; whenever the encoder reports
 * overflow the filled buffer is moved to the composite buffer and encoding
 * continues into a newly allocated one.</p>
 *
 * @param charBuf       chars to encode; consumed by this call
 * @param canFlushToNet if {@code true}, finished byte buffers are flushed
 *                      afterwards via {@code flushBinaryBuffersIfNeeded()}
 * @throws IOException if the encoder ends in a state other than underflow,
 *                     or if flushing fails
 */
private void flushCharsToBuf(final CharBuffer charBuf, final boolean canFlushToNet) throws IOException {
// Nothing to encode.
if (!charBuf.hasRemaining()) return;
final CharsetEncoder enc = getEncoder();
checkCurrentBuffer();
// Make sure encoding starts with a buffer that has free space.
if (!currentBuffer.hasRemaining()) {
finishCurrentBuffer();
checkCurrentBuffer();
}
// The encoder writes through a ByteBuffer obtained from the current buffer;
// the positions before encoding are recorded so the number of bytes produced
// can be applied to currentBuffer's own position afterwards.
ByteBuffer currentByteBuffer = currentBuffer.toByteBuffer();
int bufferPos = currentBuffer.position();
int byteBufferPos = currentByteBuffer.position();
CoderResult res = enc.encode(charBuf,
currentByteBuffer,
true);
// Advance currentBuffer by the number of bytes the encoder produced.
currentBuffer.position(bufferPos + (currentByteBuffer.position() - byteBufferPos));
while (res == CoderResult.OVERFLOW) {
// Allocates a new buffer only if the previous iteration queued the full one.
checkCurrentBuffer();
currentByteBuffer = currentBuffer.toByteBuffer();
bufferPos = currentBuffer.position();
byteBufferPos = currentByteBuffer.position();
res = enc.encode(charBuf, currentByteBuffer, true);
currentBuffer.position(bufferPos + (currentByteBuffer.position() - byteBufferPos));
if (res == CoderResult.OVERFLOW) {
// Still out of space: queue the filled buffer so the next pass gets a new one.
finishCurrentBuffer();
}
}
// Any result other than UNDERFLOW (all input consumed) is treated as an error.
if (res != CoderResult.UNDERFLOW) {
throw new IOException("Encoding error");
}
if (canFlushToNet) {
flushBinaryBuffersIfNeeded();
}
}
/**
 * Flushes the byte buffers when a composite buffer exists (i.e. at least one
 * buffer has been finished), then blocks if the sink cannot take more data.
 *
 * @throws IOException if flushing or waiting fails
 */
private void flushBinaryBuffersIfNeeded() throws IOException {
    if (compositeBuffer == null) {
        return;
    }
    flushBinaryBuffers(false);
    blockAfterWriteIfNeeded();
}
/**
 * Calls {@code onCommit()} on every registered listener, in registration
 * order. The listener count is read once, before the first call.
 *
 * @throws IOException if a listener throws it
 */
private void notifyCommit() throws IOException {
    final int count = lifeCycleListeners.size();
    for (int idx = 0; idx < count; idx++) {
        final LifeCycleListener listener = lifeCycleListeners.get(idx);
        listener.onCommit();
    }
}
/**
 * Called at the start of each write: remembers whether this write was
 * guaranteed non-blocking and consumes the guarantee.
 */
private void updateNonBlockingStatus() {
    final boolean wasGuaranteed = isNonBlockingWriteGuaranteed;
    isNonBlockingWriteGuaranteed = false;
    isLastWriteNonBlocking = wasGuaranteed;
}
/**
 * {@link MessageCloner} used for writes that go out through the temporary
 * heap buffer: when asked to clone, it replaces the temporary buffer with a
 * copy of its content.
 */
static final class ByteArrayCloner implements MessageCloner<Buffer> {
    private final TemporaryHeapBuffer temporaryWriteBuffer;

    /**
     * @param temporaryWriteBuffer the temporary buffer whose content is cloned
     */
    public ByteArrayCloner(final TemporaryHeapBuffer temporaryWriteBuffer) {
        this.temporaryWriteBuffer = temporaryWriteBuffer;
    }

    /**
     * Returns the message unchanged if the temporary buffer is already
     * disposed; otherwise clones using the connection's memory manager.
     */
    @Override
    public Buffer clone(final Connection connection,
            final Buffer originalMessage) {
        return temporaryWriteBuffer.isDisposed()
                ? originalMessage
                : clone0(connection.getMemoryManager(), originalMessage);
    }

    /**
     * Produces a message that no longer refers to the temporary buffer.
     *
     * <p>A non-composite message is replaced by a copy of the temporary
     * buffer's content. A composite message is shrunk first; if everything it
     * still holds is the temporary buffer, the composite is disposed (without
     * disposing its parts) and a copy of the content is returned, otherwise
     * the temporary buffer inside the composite is replaced by the copy.</p>
     */
    Buffer clone0(final MemoryManager memoryManager,
            final Buffer originalMessage) {
        if (!originalMessage.isComposite()) {
            return temporaryWriteBuffer.cloneContent(memoryManager);
        }

        final CompositeBuffer composite = (CompositeBuffer) originalMessage;
        composite.shrink();

        if (temporaryWriteBuffer.isDisposed()) {
            return originalMessage;
        }

        if (composite.remaining() == temporaryWriteBuffer.remaining()) {
            composite.allowInternalBuffersDispose(false);
            composite.tryDispose();
            return temporaryWriteBuffer.cloneContent(memoryManager);
        }

        composite.replace(temporaryWriteBuffer,
                temporaryWriteBuffer.cloneContent(memoryManager));
        return originalMessage;
    }
}
/**
 * Callback interface for listeners registered through
 * {@code registerLifeCycleListener}; they are invoked when the response is
 * committed.
 */
public interface LifeCycleListener {
/**
 * Invoked right before the response is marked as committed.
 *
 * @throws IOException if the listener fails; the exception propagates to
 *                     the operation that triggered the commit
 */
void onCommit() throws IOException;
}
/**
 * {@link WriteHandler} registered with the output sink on behalf of the user
 * handler held by an {@link OutputBuffer}. Notifications are forwarded to the
 * owning buffer, on its thread pool when {@code getThreadPool()} provides one
 * and on the calling thread otherwise. After {@link #detach()} all
 * notifications are dropped.
 */
private static class InternalWriteHandler implements WriteHandler {
    /** Owning buffer; {@code null} once detached. */
    private volatile OutputBuffer outputBuffer;

    public InternalWriteHandler(final OutputBuffer outputBuffer) {
        this.outputBuffer = outputBuffer;
    }

    /**
     * Disconnects this handler from its buffer and fails the pending user
     * handler: with a {@link CancellationException} if the underlying
     * closeable is still open, with an {@link EOFException} otherwise.
     */
    public void detach() {
        final OutputBuffer owner = outputBuffer;
        if (owner == null) {
            return;
        }
        outputBuffer = null;
        final Throwable reason = owner.httpContext.getCloseable().isOpen()
                ? new CancellationException()
                : new EOFException();
        onError0(owner, reason);
    }

    @Override
    public void onWritePossible() throws Exception {
        final OutputBuffer owner = outputBuffer;
        if (owner == null) {
            return;
        }
        final Executor executor = owner.getThreadPool();
        if (executor == null) {
            onWritePossible0(owner);
            return;
        }
        executor.execute(() -> {
            try {
                onWritePossible0(owner);
            } catch (Exception ignored) {
                // nothing to report to from a pool thread
            }
        });
    }

    @Override
    public void onError(final Throwable t) {
        final OutputBuffer owner = outputBuffer;
        if (owner == null) {
            return;
        }
        final Executor executor = owner.getThreadPool();
        if (executor == null) {
            onError0(owner, t);
            return;
        }
        executor.execute(() -> onError0(owner, t));
    }

    /**
     * Notifies the buffer's handler inline, or through the event thread when
     * the write re-entrancy limit has been reached. Exceptions are swallowed.
     */
    private static void onWritePossible0(final OutputBuffer ob)
            throws Exception {
        try {
            if (Reentrant.getWriteReentrant().isMaxReentrantsReached()) {
                ob.notifyWritePossibleAsync();
            } else {
                ob.notifyWritePossible();
            }
        } catch (Exception ignored) {
            // deliberately ignored
        }
    }

    /**
     * Unregisters the buffer's pending handler, if any, and passes the error
     * to it. Exceptions thrown by the handler are swallowed.
     */
    private static void onError0(final OutputBuffer ob, final Throwable t) {
        final WriteHandler pending = ob.handler;
        if (pending == null) {
            return;
        }
        try {
            ob.handler = null;
            pending.onError(t);
        } catch (Exception ignored) {
            // deliberately ignored
        }
    }
}
}