//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritePendingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.ResourceBundle;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
HttpOutput
implements ServletOutputStream
as required by the Servlet specification.
HttpOutput
buffers content written by the application until a further write will overflow the buffer, at which point it triggers a commit of the response.
HttpOutput
can be closed and reopened, to allow requests included via RequestDispatcher.include(ServletRequest, ServletResponse)
to close the stream, to be reopened after the inclusion ends.
/**
* <p>{@link HttpOutput} implements {@link ServletOutputStream}
* as required by the Servlet specification.</p>
* <p>{@link HttpOutput} buffers content written by the application until a
* further write will overflow the buffer, at which point it triggers a commit
* of the response.</p>
* <p>{@link HttpOutput} can be closed and reopened, to allow requests included
* via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
* close the stream, to be reopened after the inclusion ends.</p>
*/
public class HttpOutput extends ServletOutputStream implements Runnable
{
private static final String LSTRING_FILE = "javax.servlet.LocalStrings";
private static ResourceBundle lStrings = ResourceBundle.getBundle(LSTRING_FILE);
The HttpOutput.Interceptor is a single intercept point for all
output written to the HttpOutput: via writer; via output stream;
asynchronously; or blocking.
The Interceptor can be used to implement translations (eg Gzip) or
additional buffering that acts on all output. Interceptors are
created in a chain, so that multiple concerns may intercept.
The HttpChannel
is an Interceptor
and is always the last link in any Interceptor chain.
Responses are committed by the first call to write(ByteBuffer, boolean, Callback)
and closed by a call to write(ByteBuffer, boolean, Callback)
with the last boolean set true. If no content is available to commit or close, then a null buffer is passed.
/**
* The HttpOutput.Interceptor is a single intercept point for all
* output written to the HttpOutput: via writer; via output stream;
* asynchronously; or blocking.
* <p>
* The Interceptor can be used to implement translations (eg Gzip) or
* additional buffering that acts on all output. Interceptors are
* created in a chain, so that multiple concerns may intercept.
* <p>
* The {@link HttpChannel} is an {@link Interceptor} and is always the
* last link in any Interceptor chain.
* <p>
* Responses are committed by the first call to
* {@link #write(ByteBuffer, boolean, Callback)}
* and closed by a call to {@link #write(ByteBuffer, boolean, Callback)}
* with the last boolean set true. If no content is available to commit
* or close, then a null buffer is passed.
*/
public interface Interceptor
{
Write content.
The response is committed by the first call to write and is closed by
a call with last == true. Empty content buffers may be passed to
force a commit or close.
Params: - content – The content to be written or an empty buffer.
- last – True if this is the last call to write
- callback – The callback to use to indicate
Callback.succeeded()
or Callback.failed(Throwable)
.
/**
* Write content.
* The response is committed by the first call to write and is closed by
* a call with last == true. Empty content buffers may be passed to
* force a commit or close.
*
* @param content The content to be written or an empty buffer.
* @param last True if this is the last call to write
* @param callback The callback to use to indicate {@link Callback#succeeded()}
* or {@link Callback#failed(Throwable)}.
*/
void write(ByteBuffer content, boolean last, Callback callback);
Returns: The next Interceptor in the chain or null if this is the
last Interceptor in the chain.
/**
* @return The next Interceptor in the chain or null if this is the
* last Interceptor in the chain.
*/
Interceptor getNextInterceptor();
Returns: True if the Interceptor is optimized to receive direct ByteBuffer
s in the write(ByteBuffer, boolean, Callback)
method. If false is returned, then passing direct buffers may cause inefficiencies.
/**
* @return True if the Interceptor is optimized to receive direct
* {@link ByteBuffer}s in the {@link #write(ByteBuffer, boolean, Callback)}
* method. If false is returned, then passing direct buffers may cause
* inefficiencies.
*/
boolean isOptimizedForDirectBuffers();
Reset the buffers.
If the Interceptor contains buffers then reset them.
Throws: - IllegalStateException – Thrown if the response has been
committed and buffers and/or headers cannot be reset.
/**
* Reset the buffers.
* <p>If the Interceptor contains buffers then reset them.
*
* @throws IllegalStateException Thrown if the response has been
* committed and buffers and/or headers cannot be reset.
*/
default void resetBuffer() throws IllegalStateException
{
Interceptor next = getNextInterceptor();
if (next != null)
next.resetBuffer();
}
}
private static Logger LOG = Log.getLogger(HttpOutput.class);
private static final ThreadLocal<CharsetEncoder> _encoder = new ThreadLocal<>();
private final HttpChannel _channel;
private final SharedBlockingCallback _writeBlocker;
private Interceptor _interceptor;
private long _written;
private long _flushed;
private long _firstByteTimeStamp = -1;
private ByteBuffer _aggregate;
private int _bufferSize;
private int _commitSize;
private WriteListener _writeListener;
private volatile Throwable _onError;
/*
ACTION OPEN ASYNC READY PENDING UNREADY CLOSED
-------------------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise ise
write() OPEN ise PENDING wpe wpe eof
flush() OPEN ise PENDING wpe wpe eof
close() CLOSED CLOSED CLOSED CLOSED CLOSED CLOSED
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
write completed - - - ASYNC READY->owp -
*/
private enum OutputState
{
OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED
}
private final AtomicReference<OutputState> _state = new AtomicReference<>(OutputState.OPEN);
public HttpOutput(HttpChannel channel)
{
_channel = channel;
_interceptor = channel;
_writeBlocker = new WriteBlocker(channel);
HttpConfiguration config = channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize();
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
{
LOG.warn("OutputAggregationSize {} exceeds bufferSize {}", _commitSize, _bufferSize);
_commitSize = _bufferSize;
}
}
public HttpChannel getHttpChannel()
{
return _channel;
}
public Interceptor getInterceptor()
{
return _interceptor;
}
public void setInterceptor(Interceptor interceptor)
{
_interceptor = interceptor;
}
public boolean isWritten()
{
return _written > 0;
}
public long getWritten()
{
return _written;
}
public void reopen()
{
_state.set(OutputState.OPEN);
}
private boolean isLastContentToWrite(int len)
{
_written += len;
return _channel.getResponse().isAllContentWritten(_written);
}
public boolean isAllContentWritten()
{
return _channel.getResponse().isAllContentWritten(_written);
}
protected Blocker acquireWriteBlockingCallback() throws IOException
{
return _writeBlocker.acquire();
}
private void abort(Throwable failure)
{
closed();
_channel.abort(failure);
}
@Override
public void close()
{
while (true)
{
OutputState state = _state.get();
switch (state)
{
case CLOSED:
{
return;
}
case ASYNC:
{
// A close call implies a write operation, thus in asynchronous mode
// a call to isReady() that returned true should have been made.
// However it is desirable to allow a close at any time, specially if
// complete is called. Thus we simulate a call to isReady here, assuming
// that we can transition to READY.
if (!_state.compareAndSet(state, OutputState.READY))
continue;
break;
}
case UNREADY:
case PENDING:
{
// A close call implies a write operation, thus in asynchronous mode
// a call to isReady() that returned true should have been made.
// However it is desirable to allow a close at any time, specially if
// complete is called. Because the prior write has not yet completed
// and/or isReady has not been called, this close is allowed, but will
// abort the response.
if (!_state.compareAndSet(state, OutputState.CLOSED))
continue;
IOException ex = new IOException("Closed while Pending/Unready");
LOG.warn(ex.toString());
LOG.debug(ex);
abort(ex);
return;
}
default:
{
if (!_state.compareAndSet(state, OutputState.CLOSED))
continue;
// Do a normal close by writing the aggregate buffer or an empty buffer. If we are
// not including, then indicate this is the last write.
try
{
write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
}
catch (IOException x)
{
LOG.ignore(x); // Ignore it, it's been already logged in write().
}
finally
{
releaseBuffer();
}
// Return even if an exception is thrown by write().
return;
}
}
}
}
Called to indicate that the last write has been performed.
It updates the state and performs cleanup operations.
/**
* Called to indicate that the last write has been performed.
* It updates the state and performs cleanup operations.
*/
void closed()
{
while (true)
{
OutputState state = _state.get();
switch (state)
{
case CLOSED:
{
return;
}
case UNREADY:
{
if (_state.compareAndSet(state, OutputState.ERROR))
_writeListener.onError(_onError == null ? new EofException("Async closed") : _onError);
break;
}
default:
{
if (!_state.compareAndSet(state, OutputState.CLOSED))
break;
try
{
_channel.getResponse().closeOutput();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
abort(x);
}
finally
{
releaseBuffer();
}
// Return even if an exception is thrown by closeOutput().
return;
}
}
}
}
private void releaseBuffer()
{
if (_aggregate != null)
{
_channel.getConnector().getByteBufferPool().release(_aggregate);
_aggregate = null;
}
}
public boolean isClosed()
{
return _state.get() == OutputState.CLOSED;
}
public boolean isAsync()
{
switch (_state.get())
{
case ASYNC:
case READY:
case PENDING:
case UNREADY:
return true;
default:
return false;
}
}
@Override
public void flush() throws IOException
{
while (true)
{
switch (_state.get())
{
case OPEN:
write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false);
return;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
new AsyncFlush().iterate();
return;
case PENDING:
return;
case UNREADY:
throw new WritePendingException();
case ERROR:
throw new EofException(_onError);
case CLOSED:
return;
default:
throw new IllegalStateException();
}
}
}
private void write(ByteBuffer content, boolean complete) throws IOException
{
try (Blocker blocker = _writeBlocker.acquire())
{
write(content, complete, blocker);
blocker.block();
}
catch (Exception failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
if (failure instanceof IOException)
throw failure;
throw new IOException(failure);
}
}
protected void write(ByteBuffer content, boolean complete, Callback callback)
{
if (_firstByteTimeStamp == -1)
{
long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate();
if (minDataRate > 0)
_firstByteTimeStamp = System.nanoTime();
else
_firstByteTimeStamp = Long.MAX_VALUE;
}
_interceptor.write(content, complete, callback);
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
// Async or Blocking ?
while (true)
{
switch (_state.get())
{
case OPEN:
// process blocking below
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
// Should we aggregate?
boolean last = isLastContentToWrite(len);
if (!last && len <= _commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content
if (filled == len && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
throw new IllegalStateException();
return;
}
// adjust offset/length
off += filled;
len -= filled;
}
// Do the asynchronous writing from the callback
new AsyncWrite(b, off, len, last).iterate();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case ERROR:
throw new EofException(_onError);
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
// handle blocking write
// Should we aggregate?
int capacity = getBufferSize();
boolean last = isLastContentToWrite(len);
if (!last && len <= _commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content
if (filled == len && !BufferUtil.isFull(_aggregate))
return;
// adjust offset/length
off += filled;
len -= filled;
}
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
write(_aggregate, last && len == 0);
// should we fill aggregate again from the buffer?
if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate))
{
BufferUtil.append(_aggregate, b, off, len);
return;
}
}
// write any remaining content in the buffer directly
if (len > 0)
{
// write a buffer capacity at a time to avoid JVM pooling large direct buffers
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
ByteBuffer view = ByteBuffer.wrap(b, off, len);
while (len > getBufferSize())
{
final int p = view.position();
final int l = p + getBufferSize();
view.limit(p + getBufferSize());
write(view, false);
len -= getBufferSize();
view.limit(l + Math.min(len, getBufferSize()));
view.position(l);
}
write(view, last);
}
else if (last)
{
write(BufferUtil.EMPTY_BUFFER, true);
}
if (last)
closed();
}
public void write(ByteBuffer buffer) throws IOException
{
// This write always bypasses aggregate buffer
// Async or Blocking ?
while (true)
{
switch (_state.get())
{
case OPEN:
// process blocking below
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
// Do the asynchronous writing from the callback
boolean last = isLastContentToWrite(buffer.remaining());
new AsyncWrite(buffer, last).iterate();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case ERROR:
throw new EofException(_onError);
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
// handle blocking write
int len = BufferUtil.length(buffer);
boolean last = isLastContentToWrite(len);
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
write(_aggregate, last && len == 0);
// write any remaining content in the buffer directly
if (len > 0)
write(buffer, last);
else if (last)
write(BufferUtil.EMPTY_BUFFER, true);
if (last)
closed();
}
@Override
public void write(int b) throws IOException
{
_written += 1;
boolean complete = _channel.getResponse().isAllContentWritten(_written);
// Async or Blocking ?
while (true)
{
switch (_state.get())
{
case OPEN:
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full
if (complete || BufferUtil.isFull(_aggregate))
{
write(_aggregate, complete);
if (complete)
closed();
}
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full
if (!complete && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
throw new IllegalStateException();
return;
}
// Do the asynchronous writing from the callback
new AsyncFlush().iterate();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case ERROR:
throw new EofException(_onError);
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
}
@Override
public void print(String s) throws IOException
{
print(s, false);
}
private void print(String s, boolean eoln) throws IOException
{
if (isClosed())
throw new IOException("Closed");
String charset = _channel.getResponse().getCharacterEncoding();
CharsetEncoder encoder = _encoder.get();
if (encoder == null || !encoder.charset().name().equalsIgnoreCase(charset))
{
encoder = Charset.forName(charset).newEncoder();
encoder.onMalformedInput(CodingErrorAction.REPLACE);
encoder.onUnmappableCharacter(CodingErrorAction.REPLACE);
_encoder.set(encoder);
}
else
{
encoder.reset();
}
CharBuffer in = CharBuffer.wrap(s);
CharBuffer crlf = eoln ? CharBuffer.wrap("\r\n") : null;
ByteBuffer out = getHttpChannel().getByteBufferPool().acquire((int)(1 + (s.length() + 2) * encoder.averageBytesPerChar()), false);
BufferUtil.flipToFill(out);
for (; ; )
{
CoderResult result;
if (in.hasRemaining())
{
result = encoder.encode(in, out, crlf == null);
if (result.isUnderflow())
if (crlf == null)
break;
else
continue;
}
else if (crlf != null && crlf.hasRemaining())
{
result = encoder.encode(crlf, out, true);
if (result.isUnderflow())
{
if (!encoder.flush(out).isUnderflow())
result.throwException();
break;
}
}
else
break;
if (result.isOverflow())
{
BufferUtil.flipToFlush(out, 0);
ByteBuffer bigger = BufferUtil.ensureCapacity(out, out.capacity() + s.length() + 2);
getHttpChannel().getByteBufferPool().release(out);
BufferUtil.flipToFill(bigger);
out = bigger;
continue;
}
result.throwException();
}
BufferUtil.flipToFlush(out, 0);
write(out.array(), out.arrayOffset(), out.remaining());
getHttpChannel().getByteBufferPool().release(out);
}
@Override
public void println(String s) throws IOException
{
print(s, true);
}
@Override
public void println(boolean b) throws IOException
{
println(lStrings.getString(b ? "value.true" : "value.false"));
}
@Override
public void println(char c) throws IOException
{
println(String.valueOf(c));
}
@Override
public void println(int i) throws IOException
{
println(String.valueOf(i));
}
@Override
public void println(long l) throws IOException
{
println(String.valueOf(l));
}
@Override
public void println(float f) throws IOException
{
println(String.valueOf(f));
}
@Override
public void println(double d) throws IOException
{
println(String.valueOf(d));
}
Blocking send of whole content.
Params: - content – The whole content to send
Throws: - IOException – if the send fails
/**
* Blocking send of whole content.
*
* @param content The whole content to send
* @throws IOException if the send fails
*/
public void sendContent(ByteBuffer content) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent({})", BufferUtil.toDetailString(content));
_written += content.remaining();
write(content, true);
closed();
}
Blocking send of stream content.
Params: - in – The stream content to send
Throws: - IOException – if the send fails
/**
* Blocking send of stream content.
*
* @param in The stream content to send
* @throws IOException if the send fails
*/
public void sendContent(InputStream in) throws IOException
{
try (Blocker blocker = _writeBlocker.acquire())
{
new InputStreamWritingCB(in, blocker).iterate();
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
throw failure;
}
}
Blocking send of channel content.
Params: - in – The channel content to send
Throws: - IOException – if the send fails
/**
* Blocking send of channel content.
*
* @param in The channel content to send
* @throws IOException if the send fails
*/
public void sendContent(ReadableByteChannel in) throws IOException
{
try (Blocker blocker = _writeBlocker.acquire())
{
new ReadableByteChannelWritingCB(in, blocker).iterate();
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
throw failure;
}
}
Blocking send of HTTP content.
Params: - content – The HTTP content to send
Throws: - IOException – if the send fails
/**
* Blocking send of HTTP content.
*
* @param content The HTTP content to send
* @throws IOException if the send fails
*/
public void sendContent(HttpContent content) throws IOException
{
try (Blocker blocker = _writeBlocker.acquire())
{
sendContent(content, blocker);
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
throw failure;
}
}
Asynchronous send of whole content.
Params: - content – The whole content to send
- callback – The callback to use to notify success or failure
/**
* Asynchronous send of whole content.
*
* @param content The whole content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(ByteBuffer content, final Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback);
_written += content.remaining();
write(content, true, new Callback.Nested(callback)
{
@Override
public void succeeded()
{
closed();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
abort(x);
super.failed(x);
}
});
}
Asynchronous send of stream content.
The stream will be closed after reading all content.
Params: - in – The stream content to send
- callback – The callback to use to notify success or failure
/**
* Asynchronous send of stream content.
* The stream will be closed after reading all content.
*
* @param in The stream content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(InputStream in, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent(stream={},{})", in, callback);
new InputStreamWritingCB(in, callback).iterate();
}
Asynchronous send of channel content.
The channel will be closed after reading all content.
Params: - in – The channel content to send
- callback – The callback to use to notify success or failure
/**
* Asynchronous send of channel content.
* The channel will be closed after reading all content.
*
* @param in The channel content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(ReadableByteChannel in, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent(channel={},{})", in, callback);
new ReadableByteChannelWritingCB(in, callback).iterate();
}
Asynchronous send of HTTP content.
Params: - httpContent – The HTTP content to send
- callback – The callback to use to notify success or failure
/**
* Asynchronous send of HTTP content.
*
* @param httpContent The HTTP content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(HttpContent httpContent, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent(http={},{})", httpContent, callback);
if (BufferUtil.hasContent(_aggregate))
{
callback.failed(new IOException("cannot sendContent() after write()"));
return;
}
if (_channel.isCommitted())
{
callback.failed(new IOException("cannot sendContent(), output already committed"));
return;
}
while (true)
{
switch (_state.get())
{
case OPEN:
if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
continue;
break;
case ERROR:
callback.failed(new EofException(_onError));
return;
case CLOSED:
callback.failed(new EofException("Closed"));
return;
default:
throw new IllegalStateException();
}
break;
}
ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
if (buffer == null)
buffer = httpContent.getIndirectBuffer();
if (buffer != null)
{
sendContent(buffer, callback);
return;
}
try
{
ReadableByteChannel rbc = httpContent.getReadableByteChannel();
if (rbc != null)
{
// Close of the rbc is done by the async sendContent
sendContent(rbc, callback);
return;
}
InputStream in = httpContent.getInputStream();
if (in != null)
{
sendContent(in, callback);
return;
}
throw new IllegalArgumentException("unknown content for " + httpContent);
}
catch (Throwable th)
{
abort(th);
callback.failed(th);
}
}
public int getBufferSize()
{
return _bufferSize;
}
public void setBufferSize(int size)
{
_bufferSize = size;
_commitSize = size;
}
Invoked when bytes have been flushed to the network.
The number of flushed bytes may be different from the bytes written by the application if an Interceptor
changed them, for example by compressing them.
Params: - bytes – the number of bytes flushed
Throws: - IOException – if the minimum data rate, when set, is not respected
See Also:
/**
* <p>Invoked when bytes have been flushed to the network.</p>
* <p>The number of flushed bytes may be different from the bytes written
* by the application if an {@link Interceptor} changed them, for example
* by compressing them.</p>
*
* @param bytes the number of bytes flushed
* @throws IOException if the minimum data rate, when set, is not respected
* @see org.eclipse.jetty.io.WriteFlusher.Listener
*/
public void onFlushed(long bytes) throws IOException
{
if (_firstByteTimeStamp == -1 || _firstByteTimeStamp == Long.MAX_VALUE)
return;
long minDataRate = getHttpChannel().getHttpConfiguration().getMinResponseDataRate();
_flushed += bytes;
long elapsed = System.nanoTime() - _firstByteTimeStamp;
long minFlushed = minDataRate * TimeUnit.NANOSECONDS.toMillis(elapsed) / TimeUnit.SECONDS.toMillis(1);
if (LOG.isDebugEnabled())
LOG.debug("Flushed bytes min/actual {}/{}", minFlushed, _flushed);
if (_flushed < minFlushed)
{
IOException ioe = new IOException(String.format("Response content data rate < %d B/s", minDataRate));
_channel.abort(ioe);
throw ioe;
}
}
public void recycle()
{
_interceptor = _channel;
HttpConfiguration config = _channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize();
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
_commitSize = _bufferSize;
releaseBuffer();
_written = 0;
_writeListener = null;
_onError = null;
_firstByteTimeStamp = -1;
_flushed = 0;
reopen();
}
public void resetBuffer()
{
_interceptor.resetBuffer();
if (BufferUtil.hasContent(_aggregate))
BufferUtil.clear(_aggregate);
_written = 0;
reopen();
}
@Override
public void setWriteListener(WriteListener writeListener)
{
if (!_channel.getState().isAsync())
throw new IllegalStateException("!ASYNC");
if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
{
_writeListener = writeListener;
if (_channel.getState().onWritePossible())
_channel.execute(_channel);
}
else
throw new IllegalStateException();
}
@Override
public boolean isReady()
{
while (true)
{
switch (_state.get())
{
case OPEN:
return true;
case ASYNC:
if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
continue;
return true;
case READY:
return true;
case PENDING:
if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
continue;
return false;
case UNREADY:
return false;
case ERROR:
return true;
case CLOSED:
return true;
default:
throw new IllegalStateException();
}
}
}
@Override
public void run()
{
while (true)
{
OutputState state = _state.get();
if (_onError != null)
{
switch (state)
{
case CLOSED:
case ERROR:
{
_onError = null;
return;
}
default:
{
if (_state.compareAndSet(state, OutputState.ERROR))
{
Throwable th = _onError;
_onError = null;
if (LOG.isDebugEnabled())
LOG.debug("onError", th);
try
{
_writeListener.onError(th);
}
finally
{
IO.close(this);
}
return;
}
}
}
continue;
}
// We do not check the state here. Strictly speaking the state should
// always be READY when run is called. However, other async threads or
// a prior call by this thread to onDataAvailable may have called write
// after onWritePossible was called, so the state could be any of the
// write states.
//
// Even if the state is CLOSED, we need to call onWritePossible to tell
// async producer that the last write completed.
//
// We have to trust the scheduling of this run was done
// for good reason, that is protected correctly by HttpChannelState and
// that implementations of onWritePossible will
// themselves check isReady(). If multiple threads are calling write,
// then they must either rely on only a single container thread being
// dispatched or perform their own mutual exclusion.
try
{
_writeListener.onWritePossible();
break;
}
catch (Throwable e)
{
_onError = e;
}
}
}
@Override
public String toString()
{
return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state.get());
}
private abstract class AsyncICB extends IteratingCallback
{
final boolean _last;
AsyncICB(boolean last)
{
_last = last;
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
@Override
protected void onCompleteSuccess()
{
while (true)
{
OutputState last = _state.get();
switch (last)
{
case PENDING:
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
continue;
break;
case UNREADY:
if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
continue;
if (_last)
closed();
if (_channel.getState().onWritePossible())
_channel.execute(_channel);
break;
case CLOSED:
break;
default:
throw new IllegalStateException();
}
break;
}
}
@Override
public void onCompleteFailure(Throwable e)
{
_onError = e == null ? new IOException() : e;
if (_channel.getState().onWritePossible())
_channel.execute(_channel);
}
}
private class AsyncFlush extends AsyncICB
{
protected volatile boolean _flushed;
public AsyncFlush()
{
super(false);
}
@Override
protected Action process()
{
if (BufferUtil.hasContent(_aggregate))
{
_flushed = true;
write(_aggregate, false, this);
return Action.SCHEDULED;
}
if (!_flushed)
{
_flushed = true;
write(BufferUtil.EMPTY_BUFFER, false, this);
return Action.SCHEDULED;
}
return Action.SUCCEEDED;
}
}
private class AsyncWrite extends AsyncICB
{
private final ByteBuffer _buffer;
private final ByteBuffer _slice;
private final int _len;
protected volatile boolean _completed;
public AsyncWrite(byte[] b, int off, int len, boolean last)
{
super(last);
_buffer = ByteBuffer.wrap(b, off, len);
_len = len;
// always use a view for large byte arrays to avoid JVM pooling large direct buffers
_slice = _len < getBufferSize() ? null : _buffer.duplicate();
}
public AsyncWrite(ByteBuffer buffer, boolean last)
{
super(last);
_buffer = buffer;
_len = buffer.remaining();
// Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
if (_buffer.isDirect() || _len < getBufferSize())
_slice = null;
else
{
_slice = _buffer.duplicate();
}
}
@Override
protected Action process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_completed = _len == 0;
write(_aggregate, _last && _completed, this);
return Action.SCHEDULED;
}
// Can we just aggregate the remainder?
if (!_last && _len < BufferUtil.space(_aggregate) && _len < _commitSize)
{
int position = BufferUtil.flipToFill(_aggregate);
BufferUtil.put(_buffer, _aggregate);
BufferUtil.flipToFlush(_aggregate, position);
return Action.SUCCEEDED;
}
// Is there data left to write?
if (_buffer.hasRemaining())
{
// if there is no slice, just write it
if (_slice == null)
{
_completed = true;
write(_buffer, _last, this);
return Action.SCHEDULED;
}
// otherwise take a slice
int p = _buffer.position();
int l = Math.min(getBufferSize(), _buffer.remaining());
int pl = p + l;
_slice.limit(pl);
_buffer.position(pl);
_slice.position(p);
_completed = !_buffer.hasRemaining();
write(_slice, _last && _completed, this);
return Action.SCHEDULED;
}
// all content written, but if we have not yet signal completion, we
// need to do so
if (_last && !_completed)
{
_completed = true;
write(BufferUtil.EMPTY_BUFFER, true, this);
return Action.SCHEDULED;
}
if (LOG.isDebugEnabled() && _completed)
LOG.debug("EOF of {}", this);
return Action.SUCCEEDED;
}
}
An iterating callback that will take content from an InputStream and write it to the associated HttpChannel
. A non direct buffer of size HttpOutput.getBufferSize()
is used. This callback is passed to the HttpChannel.write(ByteBuffer, boolean, Callback)
to be notified as each buffer is written and only once all the input is consumed will the wrapped Callback.succeeded()
method be called. /**
* An iterating callback that will take content from an
* InputStream and write it to the associated {@link HttpChannel}.
* A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
* be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called.
*/
private class InputStreamWritingCB extends IteratingNestedCallback
{
private final InputStream _in;
private final ByteBuffer _buffer;
private boolean _eof;
public InputStreamWritingCB(InputStream in, Callback callback)
{
super(callback);
_in = in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
}
@Override
protected Action process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
if (_eof)
{
if (LOG.isDebugEnabled())
LOG.debug("EOF of {}", this);
// Handle EOF
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return Action.SUCCEEDED;
}
// Read until buffer full or EOF
int len = 0;
while (len < _buffer.capacity() && !_eof)
{
int r = _in.read(_buffer.array(), _buffer.arrayOffset() + len, _buffer.capacity() - len);
if (r < 0)
_eof = true;
else
len += r;
}
// write what we have
_buffer.position(0);
_buffer.limit(len);
_written += len;
write(_buffer, _eof, this);
return Action.SCHEDULED;
}
@Override
public void onCompleteFailure(Throwable x)
{
abort(x);
_channel.getByteBufferPool().release(_buffer);
IO.close(_in);
super.onCompleteFailure(x);
}
}
An iterating callback that will take content from a ReadableByteChannel and write it to the HttpChannel
. A ByteBuffer
of size HttpOutput.getBufferSize()
is used that will be direct if HttpChannel.useDirectBuffers()
is true. This callback is passed to the HttpChannel.write(ByteBuffer, boolean, Callback)
to be notified as each buffer is written and only once all the input is consumed will the wrapped Callback.succeeded()
method be called. /**
* An iterating callback that will take content from a
* ReadableByteChannel and write it to the {@link HttpChannel}.
* A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
* {@link HttpChannel#useDirectBuffers()} is true.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
* be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called.
*/
private class ReadableByteChannelWritingCB extends IteratingNestedCallback
{
private final ReadableByteChannel _in;
private final ByteBuffer _buffer;
private boolean _eof;
public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
{
super(callback);
_in = in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
}
@Override
protected Action process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
if (_eof)
{
if (LOG.isDebugEnabled())
LOG.debug("EOF of {}", this);
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return Action.SUCCEEDED;
}
// Read from stream until buffer full or EOF
BufferUtil.clearToFill(_buffer);
while (_buffer.hasRemaining() && !_eof)
{
_eof = (_in.read(_buffer)) < 0;
}
// write what we have
BufferUtil.flipToFlush(_buffer, 0);
_written += _buffer.remaining();
write(_buffer, _eof, this);
return Action.SCHEDULED;
}
@Override
public void onCompleteFailure(Throwable x)
{
abort(x);
_channel.getByteBufferPool().release(_buffer);
IO.close(_in);
super.onCompleteFailure(x);
}
}
private static class WriteBlocker extends SharedBlockingCallback
{
private final HttpChannel _channel;
private WriteBlocker(HttpChannel channel)
{
_channel = channel;
}
}
}