/*
* Copyright (c) 2010, 2017 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.grizzly.http.io;
import org.glassfish.grizzly.http.HttpBrokenContent;
import org.glassfish.grizzly.http.HttpHeader;
import org.glassfish.grizzly.http.HttpTrailer;
import java.io.EOFException;
import org.glassfish.grizzly.Buffer;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.ReadResult;
import org.glassfish.grizzly.ReadHandler;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.http.HttpContent;
import org.glassfish.grizzly.http.util.MimeHeaders;
import org.glassfish.grizzly.threadpool.Threads;
import org.glassfish.grizzly.utils.Charsets;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.CodingErrorAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.http.HttpBrokenContentException;
import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.CompositeBuffer;
import org.glassfish.grizzly.utils.Exceptions;
import static org.glassfish.grizzly.http.util.Constants.*;
Abstraction exposing both byte and character methods to read content
from the HTTP messaging system in Grizzly.
/**
* Abstraction exposing both byte and character methods to read content
* from the HTTP messaging system in Grizzly.
*/
public class InputBuffer {
// Logger of this class; the trace messages below are guarded by isLoggable(LOGGER_LEVEL).
private static final Logger LOGGER = Grizzly.logger(InputBuffer.class);
// Level at which the diagnostic trace messages of this class are checked.
private static final Level LOGGER_LEVEL = Level.FINER;
The HttpHeader
associated with this InputBuffer
/**
* The {@link org.glassfish.grizzly.http.HttpHeader} associated with this <code>InputBuffer</code>
*/
private HttpHeader httpHeader;
The FilterChainContext
associated with this InputBuffer
. The FilterChainContext
will be using to read POST
data in blocking fashion.
/**
* The {@link FilterChainContext} associated with this <code>InputBuffer</code>.
* The {@link FilterChainContext} will be used to read <code>POST</code>
* data in blocking fashion.
*/
private FilterChainContext ctx;
Flag indicating whether or not this InputBuffer
is processing
character data.
/**
* Flag indicating whether or not this <code>InputBuffer</code> is processing
* character data.
*/
private boolean processingChars;
Flag indicating whether or not this InputBuffer
has been
closed.
/**
* Flag indicating whether or not this <code>InputBuffer</code> has been
* closed.
*/
private boolean closed;
The Buffer
consisting of the bytes from the HTTP message chunk(s). /**
* The {@link Buffer} consisting of the bytes from the HTTP
* message chunk(s).
*/
private Buffer inputContentBuffer;
The Connection
associated with this HttpRequestPacket
. /**
* The {@link Connection} associated with this {@link org.glassfish.grizzly.http.HttpRequestPacket}.
*/
private Connection connection;
The mark position within the current binary content. Marking is not
supported for character content.
/**
* The mark position within the current binary content. Marking is not
* supported for character content.
*/
private int markPos = -1;
Represents how many bytes can be read before markPos
is invalidated. /**
* Represents how many bytes can be read before {@link #markPos} is
* invalidated.
*/
private int readAheadLimit = -1;
Counter to track how many bytes have been read. This counter is only
used with the byte stream has been marked.
/**
* Counter to track how many bytes have been read. This counter is only
* used when the byte stream has been marked.
*/
private int readCount = 0;
The default character encoding to use if none is specified by the HttpRequestPacket
. /**
* The default character encoding to use if none is specified by the
* {@link org.glassfish.grizzly.http.HttpRequestPacket}.
*/
private String encoding = DEFAULT_HTTP_CHARACTER_ENCODING;
The CharsetDecoder
used to convert binary to character data. /**
* The {@link CharsetDecoder} used to convert binary to character data.
*/
private CharsetDecoder decoder;
CharsetDecoders cache
/**
* CharsetDecoders cache
*/
private final Map<String, CharsetDecoder> decoders = new HashMap<>();
Flag indicating all request content has been read.
/**
* Flag indicating all request content has been read.
*/
private boolean contentRead;
The ReadHandler
to be notified as content is read. /**
* The {@link ReadHandler} to be notified as content is read.
*/
private ReadHandler handler;
The length of the content that must be read before notifying the ReadHandler
. /**
* The length of the content that must be read before notifying the
* {@link ReadHandler}.
*/
private int requestedSize;
CharBuffer
for converting a single character at a time. /**
* {@link CharBuffer} for converting a single character at a time.
*/
private final CharBuffer singleCharBuf =
(CharBuffer) CharBuffer.allocate(1).position(1); // create CharBuffer w/ 0 chars remaining
Used to estimate how many characters can be produced from a variable
number of bytes.
/**
* Used to estimate how many characters can be produced from a variable
* number of bytes.
*/
private float averageCharsPerByte = 1.0f;
Flag shows if we're currently waiting for input data asynchronously
(is OP_READ enabled for the Connection)
/**
* Flag shows if we're currently waiting for input data asynchronously
* (is OP_READ enabled for the Connection)
*/
private boolean isWaitingDataAsynchronously;
Trailer headers, if any.
/**
* Trailer headers, if any.
*/
protected Map<String,String> trailers;
// ------------------------------------------------------------ Constructors
Per-request initialization required for the InputBuffer to function
properly.
Params: - httpHeader – the header from which input will be obtained.
- ctx – the FilterChainContext for the chain processing this request
/**
* <p>
* Per-request initialization required for the InputBuffer to function
* properly.
* </p>
*
* @param httpHeader the header from which input will be obtained.
* @param ctx the FilterChainContext for the chain processing this request
*/
public void initialize(final HttpHeader httpHeader,
        final FilterChainContext ctx) {
    if (ctx == null) {
        throw new IllegalArgumentException("ctx cannot be null.");
    }

    this.httpHeader = httpHeader;
    this.ctx = ctx;
    this.connection = ctx.getConnection();

    final Object message = ctx.getMessage();
    if (!(message instanceof HttpContent)) {
        // the context carries no HTTP content yet, so there is nothing to pre-buffer
        return;
    }

    final HttpContent initialContent = (HttpContent) message;

    // Check if HttpContent is chunked message trailer w/ headers
    checkHttpTrailer(initialContent);
    updateInputContentBuffer(initialContent.getContent());

    contentRead = initialContent.isLast();
    if (contentRead) {
        processTrailers();
    }

    // the content wrapper is recycled only after its buffer and flags were taken over
    initialContent.recycle();

    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s initialize with ready content: %s",
                this, inputContentBuffer);
    }
}
Set the default character encoding for this InputBuffer, which would be applied if no encoding was explicitly set on HTTP HttpRequestPacket
and character decoding wasn't started yet. /**
* Set the default character encoding for this <tt>InputBuffer</tt>, which
* would be applied if no encoding was explicitly set on HTTP
* {@link org.glassfish.grizzly.http.HttpRequestPacket} and character decoding
* wasn't started yet.
*/
@SuppressWarnings("UnusedDeclaration")
public void setDefaultEncoding(final String encoding) {
    // Overwrites the current encoding field; processingChars() may replace it
    // later with the encoding reported by the HTTP header.
    this.encoding = encoding;
}
Recycle this InputBuffer
for reuse.
/**
* <p>
* Recycle this <code>InputBuffer</code> for reuse.
* </p>
*
* <p>
* Disposes the buffered content, when there is any, and resets the
* per-request state to its initial values. Calling this method on an
* instance that currently holds no content buffer (for example one that
* has already been recycled) is safe.
* </p>
*/
public void recycle() {
    // The content buffer field has no initial value and is nulled below, so
    // it may legitimately be null here; guard the dispose call.
    final Buffer buffered = inputContentBuffer;
    if (buffered != null) {
        buffered.tryDispose();
        inputContentBuffer = null;
    }

    // leave the single-char buffer with zero chars remaining
    singleCharBuf.position(singleCharBuf.limit());

    connection = null;
    decoder = null;
    ctx = null;
    handler = null;
    trailers = null;

    processingChars = false;
    closed = false;
    contentRead = false;

    markPos = -1;
    readAheadLimit = -1;
    requestedSize = -1;
    readCount = 0;

    averageCharsPerByte = 1.0f;

    isWaitingDataAsynchronously = false;

    encoding = DEFAULT_HTTP_CHARACTER_ENCODING;
}
This method should be called if the InputBuffer is being used in conjunction with a Reader
implementation. If this method is not called, any character-based methods called on this InputBuffer
will throw a IllegalStateException
.
/**
* <p>
* This method should be called if the InputBuffer is being used in conjunction
* with a {@link java.io.Reader} implementation. If this method is not called,
* any character-based methods called on this <code>InputBuffer</code> will
* throw a {@link IllegalStateException}.
* </p>
*/
public void processingChars() {
    if (processingChars) {
        // character mode was already enabled
        return;
    }
    processingChars = true;

    final String headerEncoding = httpHeader.getCharacterEncoding();
    if (headerEncoding == null) {
        // keep the currently configured encoding and chars-per-byte estimate
        return;
    }

    encoding = headerEncoding;
    averageCharsPerByte = getDecoder().averageCharsPerByte();
}
// --------------------------------------------- InputStream-Related Methods
This method always blocks.
See Also: - read.read()
/**
* This method always blocks.
* @see java.io.InputStream#read()
*/
public int readByte() throws IOException {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s readByte. Ready content: %s",
                this, inputContentBuffer);
    }

    if (closed) {
        throw new IOException("Already closed");
    }

    // only touch the wire when nothing is buffered
    final boolean exhausted = !inputContentBuffer.hasRemaining();
    if (exhausted && fill(1) == -1) {
        return -1;
    }

    checkMarkAfterRead(1);
    final byte next = inputContentBuffer.get();
    // mask to return the byte as an unsigned value
    return next & 0xFF;
}
See Also: - read.read(byte[], int, int)
/**
* @see java.io.InputStream#read(byte[], int, int)
*/
public int read(final byte b[], final int off, final int len) throws IOException {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s read byte array of len: %s. Ready content: %s",
                this, len, inputContentBuffer);
    }

    if (closed) {
        throw new IOException("Already closed");
    }

    if (len == 0) {
        return 0;
    }

    // fill only when the buffer is empty
    if (!inputContentBuffer.hasRemaining() && fill(1) == -1) {
        return -1;
    }

    // copy no more than what is buffered right now
    final int count = Math.min(inputContentBuffer.remaining(), len);
    inputContentBuffer.get(b, off, count);

    final boolean keepConsumedBytes = checkMarkAfterRead(count);
    if (!keepConsumedBytes) {
        inputContentBuffer.shrink();
    }

    return count;
}
Depending on the InputBuffer mode, method will return either
number of available bytes or characters ready to be read without blocking.
Returns: depending on the InputBuffer mode, method will return
either number of available bytes or characters ready to be read without
blocking.
/**
* Depending on the <tt>InputBuffer</tt> mode, method will return either
* number of available bytes or characters ready to be read without blocking.
*
* @return depending on the <tt>InputBuffer</tt> mode, method will return
* either number of available bytes or characters ready to be read without
* blocking.
*/
public int readyData() {
    if (closed) {
        return 0;
    }

    // the unit of the answer follows the current mode
    if (processingChars) {
        return availableChar();
    }
    return available();
}
See Also: - available.available()
/**
* @see java.io.InputStream#available()
*/
public int available() {
    if (closed) {
        // a closed buffer reports nothing to read
        return 0;
    }
    return inputContentBuffer.remaining();
}
Returns the duplicate of the underlying Buffer
used to buffer incoming request data. The content of the returned buffer will be that of the underlying buffer. Changes to returned buffer's content will be visible in the underlying buffer, and vice versa; the two buffers' position, limit, and mark values will be independent. Returns: the duplicate of the underlying Buffer
used to buffer incoming request data.
/**
* Returns the duplicate of the underlying {@link Buffer} used to buffer
* incoming request data. The content of the returned buffer will be that
* of the underlying buffer. Changes to returned buffer's content will be
* visible in the underlying buffer, and vice versa; the two buffers'
* position, limit, and mark values will be independent.
*
* @return the duplicate of the underlying
* {@link Buffer} used to buffer incoming request data.
*/
public Buffer getBuffer() {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s getBuffer. Ready content: %s",
                this, inputContentBuffer);
    }

    // hand out a duplicate so the internal buffer reference stays with this object
    final Buffer view = inputContentBuffer.duplicate();
    return view;
}
Returns: the underlying Buffer
used to buffer incoming request data. Unlike getBuffer()
, this method detaches the returned Buffer
, so user code becomes responsible for handling the Buffer
.
/**
* @return the underlying {@link Buffer} used to buffer incoming request
* data. Unlike {@link #getBuffer()}, this method detaches the returned
* {@link Buffer}, so user code becomes responsible for handling
* the {@link Buffer}.
*/
public Buffer readBuffer() {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s readBuffer. Ready content: %s",
                this, inputContentBuffer);
    }

    // detach everything that is currently buffered
    final int everything = inputContentBuffer.remaining();
    return readBuffer(everything);
}
Params: - size – the requested size of the
Buffer
to be returned.
Returns: the Buffer
of a given size, which represents a chunk of the underlying Buffer
which contains incoming request data. This method detaches the returned Buffer
, so user code becomes responsible for handling its life-cycle.
/**
* @param size the requested size of the {@link Buffer} to be returned.
*
* @return the {@link Buffer} of a given size, which represents a chunk
* of the underlying {@link Buffer} which contains incoming request
* data. This method detaches the returned
* {@link Buffer}, so user code becomes responsible for handling its life-cycle.
*/
public Buffer readBuffer(final int size) {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s readBuffer(size), size: %s. Ready content: %s",
                this, size, inputContentBuffer);
    }

    final int buffered = inputContentBuffer.remaining();
    if (size > buffered) {
        throw new IllegalStateException("Can not read more bytes than available");
    }

    // in both cases the current buffer object is the one handed to the caller
    final Buffer detached = inputContentBuffer;
    if (size == buffered) {
        inputContentBuffer = Buffers.EMPTY_BUFFER;
    } else {
        // whatever split() returns becomes the new internal buffer
        inputContentBuffer = detached.split(detached.position() + size);
    }

    return detached;
}
Returns: the ReadHandler
current in use, if any.
/**
* @return the {@link ReadHandler} currently in use, if any.
*/
public ReadHandler getReadHandler() {
    // null when no handler is registered at the moment
    return this.handler;
}
// -------------------------------------------------- Reader-Related Methods
See Also: - read.read(CharBuffer)
/**
* @see java.io.Reader#read(java.nio.CharBuffer)
*/
public int read(final CharBuffer target) throws IOException {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s read(CharBuffer). Ready content: %s",
                this, inputContentBuffer);
    }

    // state checks come before the argument check
    if (closed) {
        throw new IOException("Already closed");
    }
    if (!processingChars) {
        throw new IllegalStateException();
    }
    if (target == null) {
        throw new IllegalArgumentException("target cannot be null.");
    }

    final int charCount = fillChars(1, target);
    checkMarkAfterRead(charCount);

    return charCount;
}
See Also: - read.read()
/**
* @see java.io.Reader#read()
*/
public int readChar() throws IOException {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s readChar. Ready content: %s",
                this, inputContentBuffer);
    }

    if (closed) {
        throw new IOException("Already closed");
    }
    if (!processingChars) {
        throw new IllegalStateException();
    }

    if (!singleCharBuf.hasRemaining()) {
        // no pre-decoded char is pending: reset the one-char buffer and decode into it
        singleCharBuf.clear();
        if (read(singleCharBuf) == -1) {
            return -1;
        }
    }

    return singleCharBuf.get();
}
See Also: - read.read(char[], int, int)
/**
* @see java.io.Reader#read(char[], int, int)
*/
public int read(final char cbuf[], final int off, final int len)
        throws IOException {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s read char array, len: %s. Ready content: %s",
                this, len, inputContentBuffer);
    }

    if (closed) {
        throw new IOException("Already closed");
    }
    if (!processingChars) {
        throw new IllegalStateException();
    }
    if (len == 0) {
        return 0;
    }

    // delegate to the CharBuffer variant through a view over the requested array slice
    return read(CharBuffer.wrap(cbuf, off, len));
}
See Also: - ready.ready()
/**
* @see java.io.Reader#ready()
*/
public boolean ready() {
    if (closed) {
        return false;
    }
    if (!processingChars) {
        throw new IllegalStateException();
    }

    // buffered bytes are checked first; the header is consulted only when the buffer is empty
    if (inputContentBuffer.hasRemaining()) {
        return true;
    }
    return httpHeader.isExpectContent();
}
/**
* Fill the buffer (blocking) up to the requested length.
*
* @param length how much content should attempt to buffer,
* <code>-1</code> means buffer entire request.
*
* @throws IOException if an error occurs reading data from the wire.
*/
public void fillFully(final int length) throws IOException {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s fillFully, len: %s. Ready content: %s",
                this, length, inputContentBuffer);
    }

    if (length == 0) {
        return;
    }

    if (length < 0) {
        // any negative length is translated into the "no limit" fill request
        fill(-1);
        return;
    }

    // request only what is still missing on top of the already buffered bytes
    final int missing = length - inputContentBuffer.remaining();
    if (missing > 0) {
        fill(missing);
    }
}
/**
* Estimates how many characters can be read at the moment.
*
* @return <code>0</code> if no character could be pre-decoded into the
* single-char buffer, otherwise <code>1</code> plus an estimate computed
* from the remaining buffered bytes and the average chars-per-byte ratio.
*/
public int availableChar() {
    if (!singleCharBuf.hasRemaining()) {
        // try to pre-decode one char so that at least one is guaranteed
        singleCharBuf.clear();
        final int decoded = fillAvailableChars(1, singleCharBuf);
        if (decoded == 0) {
            // restore the "0 chars remaining" state of the single-char buffer
            singleCharBuf.position(singleCharBuf.limit());
            return 0;
        }
        singleCharBuf.flip();
    }

    // one pre-decoded char + an estimate for the bytes that were not decoded yet
    final int estimatedRest = (int) (inputContentBuffer.remaining() * averageCharsPerByte);
    return 1 + estimatedRest;
}
// ---------------------------------------------------- Common Input Methods
Supported with binary and character data.
See Also: - mark.mark(int)
- Reader.mark(int)
/**
* <p>
* Supported with binary and character data.
* </p>
*
* @see java.io.InputStream#mark(int)
* @see java.io.Reader#mark(int)
*/
public void mark(final int readAheadLimit) {
    if (readAheadLimit <= 0) {
        // non-positive limits are ignored; an existing mark is left as it is
        return;
    }

    markPos = inputContentBuffer.position();
    readCount = 0;
    this.readAheadLimit = readAheadLimit;
}
Only supported with binary data.
See Also: - markSupported.markSupported()
/**
* <p>
* Only supported with binary data.
* </p>
*
* @see java.io.InputStream#markSupported()
*/
public boolean markSupported() {
    // asking this question in character mode is rejected
    final boolean characterMode = processingChars;
    if (characterMode) {
        throw new IllegalStateException();
    }
    return true;
}
Only supported with binary data.
See Also: - reset.reset()
/**
* <p>
* Only supported with binary data.
* </p>
*
* @see java.io.InputStream#reset()
*/
public void reset() throws IOException {
    if (closed) {
        throw new IOException("Already closed");
    }

    // -1 is the "no mark" value of the read-ahead limit
    final boolean markMissing = (readAheadLimit == -1);
    if (markMissing) {
        throw new IOException("Mark not set");
    }

    // restart the read-ahead accounting, then rewind to the marked position
    readCount = 0;
    inputContentBuffer.position(markPos);
}
See Also: - close.close()
/**
* @see java.io.Closeable#close()
*/
public void close() throws IOException {
    // only the flag is flipped here; buffered content is not released by this method
    this.closed = true;
}
Skips the specified number of bytes/characters.
See Also: - skip.skip(long)
- Reader.skip(long)
Deprecated: pls. use skip(long)
, the block parameter will be ignored
/**
* Skips the specified number of bytes/characters.
*
* @see java.io.InputStream#skip(long)
* @see java.io.Reader#skip(long)
*
* @deprecated please use {@link #skip(long)}, the <tt>block</tt> parameter will be ignored
*/
@Deprecated
public long skip(final long n, @SuppressWarnings("UnusedParameters") final boolean block) throws IOException {
    // 'block' is ignored: the single-argument overload does all the work
    return skip(n);
}
Skips the specified number of bytes/characters.
See Also: - skip.skip(long)
- Reader.skip(long)
/**
* Skips the specified number of bytes/characters.
*
* <p>
* In binary mode a request larger than {@link Integer#MAX_VALUE} is clamped
* to that value, and the mark accounting is charged with the number of
* bytes that were actually skipped.
* </p>
*
* @param n the number of bytes/characters to skip.
*
* @return in binary mode the number of bytes skipped, <code>0</code> if
* <code>n</code> is not positive, or <code>-1</code> if nothing is buffered
* and the fill attempt reports <code>-1</code>.
*
* @throws IllegalArgumentException in character mode, if <code>n</code> is
* negative.
* @throws IOException if an I/O error occurs while reading content.
*
* @see java.io.InputStream#skip(long)
* @see java.io.Reader#skip(long)
*/
public long skip(final long n) throws IOException {
    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s skip %s bytes. Ready content: %s",
                this, n, inputContentBuffer);
    }

    if (!processingChars) {
        if (n <= 0) {
            return 0L;
        }

        // fill() and the buffer position are int based: clamp the request
        // instead of letting an (int) cast wrap to zero or a negative value
        final int requested = (int) Math.min(n, Integer.MAX_VALUE);

        if (!inputContentBuffer.hasRemaining()) {
            if (fill(requested) == -1) {
                return -1;
            }
        }
        if (inputContentBuffer.remaining() < requested) {
            fill(requested);
        }

        final int skipped = Math.min(inputContentBuffer.remaining(), requested);
        if (skipped == 0) {
            // nothing was consumed, so neither the position nor the mark accounting changes
            return 0L;
        }

        inputContentBuffer.position(inputContentBuffer.position() + skipped);
        // account for the bytes actually skipped, as the read methods do
        if (!checkMarkAfterRead(skipped)) {
            inputContentBuffer.shrink();
        }
        return skipped;
    } else {
        if (n < 0) { // required by java.io.Reader.skip()
            throw new IllegalArgumentException();
        }
        if (n == 0) {
            return 0L;
        }
        // NOTE(review): this branch still narrows n with an (int) cast; it is
        // left as is because clamping would turn huge requests into huge
        // allocations - confirm the intended limit for character skips.
        final CharBuffer skipBuffer = CharBuffer.allocate((int) n);
        if (fillChars((int) n, skipBuffer) == -1) {
            return 0;
        }
        return Math.min(skipBuffer.remaining(), n);
    }
}
/**
* @return the trailers map held by this <code>InputBuffer</code>; the
* reference is returned as is and is <code>null</code> while no map has
* been built.
*/
public Map<String, String> getTrailers() {
    return this.trailers;
}
/**
* @return <code>true</code> if the trailers map has been built (it may
* still be empty), otherwise <code>false</code>.
*/
public boolean areTrailersAvailable() {
    final boolean built = (trailers != null);
    return built;
}
When invoked, this method will call ReadHandler.onAllDataRead()
on the current ReadHandler
(if any). This method shouldn't be invoked by developers directly. /**
* When invoked, this method will call {@link org.glassfish.grizzly.ReadHandler#onAllDataRead()}
* on the current {@link ReadHandler} (if any).
*
* This method shouldn't be invoked by developers directly.
*/
protected void finished() {
    if (contentRead) {
        // completion was already signalled
        return;
    }
    contentRead = true;

    // capture the handler before the trailers are processed
    final ReadHandler pending = handler;
    processTrailers();

    if (pending == null) {
        return;
    }

    // unregister first, then notify on the executor chosen by getThreadPool()
    handler = null;
    invokeHandlerAllRead(pending, getThreadPool());
}
/**
* Marks the content as completely read, processes the trailers and, if a
* handler is passed, calls its <code>onAllDataRead()</code> callback
* without an executor. Does nothing if completion was already signalled.
*
* @param readHandler the {@link ReadHandler} to notify, may be <code>null</code>.
*/
private void finishedInTheCurrentThread(final ReadHandler readHandler) {
    if (contentRead) {
        return;
    }
    contentRead = true;

    processTrailers();

    if (readHandler == null) {
        return;
    }
    // a null executor makes the notification run in the calling thread
    invokeHandlerAllRead(readHandler, null);
}
/**
* Calls <code>onAllDataRead()</code> on the passed handler, either through
* the given executor or, if the executor is <code>null</code>, directly in
* the calling thread. Anything thrown by the callback is passed to the
* handler's <code>onError</code> method.
*
* @param readHandler the {@link ReadHandler} to notify.
* @param executor the {@link Executor} to run the notification on, or
* <code>null</code> to run it in the calling thread.
*/
private void invokeHandlerAllRead(final ReadHandler readHandler,
        final Executor executor) {
    // one task, two ways of running it
    final Runnable notification = new Runnable() {
        @Override
        public void run() {
            try {
                readHandler.onAllDataRead();
            } catch (Throwable failure) {
                readHandler.onError(failure);
            }
        }
    };

    if (executor == null) {
        notification.run();
    } else {
        executor.execute(notification);
    }
}
/**
* Builds the {@link #trailers} map from the trailer headers reported by the
* current {@link HttpHeader}, unless the map has already been built.
* Trailer names are stored lower-cased; if there are no trailers, an empty
* map is stored.
*/
private void processTrailers() {
    if (trailers != null) {
        // already built
        return;
    }

    final MimeHeaders headers = httpHeader.getHeaders();
    final int trailerSize = headers.trailerSize();
    if (trailerSize > 0) {
        trailers = new HashMap<>(trailerSize);
        for (String name : headers.trailerNames()) {
            // Lower-case with a fixed locale: the default-locale variant
            // changes the key for some letters (e.g. 'I' under a Turkish locale).
            trailers.put(name.toLowerCase(java.util.Locale.ROOT), headers.getHeader(name));
        }
    } else {
        trailers = Collections.emptyMap();
    }
}
/**
* Makes the passed {@link Buffer} the content of this
* <code>InputBuffer</code>, reopening it and clearing any mark state.
*
* @param buffer the payload to be read again.
*
* @throws IllegalStateException if {@link #isFinished()} reports
* <code>false</code>.
*/
public void replayPayload(final Buffer buffer) {
    if (!isFinished()) {
        throw new IllegalStateException("Can't replay when InputBuffer is not closed");
    }

    if (LOGGER.isLoggable(LOGGER_LEVEL)) {
        log("InputBuffer %s replayPayload to %s", this, buffer);
    }

    // install the payload, then drop the mark bookkeeping and reopen
    inputContentBuffer = buffer;
    markPos = -1;
    readAheadLimit = -1;
    readCount = 0;
    closed = false;
}
Returns: true
if all request data has been read, otherwise
returns false
.
/**
* @return <code>true</code> if all request data has been read, otherwise
* returns <code>false</code>.
*/
public boolean isFinished() {
    // mirrors the contentRead flag
    return this.contentRead;
}
Returns: true
if this InputBuffer is closed, otherwise
returns false
.
/**
* @return <code>true</code> if this <tt>InputBuffer</tt> is closed, otherwise
* returns <code>false</code>.
*/
public boolean isClosed() {
    // mirrors the closed flag
    return this.closed;
}
Installs a ReadHandler
that will be notified when any data becomes available to read without blocking. Params: - handler – the
ReadHandler
to invoke.
Throws: - IllegalArgumentException – if
handler
is null
. - IllegalStateException – if an attempt is made to register a handler
before an existing registered handler has been invoked or if all request
data has already been read.
/**
* Installs a {@link ReadHandler} that will be notified when any data
* becomes available to read without blocking.
*
* @param handler the {@link ReadHandler} to invoke.
*
* @throws IllegalArgumentException if <code>handler</code> is <code>null</code>.
* @throws IllegalStateException if an attempt is made to register a handler
* before an existing registered handler has been invoked or if all request
* data has already been read.
*/
public void notifyAvailable(final ReadHandler handler) {
    // "any data" is expressed as a minimum size of one
    final int anyAmount = 1;
    notifyAvailable(handler, anyAmount);
}
Installs a ReadHandler
that will be notified when the specified amount of data is available to be read without blocking. Params: - handler – the
ReadHandler
to invoke. - size – the minimum number of bytes that must be available before the
ReadHandler
is notified.
Throws: - IllegalArgumentException – if
handler
is null
,
or if size
is less or equal to zero. - IllegalStateException – if an attempt is made to register a handler
before an existing registered handler has been invoked.
/**
* Installs a {@link ReadHandler} that will be notified when the specified
* amount of data is available to be read without blocking.
*
* @param handler the {@link ReadHandler} to invoke.
* @param size the minimum number of bytes that must be available before
* the {@link ReadHandler} is notified.
*
* @throws IllegalArgumentException if <code>handler</code> is <code>null</code>,
* or if <code>size</code> is less or equal to zero.
* @throws IllegalStateException if an attempt is made to register a handler
* before an existing registered handler has been invoked.
*/
public void notifyAvailable(final ReadHandler handler, final int size) {
// Argument and state validation: all three checks run before any state is changed.
if (handler == null) {
throw new IllegalArgumentException("handler cannot be null.");
}
if (size <= 0) {
throw new IllegalArgumentException("size should be positive integer");
}
if (this.handler != null) {
throw new IllegalStateException("Illegal attempt to register a new handler before the existing handler has been notified");
}
// If we don't expect more data - call onAllDataRead() directly
// (the handler is not stored in this case; callback failures go to onError)
if (closed || isFinished()) {
try {
handler.onAllDataRead();
} catch (Throwable ioe) {
handler.onError(ioe);
}
return;
}
// readyData() reports bytes or chars depending on the current mode
final int available = readyData();
// The helper decides whether the buffered amount satisfies the request;
// if it does, the handler is notified right away and is not stored.
if (shouldNotifyNow(size, available)) {
try {
handler.onDataAvailable();
} catch (Throwable ioe) {
handler.onError(ioe);
}
return;
}
// Not enough data yet: remember the request so append() can check it later.
requestedSize = size;
this.handler = handler;
// Start asynchronous receiving only once; the flag is cleared again by append().
if (!isWaitingDataAsynchronously) {
isWaitingDataAsynchronously = true;
initiateAsyncronousDataReceiving();
}
}
Params: - httpContent – the
HttpContent
to append
Throws: - IOException – if an error occurs appending the
Buffer
Returns: true
if ReadHandler
callback was invoked, otherwise returns false
.
/**
* Appends the specified {@link Buffer} to the internal composite
* {@link Buffer}.
*
* @param httpContent the {@link HttpContent} to append
*
* @return <code>true</code> if {@link ReadHandler}
* callback was invoked, otherwise returns <code>false</code>.
*
* @throws IOException if an error occurs appending the {@link Buffer}
*/
public boolean append(final HttpContent httpContent) throws IOException {
// Stop waiting for data asynchronously and enable it again
// only if we have a handler registered, which requirement
// (expected size) is not met.
isWaitingDataAsynchronously = false;
// check if it's broken HTTP content message or not
if (!HttpContent.isBroken(httpContent)) {
final Buffer buffer = httpContent.getContent();
// A closed InputBuffer drops the incoming buffer and notifies nobody.
if (closed) {
buffer.dispose();
return false;
}
// Work on a snapshot of the handler; the field is cleared further below.
final ReadHandler localHandler = handler;
final boolean isLast = httpContent.isLast();
// if we have a handler registered - switch the flag to true
boolean askForMoreDataInThisThread = !isLast && localHandler != null;
boolean invokeDataAvailable = false;
if (buffer.hasRemaining()) {
updateInputContentBuffer(buffer);
if (localHandler != null) {
// Compare what is readable now with the size the handler asked for.
final int available = readyData();
if (available >= requestedSize) {
invokeDataAvailable = true;
askForMoreDataInThisThread = false;
}
}
}
if (askForMoreDataInThisThread) {
// There is a ReadHandler registered, but it requested more
// data to be available before we can notify it - so wait for
// more data to come
isWaitingDataAsynchronously = true;
return true;
}
// From here on the handler is going to be notified (or there is none): unregister it.
handler = null;
if (isLast) {
checkHttpTrailer(httpContent);
}
// With a null localHandler invokeDataAvailable is always false at this point.
invokeHandlerOnProperThread(localHandler,
invokeDataAvailable, isLast);
} else { // broken content
// Unregister the handler and report the exception carried by the broken content.
final ReadHandler localHandler = handler;
handler = null;
invokeErrorHandlerOnProperThread(localHandler,
((HttpBrokenContent) httpContent).getException());
}
return false;
}
Returns: if this buffer is being used to process asynchronous data. Deprecated: will always return true
/**
* @return if this buffer is being used to process asynchronous data.
* @deprecated will always return true
*/
@Deprecated
public boolean isAsyncEnabled() {
    // there is no state behind this method any more: the answer is constant
    return true;
}
Sets the asynchronous processing state of this InputBuffer
.
Params: - asyncEnabled –
true
if this InputBuffer
is to be used to process asynchronous request data.
Deprecated: InputBuffer always supports async mode
/**
* Sets the asynchronous processing state of this <code>InputBuffer</code>.
*
* @param asyncEnabled <code>true</code> if this <code>InputBuffer</code>
* is to be used to process asynchronous request data.
* @deprecated <tt>InputBuffer</tt> always supports async mode
*/
@Deprecated
@SuppressWarnings("UnusedDeclaration")
public void setAsyncEnabled(boolean asyncEnabled) {
    // intentionally a no-op: the argument is ignored
}
Invoke ReadHandler.onError(Throwable)
(assuming a ReadHandler
is available) } passing a {#link CancellationException} if the current Connection
is open, or a {#link EOFException} if the connection was unexpectedly closed.
Since: 2.0.1
/**
* <p>
* Invoke {@link ReadHandler#onError(Throwable)} (assuming a {@link ReadHandler}
* is available) passing a {@link CancellationException}
* if the current {@link Connection} is open, or a {@link EOFException} if
* the connection was unexpectedly closed.
* </p>
*
* @since 2.0.1
*/
public void terminate() {
    final ReadHandler pending = handler;
    if (pending == null) {
        // nobody is waiting for a notification
        return;
    }
    handler = null;

    // an open connection means the read was cancelled, a closed one means EOF
    final Throwable reason;
    if (connection.isOpen()) {
        reason = new CancellationException();
    } else {
        reason = new EOFException();
    }

    // call in the current thread, because otherwise handler executed
    // in the different thread may deal with recycled Request/Response objects
    pending.onError(reason);
}
Initiates asynchronous data receiving.
This is service method, usually users don't have to call it explicitly.
/**
* Initiates asynchronous data receiving.
*
* This is a service method; users usually don't have to call it explicitly.
*/
public void initiateAsyncronousDataReceiving() {
// fork the FilterChainContext execution
// keep the current FilterChainContext suspended, but make a copy and resume it
// NOTE(review): presumably content read by the forked context comes back through append() - confirm.
ctx.fork(ctx.getStopAction());
}
// --------------------------------------------------------- Private Methods
Returns: Executor
, which will be used for notifying user registered ReadHandler
.
/**
* @return {@link Executor}, which will be used for notifying user
* registered {@link ReadHandler}.
*/
protected Executor getThreadPool() {
    if (!Threads.isService()) {
        // not on a service thread: callers get no executor
        return null;
    }

    final ExecutorService workers = connection.getTransport().getWorkerThreadPool();
    if (workers == null || workers.isShutdown()) {
        // no usable worker pool
        return null;
    }
    return workers;
}
/**
* Passes the error to the handler's <code>onError</code> callback, through
* the executor returned by {@link #getThreadPool()} if there is one and in
* the calling thread otherwise. Nothing happens if this
* <code>InputBuffer</code> is closed or the handler is <code>null</code>.
*
* @param localHandler the {@link ReadHandler} to notify, may be <code>null</code>.
* @param error the error to report.
*/
private void invokeErrorHandlerOnProperThread(final ReadHandler localHandler,
        final Throwable error) {
    if (closed || localHandler == null) {
        return;
    }

    final Executor executor = getThreadPool();
    if (executor == null) {
        localHandler.onError(error);
        return;
    }

    executor.execute(new Runnable() {
        @Override
        public void run() {
            localHandler.onError(error);
        }
    });
}
/**
* Runs {@link #invokeHandler(ReadHandler, boolean, boolean)} through the
* executor returned by {@link #getThreadPool()} if there is one, and in
* the calling thread otherwise.
*
* @param localHandler the {@link ReadHandler} to notify.
* @param invokeDataAvailable whether <code>onDataAvailable()</code> has to be called.
* @param isLast whether the last content chunk has been appended.
*
* @throws IOException declared by the signature.
*/
private void invokeHandlerOnProperThread(final ReadHandler localHandler,
        final boolean invokeDataAvailable,
        final boolean isLast)
        throws IOException {
    final Executor executor = getThreadPool();
    if (executor == null) {
        invokeHandler(localHandler, invokeDataAvailable, isLast);
        return;
    }

    executor.execute(new Runnable() {
        @Override
        public void run() {
            invokeHandler(localHandler, invokeDataAvailable, isLast);
        }
    });
}
/**
 * Notifies the passed {@link ReadHandler} in the calling thread.
 *
 * <p>
 * Anything thrown while notifying is caught and handed to
 * {@link ReadHandler#onError(Throwable)} of the same handler.
 * </p>
 *
 * @param localHandler the handler to notify
 * @param invokeDataAvailable if <code>true</code>,
 *  {@link ReadHandler#onDataAvailable()} is called first
 * @param isLast if <code>true</code>, the end-of-content processing
 *  (<code>finishedInTheCurrentThread</code>) is run afterwards
 */
private void invokeHandler(final ReadHandler localHandler,
        final boolean invokeDataAvailable,
        final boolean isLast) {
    try {
        if (invokeDataAvailable) {
            localHandler.onDataAvailable();
        }

        if (isLast) {
            finishedInTheCurrentThread(localHandler);
        }
    } catch (Throwable failure) {
        // route every failure to the handler instead of propagating it
        localHandler.onError(failure);
    }
}
Read next chunk of data in this thread, block if needed.
Throws: - IOException – if an error occurs reading data from the wire.
Returns: HttpContent
/**
 * Reads the next chunk of data in the calling thread, blocking if needed.
 *
 * @return the {@link HttpContent} carried by the read result
 * @throws IOException if an error occurs reading data from the wire.
 */
protected HttpContent blockingRead() throws IOException {
    final ReadResult readResult = ctx.read();
    // grab the message first, the ReadResult is recycled right after
    final HttpContent content = (HttpContent) readResult.getMessage();
    readResult.recycle();
    return content;
}
Used to add additional HTTP message chunk content to inputContentBuffer
.
Params: - requestedLen – how much content should attempt to be read,
-1
means read till the end of the message.
Throws: - IOException – if an I/O error occurs while reading content
Returns: the number of bytes actually read
/**
 * <p>
 * Reads HTTP message content chunks (blocking if needed) and adds them to
 * {@link #inputContentBuffer}.
 * </p>
 *
 * <p>
 * Reading stops once at least <code>requestedLen</code> bytes were read,
 * the header doesn't expect more content, or the last chunk was read, in
 * which case <code>finished()</code> is called.
 * </p>
 *
 * @param requestedLen how much content should attempt to be read,
 *  <code>-1</code> means read till the end of the message.
 *
 * @return the number of bytes actually read; <code>-1</code> if nothing was
 *  read and <code>requestedLen</code> is not <code>0</code>
 *
 * @throws IOException if an I/O error occurs while reading content, or the
 *  content is reported as broken
 */
private int fill(final int requestedLen) throws IOException {
    final boolean readTillEnd = requestedLen == -1;
    int total = 0;

    while ((readTillEnd || total < requestedLen)
            && httpHeader.isExpectContent()) {
        final HttpContent chunk = blockingRead();
        // remember the flag, the chunk is recycled before it is checked
        final boolean lastChunk = chunk.isLast();

        // Check if HttpContent is chunked message trailer w/ headers
        checkHttpTrailer(chunk);

        final Buffer payload;
        try {
            payload = chunk.getContent();
        } catch (HttpBrokenContentException broken) {
            // surface broken content as an IOException, preferring its cause
            final Throwable cause = broken.getCause();
            throw Exceptions.makeIOException(cause != null ? cause : broken);
        }

        total += payload.remaining();
        updateInputContentBuffer(payload);
        chunk.recycle();

        if (lastChunk) {
            finished();
            break;
        }
    }

    return (total > 0 || requestedLen == 0) ? total : -1;
}
Used to convert bytes to chars.
Params: - requestedLen – how much content should attempt to be read
Throws: - IOException – if an I/O error occurs while reading content
Returns: the number of chars actually read
/**
 * <p>
 * Decodes message body bytes into chars and stores them in <code>dst</code>,
 * reading further content chunks via {@link #blockingRead()} when the bytes
 * buffered in {@link #inputContentBuffer} are not sufficient.
 * </p>
 *
 * <p>
 * <code>dst</code> is flipped before this method returns normally.
 * </p>
 *
 * @param requestedLen how much content should attempt to be read
 * @param dst the {@link CharBuffer} receiving the decoded chars
 *
 * @return the number of chars actually read, or <code>-1</code> if no chars
 *  were read and either no further content was expected once the buffered
 *  bytes had been decoded, or the last content chunk was read
 *
 * @throws IOException if an I/O error occurs while reading content
 */
private int fillChars(final int requestedLen,
final CharBuffer dst) throws IOException {
int read = 0;
// 1) Check pre-decoded singleCharBuf: a char left there is handed out first
//    (skipped when dst is the singleCharBuf itself)
if (dst != singleCharBuf && singleCharBuf.hasRemaining()) {
dst.put(singleCharBuf.get());
read = 1;
}
// 2) Decode available byte[] -> char[]
if (inputContentBuffer.hasRemaining()) {
read += fillAvailableChars(requestedLen - read, dst);
}
// the already buffered data was enough to satisfy the request
if (read >= requestedLen) {
dst.flip();
return read;
}
// 3) If we don't expect more data - return what we've read so far
if (!httpHeader.isExpectContent()) {
dst.flip();
return read > 0 ? read : -1;
}
// 4) Try to read more data (we may block)
// NOTE(review): unlike fill(), this path does not call checkHttpTrailer(),
// does not translate HttpBrokenContentException into an IOException and
// does not call finished() for the last chunk - confirm this is intended.
CharsetDecoder decoderLocal = getDecoder();
boolean isNeedMoreInput = false; // true, if content in composite buffer is not enough to produce even 1 char
boolean last = false;
while (read < requestedLen && httpHeader.isExpectContent()) {
// read the next chunk only if the previous pass consumed nothing
// or there are no buffered bytes left
if (isNeedMoreInput || !inputContentBuffer.hasRemaining()) {
final HttpContent c = blockingRead();
updateInputContentBuffer(c.getContent());
last = c.isLast();
c.recycle();
isNeedMoreInput = false;
}
final ByteBuffer bytes = inputContentBuffer.toByteBuffer();
// remember positions to compute how much this decode pass consumed/produced
final int bytesPos = bytes.position();
final int dstPos = dst.position();
final CoderResult result = decoderLocal.decode(bytes, dst, false);
final int producedChars = dst.position() - dstPos;
final int consumedBytes = bytes.position() - bytesPos;
read += producedChars;
if (consumedBytes > 0) {
// restore the ByteBuffer view position and record the consumption on
// the inputContentBuffer itself
bytes.position(bytesPos);
inputContentBuffer.position(inputContentBuffer.position() + consumedBytes);
// readAheadLimit == -1 is the "no active mark" value used in this class,
// only then the consumed part is released
if (readAheadLimit == -1) {
inputContentBuffer.shrink();
}
} else {
// nothing was consumed - more bytes are required to make progress
isNeedMoreInput = true;
}
// stop after the last chunk or when dst has no room for more chars
if (last || result == CoderResult.OVERFLOW) {
break;
}
}
dst.flip();
// the last chunk was read, but not a single char was produced
if (last && read == 0) {
read = -1;
}
return read;
}
Used to convert pre-read (buffered) bytes to chars.
Params: - requestedLen – how much content should attempt to be read
Returns: the number of chars actually read
/**
 * <p>
 * Decodes the bytes already buffered in {@link #inputContentBuffer} into
 * chars, without reading any new content.
 * </p>
 *
 * <p>
 * The number of consumed bytes is applied to the position of
 * {@link #inputContentBuffer}; the consumed part is shrunk away only when
 * no mark is active (<code>readAheadLimit == -1</code>).
 * </p>
 *
 * @param requestedLen how much content should attempt to be read
 * @param dst the {@link CharBuffer} receiving the decoded chars
 *
 * @return the number of chars actually read
 */
private int fillAvailableChars(final int requestedLen, final CharBuffer dst) {
    final CharsetDecoder charsetDecoder = getDecoder();
    final ByteBuffer source = inputContentBuffer.toByteBuffer();
    final int sourceStart = source.position();
    final int dstStart = dst.position();

    int stillWanted = requestedLen;
    boolean keepDecoding;
    do {
        final int dstBefore = dst.position();
        final int sourceBefore = source.position();

        final CoderResult outcome = charsetDecoder.decode(source, dst, false);

        final int charsThisPass = dst.position() - dstBefore;
        final int bytesThisPass = source.position() - sourceBefore;
        stillWanted -= charsThisPass;

        // go on only while more chars are wanted, the last pass made some
        // progress, there are bytes left and the decoder asked for input
        keepDecoding = stillWanted > 0
                && (charsThisPass > 0 || bytesThisPass > 0)
                && source.hasRemaining()
                && outcome == CoderResult.UNDERFLOW;
    } while (keepDecoding);

    // totals over all passes
    final int totalChars = dst.position() - dstStart;
    final int totalBytes = source.position() - sourceStart;

    // restore the ByteBuffer view, the consumption is tracked on the Buffer
    source.position(sourceStart);
    inputContentBuffer.position(inputContentBuffer.position() + totalBytes);

    if (readAheadLimit == -1) {
        inputContentBuffer.shrink();
    }

    return totalChars;
}
/**
 * Makes the passed {@link Buffer} part of the input content.
 *
 * <p>
 * If there is no current buffer, the passed one becomes the current buffer.
 * If the current buffer still has unread data, or
 * <code>readAheadLimit &gt; 0</code> (the stream is marked), the passed
 * buffer is appended to the composite form of the current one. Otherwise
 * the current buffer is disposed (if possible) and replaced.
 * </p>
 *
 * @param buffer the new content chunk; it is marked as disposable
 */
protected void updateInputContentBuffer(final Buffer buffer) {
    buffer.allowBufferDispose(true);

    if (inputContentBuffer == null) {
        inputContentBuffer = buffer;
        return;
    }

    // if the stream is marked - we can't dispose the inputContentBuffer,
    // even if it's been read off
    final boolean keepCurrent =
            inputContentBuffer.hasRemaining() || readAheadLimit > 0;
    if (keepCurrent) {
        toCompositeInputContentBuffer().append(buffer);
        return;
    }

    inputContentBuffer.tryDispose();
    inputContentBuffer = buffer;
}
Params: - size – the amount of data that must be available for a
ReadHandler
to be notified. - available – the amount of data currently available.
Returns: true
if the handler should be notified during a call to notifyAvailable(ReadHandler)
or notifyAvailable(ReadHandler, int)
, otherwise false
/**
 * Decides whether a {@link ReadHandler} can be notified right away.
 *
 * @param size the amount of data that must be available for a {@link ReadHandler}
 *  to be notified.
 * @param available the amount of data currently available.
 *
 * @return <code>true</code> if <code>available</code> is not zero and is at
 *  least <code>size</code>, i.e. the handler should be notified during a call
 *  to {@link #notifyAvailable(ReadHandler)} or {@link #notifyAvailable(ReadHandler, int)},
 *  otherwise <code>false</code>
 */
private static boolean shouldNotifyNow(final int size, final int available) {
    if (available == 0) {
        // never notify when there is nothing to read
        return false;
    }
    return size <= available;
}
Returns: the CharsetDecoder
that should be used when converting content from binary to character
/**
 * Returns the {@link CharsetDecoder} for the current <code>encoding</code>.
 *
 * <p>
 * The decoder already assigned to this buffer is returned as is. Otherwise
 * a decoder cached in <code>decoders</code> is reset and reused, or a new
 * one is created, configured to replace malformed input and unmappable
 * characters, and stored in the cache.
 * </p>
 *
 * @return the {@link CharsetDecoder} that should be used when converting
 *  content from binary to character
 */
private CharsetDecoder getDecoder() {
    if (decoder != null) {
        return decoder;
    }

    final CharsetDecoder cached = decoders.get(encoding);
    if (cached != null) {
        // reused decoder may carry state from a previous use
        cached.reset();
        decoder = cached;
        return cached;
    }

    final Charset cs = Charsets.lookupCharset(encoding);
    final CharsetDecoder created = cs.newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);
    decoders.put(encoding, created);
    decoder = created;
    return created;
}
/**
 * Makes sure {@link #inputContentBuffer} is a {@link CompositeBuffer},
 * wrapping the current simple buffer into a new composite one if needed.
 *
 * <p>
 * When the simple buffer is marked (<code>readAheadLimit &gt; 0</code>) it is
 * rewound by <code>readCount</code> before being wrapped, so the data read
 * since the mark stays available; the composite buffer is then positioned
 * at <code>readCount</code> and <code>markPos</code> becomes <code>0</code>.
 * </p>
 *
 * @return {@link #inputContentBuffer} as a {@link CompositeBuffer}
 */
private CompositeBuffer toCompositeInputContentBuffer() {
    if (inputContentBuffer.isComposite()) {
        return (CompositeBuffer) inputContentBuffer;
    }

    final CompositeBuffer composite =
            CompositeBuffer.newBuffer(connection.getMemoryManager());
    composite.allowBufferDispose(true);
    composite.allowInternalBuffersDispose(true);

    final boolean marked = readAheadLimit > 0; // the simple inputContentBuffer is marked
    final int alignedPos = marked ? readCount : 0;
    if (marked) {
        // make the marked data still available
        inputContentBuffer.position(inputContentBuffer.position() - readCount);
        markPos = 0; // for the CompositeBuffer markPos is 0
    }

    composite.append(inputContentBuffer);
    composite.position(alignedPos);
    inputContentBuffer = composite;
    return composite;
}
Params: - n – read bytes count
Returns: true if mark is still active, or false if the
mark hasn't been set or has been invalidated
/**
 * Updates the mark bookkeeping after <code>n</code> bytes have been read.
 *
 * <p>
 * If the read keeps the total within <code>readAheadLimit</code>, the read
 * counter is advanced; otherwise the mark is invalidated. Non-positive
 * <code>n</code> or an unset mark (<code>readAheadLimit == -1</code>) leave
 * the state untouched.
 * </p>
 *
 * @param n read bytes count
 * @return {@code true} if mark is still active, or {@code false} if the
 *  mark hasn't been set or has been invalidated
 */
private boolean checkMarkAfterRead(final long n) {
    if (n <= 0 || readAheadLimit == -1) {
        return false;
    }

    // widen to long, so the sum can't overflow
    if ((long) readCount + n <= readAheadLimit) {
        readCount += n;
        return true;
    }

    // invalidate the mark
    readAheadLimit = -1;
    markPos = -1;
    readCount = 0;
    return false;
}
Check if passed HttpContent
is HttpTrailer
, which represents trailer chunk (when chunked Transfer-Encoding is used), if it is a trailer chunk - then copy all the available trailer headers to request headers map. /**
* Check if passed {@link HttpContent} is {@link HttpTrailer}, which represents
* trailer chunk (when chunked Transfer-Encoding is used), if it is a trailer
* chunk - then copy all the available trailer headers to request headers map.
*/
private static void checkHttpTrailer(final HttpContent httpContent) {
    // only trailer chunks carry headers that have to be merged
    if (!HttpTrailer.isTrailer(httpContent)) {
        return;
    }

    final HttpHeader messageHeader = httpContent.getHttpHeader();
    // mark the existing headers before the trailer headers are appended
    // (presumably to tell the trailer-added entries apart - confirm
    // against MimeHeaders.mark())
    messageHeader.getHeaders().mark();

    final MimeHeaders trailers = ((HttpTrailer) httpContent).getHeaders();
    for (int idx = 0, count = trailers.size(); idx < count; idx++) {
        final String name = trailers.getName(idx).toString();
        final String value = trailers.getValue(idx).toString();
        messageHeader.addHeader(name, value);
    }
}
/**
 * Logs a message built with {@link String#format(String, Object...)}.
 *
 * <p>
 * When {@link Level#FINEST} is loggable the message is logged at that level
 * together with an {@link Exception} that captures the calling stack;
 * otherwise it is logged at <code>LOGGER_LEVEL</code>. The message is
 * formatted only if one of these two levels is actually loggable.
 * </p>
 *
 * @param message the format string
 * @param params the format arguments
 */
private static void log(final String message, Object... params) {
    final boolean finestEnabled = LOGGER.isLoggable(Level.FINEST);
    if (!finestEnabled && !LOGGER.isLoggable(LOGGER_LEVEL)) {
        // the logger would drop the record anyway - skip the formatting
        return;
    }

    final String preparedMsg = String.format(message, params);
    if (finestEnabled) {
        LOGGER.log(Level.FINEST, preparedMsg, new Exception("Logged at"));
    } else {
        LOGGER.log(LOGGER_LEVEL, preparedMsg);
    }
}
}