/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.avro.io;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.InvalidNumberEncodingException;
import org.apache.avro.util.Utf8;

An Decoder for binary-format data.

Instances are created using DecoderFactory.

This class may read-ahead and buffer bytes from the source beyond what is required to serve its read methods. The number of unused bytes in the buffer can be accessed by inputStream().remaining(), if the BinaryDecoder is not 'direct'.
See Also:
/** * An {@link Decoder} for binary-format data. * <p/> * Instances are created using {@link DecoderFactory}. * <p/> * This class may read-ahead and buffer bytes from the source beyond what is * required to serve its read methods. The number of unused bytes in the buffer * can be accessed by inputStream().remaining(), if the BinaryDecoder is not * 'direct'. * * @see Encoder */
public class BinaryDecoder extends Decoder {
The maximum size of array to allocate. Some VMs reserve some header words in an array. Attempts to allocate larger arrays may result in OutOfMemoryError: Requested array size exceeds VM limit
/** * The maximum size of array to allocate. Some VMs reserve some header words in * an array. Attempts to allocate larger arrays may result in OutOfMemoryError: * Requested array size exceeds VM limit */
private static final long MAX_ARRAY_SIZE = (long) Integer.MAX_VALUE - 8L; private ByteSource source = null; // we keep the buffer and its state variables in this class and not in a // container class for performance reasons. This improves performance // over a container object by about 5% to 15% // for example, we could have a FastBuffer class with these state variables // and keep a private FastBuffer member here. This simplifies the // "detach source" code and source access to the buffer, but // hurts performance. private byte[] buf = null; private int minPos = 0; private int pos = 0; private int limit = 0; byte[] getBuf() { return buf; } int getPos() { return pos; } int getLimit() { return limit; } void setBuf(byte[] buf, int pos, int len) { this.buf = buf; this.pos = pos; this.limit = pos + len; } void clearBuf() { this.buf = null; }
protected constructor for child classes
/** protected constructor for child classes */
protected BinaryDecoder() { } BinaryDecoder(InputStream in, int bufferSize) { super(); configure(in, bufferSize); } BinaryDecoder(byte[] data, int offset, int length) { super(); configure(data, offset, length); } BinaryDecoder configure(InputStream in, int bufferSize) { configureSource(bufferSize, new InputStreamByteSource(in)); return this; } BinaryDecoder configure(byte[] data, int offset, int length) { configureSource(DecoderFactory.DEFAULT_BUFFER_SIZE, new ByteArrayByteSource(data, offset, length)); return this; }
Initializes this decoder with a new ByteSource. Detaches the old source (if it exists) from this Decoder. The old source's state no longer depends on this Decoder and its InputStream interface will continue to drain the remaining buffer and source data.

The decoder will read from the new source. The source will generally replace the buffer with its own. If the source allocates a new buffer, it will create it with size bufferSize.
/** * Initializes this decoder with a new ByteSource. Detaches the old source (if * it exists) from this Decoder. The old source's state no longer depends on * this Decoder and its InputStream interface will continue to drain the * remaining buffer and source data. * <p/> * The decoder will read from the new source. The source will generally replace * the buffer with its own. If the source allocates a new buffer, it will create * it with size bufferSize. */
private void configureSource(int bufferSize, ByteSource source) { if (null != this.source) { this.source.detach(); } source.attach(bufferSize, this); this.source = source; } @Override public void readNull() throws IOException { } @Override public boolean readBoolean() throws IOException { // inlined, shorter version of ensureBounds if (limit == pos) { limit = source.tryReadRaw(buf, 0, buf.length); pos = 0; if (limit == 0) { throw new EOFException(); } } int n = buf[pos++] & 0xff; return n == 1; } @Override public int readInt() throws IOException { ensureBounds(5); // won't throw index out of bounds int len = 1; int b = buf[pos] & 0xff; int n = b & 0x7f; if (b > 0x7f) { b = buf[pos + len++] & 0xff; n ^= (b & 0x7f) << 7; if (b > 0x7f) { b = buf[pos + len++] & 0xff; n ^= (b & 0x7f) << 14; if (b > 0x7f) { b = buf[pos + len++] & 0xff; n ^= (b & 0x7f) << 21; if (b > 0x7f) { b = buf[pos + len++] & 0xff; n ^= (b & 0x7f) << 28; if (b > 0x7f) { throw new InvalidNumberEncodingException("Invalid int encoding"); } } } } } pos += len; if (pos > limit) { throw new EOFException(); } return (n >>> 1) ^ -(n & 1); // back to two's-complement } @Override public long readLong() throws IOException { ensureBounds(10); int b = buf[pos++] & 0xff; int n = b & 0x7f; long l; if (b > 0x7f) { b = buf[pos++] & 0xff; n ^= (b & 0x7f) << 7; if (b > 0x7f) { b = buf[pos++] & 0xff; n ^= (b & 0x7f) << 14; if (b > 0x7f) { b = buf[pos++] & 0xff; n ^= (b & 0x7f) << 21; if (b > 0x7f) { // only the low 28 bits can be set, so this won't carry // the sign bit to the long l = innerLongDecode((long) n); } else { l = n; } } else { l = n; } } else { l = n; } } else { l = n; } if (pos > limit) { throw new EOFException(); } return (l >>> 1) ^ -(l & 1); // back to two's-complement } // splitting readLong up makes it faster because of the JVM does more // optimizations on small methods private long innerLongDecode(long l) throws IOException { int len = 1; int b = buf[pos] & 0xff; l ^= (b & 0x7fL) << 28; if (b > 0x7f) { b = buf[pos + len++] & 0xff; l ^= (b & 0x7fL) << 35; if (b > 0x7f) { b = buf[pos + len++] & 0xff; l ^= (b & 0x7fL) << 42; if (b > 0x7f) { b = buf[pos + len++] & 0xff; l ^= (b & 0x7fL) << 49; if (b > 0x7f) { b = buf[pos + len++] & 0xff; l ^= (b & 0x7fL) << 56; if (b > 0x7f) { b = buf[pos + len++] & 0xff; l ^= (b & 0x7fL) << 63; if (b > 0x7f) { throw new InvalidNumberEncodingException("Invalid long encoding"); } } } } } } pos += len; return l; } @Override public float readFloat() throws IOException { ensureBounds(4); int len = 1; int n = (buf[pos] & 0xff) | ((buf[pos + len++] & 0xff) << 8) | ((buf[pos + len++] & 0xff) << 16) | ((buf[pos + len++] & 0xff) << 24); if ((pos + 4) > limit) { throw new EOFException(); } pos += 4; return Float.intBitsToFloat(n); } @Override public double readDouble() throws IOException { ensureBounds(8); int len = 1; int n1 = (buf[pos] & 0xff) | ((buf[pos + len++] & 0xff) << 8) | ((buf[pos + len++] & 0xff) << 16) | ((buf[pos + len++] & 0xff) << 24); int n2 = (buf[pos + len++] & 0xff) | ((buf[pos + len++] & 0xff) << 8) | ((buf[pos + len++] & 0xff) << 16) | ((buf[pos + len++] & 0xff) << 24); if ((pos + 8) > limit) { throw new EOFException(); } pos += 8; return Double.longBitsToDouble((((long) n1) & 0xffffffffL) | (((long) n2) << 32)); } @Override public Utf8 readString(Utf8 old) throws IOException { long length = readLong(); if (length > MAX_ARRAY_SIZE) { throw new UnsupportedOperationException("Cannot read strings longer than " + MAX_ARRAY_SIZE + " bytes"); } if (length < 0L) { throw new AvroRuntimeException("Malformed data. Length is negative: " + length); } Utf8 result = (old != null ? old : new Utf8()); result.setByteLength((int) length); if (0L != length) { doReadBytes(result.getBytes(), 0, (int) length); } return result; } private final Utf8 scratchUtf8 = new Utf8(); @Override public String readString() throws IOException { return readString(scratchUtf8).toString(); } @Override public void skipString() throws IOException { doSkipBytes(readLong()); } @Override public ByteBuffer readBytes(ByteBuffer old) throws IOException { int length = readInt(); final ByteBuffer result; if (old != null && length <= old.capacity()) { result = old; ((Buffer) result).clear(); } else { result = ByteBuffer.allocate(length); } doReadBytes(result.array(), result.position(), length); ((Buffer) result).limit(length); return result; } @Override public void skipBytes() throws IOException { doSkipBytes(readLong()); } @Override public void readFixed(byte[] bytes, int start, int length) throws IOException { doReadBytes(bytes, start, length); } @Override public void skipFixed(int length) throws IOException { doSkipBytes(length); } @Override public int readEnum() throws IOException { return readInt(); } protected void doSkipBytes(long length) throws IOException { int remaining = limit - pos; if (length <= remaining) { pos = (int) (pos + length); } else { limit = pos = 0; length -= remaining; source.skipSourceBytes(length); } }
Reads length bytes into bytes starting at start.
Throws:
  • EOFException – If there are not enough number of bytes in the source.
  • IOException
/** * Reads <tt>length</tt> bytes into <tt>bytes</tt> starting at <tt>start</tt>. * * @throws EOFException If there are not enough number of bytes in the source. * @throws IOException */
protected void doReadBytes(byte[] bytes, int start, int length) throws IOException { if (length < 0) throw new AvroRuntimeException("Malformed data. Length is negative: " + length); int remaining = limit - pos; if (length <= remaining) { System.arraycopy(buf, pos, bytes, start, length); pos += length; } else { // read the rest of the buffer System.arraycopy(buf, pos, bytes, start, remaining); start += remaining; length -= remaining; pos = limit; // finish from the byte source source.readRaw(bytes, start, length); } }
Returns the number of items to follow in the current array or map. Returns 0 if there are no more items in the current array and the array/map has ended. Arrays are encoded as a series of blocks. Each block consists of a long count value, followed by that many array items. A block with count zero indicates the end of the array. If a block's count is negative, its absolute value is used, and the count is followed immediately by a long block size indicating the number of bytes in the block.
Throws:
  • IOException – If the first byte cannot be read for any reason other than the end of the file, if the input stream has been closed, or if some other I/O error occurs.
/** * Returns the number of items to follow in the current array or map. Returns 0 * if there are no more items in the current array and the array/map has ended. * Arrays are encoded as a series of blocks. Each block consists of a long count * value, followed by that many array items. A block with count zero indicates * the end of the array. If a block's count is negative, its absolute value is * used, and the count is followed immediately by a long block size indicating * the number of bytes in the block. * * @throws IOException If the first byte cannot be read for any reason other * than the end of the file, if the input stream has been * closed, or if some other I/O error occurs. */
protected long doReadItemCount() throws IOException { long result = readLong(); if (result < 0L) { // Consume byte-count if present readLong(); result = -result; } return result; }
Reads the count of items in the current array or map and skip those items, if possible. If it could skip the items, keep repeating until there are no more items left in the array or map. Arrays are encoded as a series of blocks. Each block consists of a long count value, followed by that many array items. A block with count zero indicates the end of the array. If a block's count is negative, its absolute value is used, and the count is followed immediately by a long block size indicating the number of bytes in the block. If block size is missing, this method return the count of the items found. The client needs to skip the items individually.
Throws:
  • IOException – If the first byte cannot be read for any reason other than the end of the file, if the input stream has been closed, or if some other I/O error occurs.
Returns:Zero if there are no more items to skip and end of array/map is reached. Positive number if some items are found that cannot be skipped and the client needs to skip them individually.
/** * Reads the count of items in the current array or map and skip those items, if * possible. If it could skip the items, keep repeating until there are no more * items left in the array or map. Arrays are encoded as a series of blocks. * Each block consists of a long count value, followed by that many array items. * A block with count zero indicates the end of the array. If a block's count is * negative, its absolute value is used, and the count is followed immediately * by a long block size indicating the number of bytes in the block. If block * size is missing, this method return the count of the items found. The client * needs to skip the items individually. * * @return Zero if there are no more items to skip and end of array/map is * reached. Positive number if some items are found that cannot be * skipped and the client needs to skip them individually. * * @throws IOException If the first byte cannot be read for any reason other * than the end of the file, if the input stream has been * closed, or if some other I/O error occurs. */
private long doSkipItems() throws IOException { long result = readLong(); while (result < 0L) { final long bytecount = readLong(); doSkipBytes(bytecount); result = readLong(); } return result; } @Override public long readArrayStart() throws IOException { return doReadItemCount(); } @Override public long arrayNext() throws IOException { return doReadItemCount(); } @Override public long skipArray() throws IOException { return doSkipItems(); } @Override public long readMapStart() throws IOException { return doReadItemCount(); } @Override public long mapNext() throws IOException { return doReadItemCount(); } @Override public long skipMap() throws IOException { return doSkipItems(); } @Override public int readIndex() throws IOException { return readInt(); }
Returns true if the current BinaryDecoder is at the end of its source data and cannot read any further without throwing an EOFException or other IOException.

Not all implementations of BinaryDecoder support isEnd(). Implementations that do not support isEnd() will throw a UnsupportedOperationException.
Throws:
  • IOException – If the first byte cannot be read for any reason other than the end of the file, if the input stream has been closed, or if some other I/O error occurs.
/** * Returns true if the current BinaryDecoder is at the end of its source data * and cannot read any further without throwing an EOFException or other * IOException. * <p/> * Not all implementations of BinaryDecoder support isEnd(). Implementations * that do not support isEnd() will throw a * {@link java.lang.UnsupportedOperationException}. * * @throws IOException If the first byte cannot be read for any reason other * than the end of the file, if the input stream has been * closed, or if some other I/O error occurs. */
public boolean isEnd() throws IOException { if (pos < limit) { return false; } if (source.isEof()) { return true; } // read from source. final int read = source.tryReadRaw(buf, 0, buf.length); pos = 0; limit = read; return (0 == read); }
Ensures that buf[pos + num - 1] is not out of the buffer array bounds. However, buf[pos + num -1] may be >= limit if there is not enough data left in the source to fill the array with num bytes.

This method allows readers to read ahead by num bytes safely without checking for EOF at each byte. However, readers must ensure that their reads are valid by checking that their read did not advance past the limit before adjusting pos.

num must be less than the buffer size and greater than 0
/** * Ensures that buf[pos + num - 1] is not out of the buffer array bounds. * However, buf[pos + num -1] may be >= limit if there is not enough data left * in the source to fill the array with num bytes. * <p/> * This method allows readers to read ahead by num bytes safely without checking * for EOF at each byte. However, readers must ensure that their reads are valid * by checking that their read did not advance past the limit before adjusting * pos. * <p/> * num must be less than the buffer size and greater than 0 */
private void ensureBounds(int num) throws IOException { int remaining = limit - pos; if (remaining < num) { // move remaining to front source.compactAndFill(buf, pos, minPos, remaining); if (pos >= limit) throw new EOFException(); } }
Returns an InputStream that is aware of any buffering that may occur in this BinaryDecoder. Readers that need to interleave decoding Avro data with other reads must access this InputStream to do so unless the implementation is 'direct' and does not read beyond the minimum bytes necessary from the source.
/** * Returns an {@link java.io.InputStream} that is aware of any buffering that * may occur in this BinaryDecoder. Readers that need to interleave decoding * Avro data with other reads must access this InputStream to do so unless the * implementation is 'direct' and does not read beyond the minimum bytes * necessary from the source. */
public InputStream inputStream() { return source; }
BufferAccessor is used by BinaryEncoder to enable ByteSources and the InputStream returned by inputStream to access the BinaryEncoder's buffer. When a BufferAccessor is created, it is attached to a BinaryDecoder and its buffer. Its accessors directly reference the BinaryDecoder's buffer. When detach() is called, it stores references to the BinaryDecoder's buffer directly. The BinaryDecoder only detaches a BufferAccessor when it is initializing to a new ByteSource. Therefore, a client that is using the InputStream returned by BinaryDecoder.inputStream can continue to use that stream after a BinaryDecoder has been reinitialized to read from new data.
/** * BufferAccessor is used by BinaryEncoder to enable {@link ByteSource}s and the * InputStream returned by {@link BinaryDecoder.inputStream} to access the * BinaryEncoder's buffer. When a BufferAccessor is created, it is attached to a * BinaryDecoder and its buffer. Its accessors directly reference the * BinaryDecoder's buffer. When detach() is called, it stores references to the * BinaryDecoder's buffer directly. The BinaryDecoder only detaches a * BufferAccessor when it is initializing to a new ByteSource. Therefore, a * client that is using the InputStream returned by BinaryDecoder.inputStream * can continue to use that stream after a BinaryDecoder has been reinitialized * to read from new data. */
static class BufferAccessor { private final BinaryDecoder decoder; private byte[] buf; private int pos; private int limit; boolean detached = false; private BufferAccessor(BinaryDecoder decoder) { this.decoder = decoder; } void detach() { this.buf = decoder.buf; this.pos = decoder.pos; this.limit = decoder.limit; detached = true; } int getPos() { if (detached) return this.pos; else return decoder.pos; } int getLim() { if (detached) return this.limit; else return decoder.limit; } byte[] getBuf() { if (detached) return this.buf; else return decoder.buf; } void setPos(int pos) { if (detached) this.pos = pos; else decoder.pos = pos; } void setLimit(int limit) { if (detached) this.limit = limit; else decoder.limit = limit; } void setBuf(byte[] buf, int offset, int length) { if (detached) { this.buf = buf; this.limit = offset + length; this.pos = offset; } else { decoder.buf = buf; decoder.limit = offset + length; decoder.pos = offset; decoder.minPos = offset; } } }
ByteSource abstracts the source of data from the core workings of BinaryDecoder. This is very important for performance reasons because InputStream's API is a barrier to performance due to several quirks: InputStream does not in general require that as many bytes as possible have been read when filling a buffer.

InputStream's terminating conditions for a read are two-fold: EOFException and '-1' on the return from read(). Implementations are supposed to return '-1' on EOF but often do not. The extra terminating conditions cause extra conditionals on both sides of the API, and slow performance significantly.

ByteSource implementations provide read() and skip() variants that have stronger guarantees than InputStream, freeing client code to be simplified and faster.

skipSourceBytes and readRaw are guaranteed to have read or skipped as many bytes as possible, or throw EOFException. trySkipBytes and tryRead are guaranteed to attempt to read or skip as many bytes as possible and never throw EOFException, while returning the exact number of bytes skipped or read. isEof returns true if all the source bytes have been read or skipped. This condition can also be detected by a client if an EOFException is thrown from skipSourceBytes or readRaw, or if trySkipBytes or tryRead return 0;

A ByteSource also implements the InputStream contract for use by APIs that require it. The InputStream interface must take into account buffering in any decoder that this ByteSource is attached to. The other methods do not account for buffering.
/** * ByteSource abstracts the source of data from the core workings of * BinaryDecoder. This is very important for performance reasons because * InputStream's API is a barrier to performance due to several quirks: * InputStream does not in general require that as many bytes as possible have * been read when filling a buffer. * <p/> * InputStream's terminating conditions for a read are two-fold: EOFException * and '-1' on the return from read(). Implementations are supposed to return * '-1' on EOF but often do not. The extra terminating conditions cause extra * conditionals on both sides of the API, and slow performance significantly. * <p/> * ByteSource implementations provide read() and skip() variants that have * stronger guarantees than InputStream, freeing client code to be simplified * and faster. * <p/> * {@link skipSourceBytes} and {@link readRaw} are guaranteed to have read or * skipped as many bytes as possible, or throw EOFException. * {@link trySkipBytes} and {@link tryRead} are guaranteed to attempt to read or * skip as many bytes as possible and never throw EOFException, while returning * the exact number of bytes skipped or read. {@link isEof} returns true if all * the source bytes have been read or skipped. This condition can also be * detected by a client if an EOFException is thrown from * {@link skipSourceBytes} or {@link readRaw}, or if {@link trySkipBytes} or * {@link tryRead} return 0; * <p/> * A ByteSource also implements the InputStream contract for use by APIs that * require it. The InputStream interface must take into account buffering in any * decoder that this ByteSource is attached to. The other methods do not account * for buffering. */
abstract static class ByteSource extends InputStream { // maintain a reference to the buffer, so that if this // source is detached from the Decoder, and a client still // has a reference to it via inputStream(), bytes are not // lost protected BufferAccessor ba; protected ByteSource() { } abstract boolean isEof(); protected void attach(int bufferSize, BinaryDecoder decoder) { decoder.buf = new byte[bufferSize]; decoder.pos = 0; decoder.minPos = 0; decoder.limit = 0; this.ba = new BufferAccessor(decoder); } protected void detach() { ba.detach(); }
Skips length bytes from the source. If length bytes cannot be skipped due to end of file/stream/channel/etc an EOFException is thrown
Params:
  • length – the number of bytes to attempt to skip
Throws:
/** * Skips length bytes from the source. If length bytes cannot be skipped due to * end of file/stream/channel/etc an EOFException is thrown * * @param length the number of bytes to attempt to skip * @throws IOException if an error occurs * @throws EOFException if length bytes cannot be skipped */
protected abstract void skipSourceBytes(long length) throws IOException;
Attempts to skip skipLength bytes from the source. Returns the actual number of bytes skipped. This method must attempt to skip as many bytes as possible up to skipLength bytes. Skipping 0 bytes signals end of stream/channel/file/etc
Params:
  • skipLength – the number of bytes to attempt to skip
Returns:the count of actual bytes skipped.
/** * Attempts to skip <i>skipLength</i> bytes from the source. Returns the actual * number of bytes skipped. This method must attempt to skip as many bytes as * possible up to <i>skipLength</i> bytes. Skipping 0 bytes signals end of * stream/channel/file/etc * * @param skipLength the number of bytes to attempt to skip * @return the count of actual bytes skipped. */
protected abstract long trySkipBytes(long skipLength) throws IOException;
Reads raw from the source, into a byte[]. Used for reads that are larger than the buffer, or otherwise unbuffered. This is a mandatory read -- if there is not enough bytes in the source, EOFException is thrown.
Throws:
  • IOException – if an error occurs
  • EOFException – if len bytes cannot be read
/** * Reads raw from the source, into a byte[]. Used for reads that are larger than * the buffer, or otherwise unbuffered. This is a mandatory read -- if there is * not enough bytes in the source, EOFException is thrown. * * @throws IOException if an error occurs * @throws EOFException if len bytes cannot be read */
protected abstract void readRaw(byte[] data, int off, int len) throws IOException;
Attempts to copy up to len bytes from the source into data, starting at index off. Returns the actual number of bytes copied which may be between 0 and len.

This method must attempt to read as much as possible from the source. Returns 0 when at the end of stream/channel/file/etc.
Throws:
  • IOException – if an error occurs reading
/** * Attempts to copy up to <i>len</i> bytes from the source into data, starting * at index <i>off</i>. Returns the actual number of bytes copied which may be * between 0 and <i>len</i>. * <p/> * This method must attempt to read as much as possible from the source. Returns * 0 when at the end of stream/channel/file/etc. * * @throws IOException if an error occurs reading **/
protected abstract int tryReadRaw(byte[] data, int off, int len) throws IOException;
If this source buffers, compacts the buffer by placing the remaining bytes starting at pos at minPos. This may be done in the current buffer, or may replace the buffer with a new one. The end result must be a buffer with at least 16 bytes of remaining space.
Params:
  • pos –
  • minPos –
  • remaining –
Throws:
/** * If this source buffers, compacts the buffer by placing the <i>remaining</i> * bytes starting at <i>pos</i> at <i>minPos</i>. This may be done in the * current buffer, or may replace the buffer with a new one. * * The end result must be a buffer with at least 16 bytes of remaining space. * * @param pos * @param minPos * @param remaining * @throws IOException */
protected void compactAndFill(byte[] buf, int pos, int minPos, int remaining) throws IOException { System.arraycopy(buf, pos, buf, minPos, remaining); ba.setPos(minPos); int newLimit = remaining + tryReadRaw(buf, minPos + remaining, buf.length - remaining); ba.setLimit(newLimit); } @Override public int read(byte[] b, int off, int len) throws IOException { int lim = ba.getLim(); int pos = ba.getPos(); byte[] buf = ba.getBuf(); int remaining = (lim - pos); if (remaining >= len) { System.arraycopy(buf, pos, b, off, len); pos = pos + len; ba.setPos(pos); return len; } else { // flush buffer to array System.arraycopy(buf, pos, b, off, remaining); pos = pos + remaining; ba.setPos(pos); // get the rest from the stream (skip array) int inputRead = remaining + tryReadRaw(b, off + remaining, len - remaining); if (inputRead == 0) { return -1; } else { return inputRead; } } } @Override public long skip(long n) throws IOException { int lim = ba.getLim(); int pos = ba.getPos(); int remaining = lim - pos; if (remaining > n) { pos = (int) (pos + n); ba.setPos(pos); return n; } else { pos = lim; ba.setPos(pos); long isSkipCount = trySkipBytes(n - remaining); return isSkipCount + remaining; } }
returns the number of bytes remaining that this BinaryDecoder has buffered from its source
/** * returns the number of bytes remaining that this BinaryDecoder has buffered * from its source */
@Override public int available() throws IOException { return (ba.getLim() - ba.getPos()); } } private static class InputStreamByteSource extends ByteSource { private InputStream in; protected boolean isEof = false; private InputStreamByteSource(InputStream in) { super(); this.in = in; } @Override protected void skipSourceBytes(long length) throws IOException { boolean readZero = false; while (length > 0) { long n = in.skip(length); if (n > 0) { length -= n; continue; } // The inputStream contract is evil. // zero "might" mean EOF. So check for 2 in a row, we will // infinite loop waiting for -1 with some classes others // spuriously will return 0 on occasion without EOF if (n == 0) { if (readZero) { isEof = true; throw new EOFException(); } readZero = true; continue; } // read negative isEof = true; throw new EOFException(); } } @Override protected long trySkipBytes(long length) throws IOException { long leftToSkip = length; try { boolean readZero = false; while (leftToSkip > 0) { long n = in.skip(length); if (n > 0) { leftToSkip -= n; continue; } // The inputStream contract is evil. // zero "might" mean EOF. So check for 2 in a row, we will // infinite loop waiting for -1 with some classes others // spuriously will return 0 on occasion without EOF if (n == 0) { if (readZero) { isEof = true; break; } readZero = true; continue; } // read negative isEof = true; break; } } catch (EOFException eof) { isEof = true; } return length - leftToSkip; } @Override protected void readRaw(byte[] data, int off, int len) throws IOException { while (len > 0) { int read = in.read(data, off, len); if (read < 0) { isEof = true; throw new EOFException(); } len -= read; off += read; } } @Override protected int tryReadRaw(byte[] data, int off, int len) throws IOException { int leftToCopy = len; try { while (leftToCopy > 0) { int read = in.read(data, off, leftToCopy); if (read < 0) { isEof = true; break; } leftToCopy -= read; off += read; } } catch (EOFException eof) { isEof = true; } return len - leftToCopy; } @Override public int read() throws IOException { if (ba.getLim() - ba.getPos() == 0) { return in.read(); } else { int position = ba.getPos(); int result = ba.getBuf()[position] & 0xff; ba.setPos(position + 1); return result; } } @Override public boolean isEof() { return isEof; } @Override public void close() throws IOException { in.close(); } }
This byte source is special. It will avoid copying data by using the source's byte[] as a buffer in the decoder.
/** * This byte source is special. It will avoid copying data by using the source's * byte[] as a buffer in the decoder. * */
private static class ByteArrayByteSource extends ByteSource { private static final int MIN_SIZE = 16; private byte[] data; private int position; private int max; private boolean compacted = false; private ByteArrayByteSource(byte[] data, int start, int len) { super(); // make sure data is not too small, otherwise getLong may try and // read 10 bytes and get index out of bounds. if (len < MIN_SIZE) { this.data = Arrays.copyOfRange(data, start, start + MIN_SIZE); this.position = 0; this.max = len; } else { // use the array passed in this.data = data; this.position = start; this.max = start + len; } } @Override protected void attach(int bufferSize, BinaryDecoder decoder) { // buffer size is not used here, the byte[] source is the buffer. decoder.buf = this.data; decoder.pos = this.position; decoder.minPos = this.position; decoder.limit = this.max; this.ba = new BufferAccessor(decoder); } @Override protected void skipSourceBytes(long length) throws IOException { long skipped = trySkipBytes(length); if (skipped < length) { throw new EOFException(); } } @Override protected long trySkipBytes(long length) throws IOException { // the buffer is shared, so this should return 0 max = ba.getLim(); position = ba.getPos(); long remaining = (long) max - position; if (remaining >= length) { position = (int) (position + length); ba.setPos(position); return length; } else { position += remaining; ba.setPos(position); return remaining; } } @Override protected void readRaw(byte[] data, int off, int len) throws IOException { int read = tryReadRaw(data, off, len); if (read < len) { throw new EOFException(); } } @Override protected int tryReadRaw(byte[] data, int off, int len) throws IOException { // the buffer is shared, nothing to read return 0; } @Override protected void compactAndFill(byte[] buf, int pos, int minPos, int remaining) throws IOException { // this implementation does not want to mutate the array passed in, // so it makes a new tiny buffer unless it has been compacted once before if (!compacted) { // assumes ensureCapacity is never called with a size more than 16 byte[] tinybuf = new byte[remaining + 16]; System.arraycopy(buf, pos, tinybuf, 0, remaining); ba.setBuf(tinybuf, 0, remaining); compacted = true; } } @Override public int read() throws IOException { max = ba.getLim(); position = ba.getPos(); if (position >= max) { return -1; } else { int result = ba.getBuf()[position++] & 0xff; ba.setPos(position); return result; } } @Override public void close() throws IOException { ba.setPos(ba.getLim()); // effectively set isEof to false } @Override public boolean isEof() { int remaining = ba.getLim() - ba.getPos(); return (remaining == 0); } } }