/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.io.util;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.vint.VIntCoding;

import com.google.common.base.Preconditions;

Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled via rebuffer. Implementations provide this buffer from various channels (socket, file, memory, etc). RebufferingInputStream is not thread safe.
/** * Rough equivalent of BufferedInputStream and DataInputStream wrapping a ByteBuffer that can be refilled * via rebuffer. Implementations provide this buffer from various channels (socket, file, memory, etc). * * RebufferingInputStream is not thread safe. */
public abstract class RebufferingInputStream extends InputStream implements DataInputPlus, Closeable { protected ByteBuffer buffer; protected RebufferingInputStream(ByteBuffer buffer) { Preconditions.checkArgument(buffer == null || buffer.order() == ByteOrder.BIG_ENDIAN, "Buffer must have BIG ENDIAN byte ordering"); this.buffer = buffer; }
Implementations must implement this method to refill the buffer. They can expect the buffer to be empty when this method is invoked.
  • IOException –
/** * Implementations must implement this method to refill the buffer. * They can expect the buffer to be empty when this method is invoked. * @throws IOException */
protected abstract void reBuffer() throws IOException; @Override public void readFully(byte[] b) throws IOException { readFully(b, 0, b.length); } @Override public void readFully(byte[] b, int off, int len) throws IOException { int read = read(b, off, len); if (read < len) throw new EOFException("EOF after " + read + " bytes out of " + len); } @Override public int read(byte[] b, int off, int len) throws IOException { // avoid int overflow if (off < 0 || off > b.length || len < 0 || len > b.length - off) throw new IndexOutOfBoundsException(); if (len == 0) return 0; int copied = 0; while (copied < len) { int position = buffer.position(); int remaining = buffer.limit() - position; if (remaining == 0) { reBuffer(); position = buffer.position(); remaining = buffer.limit() - position; if (remaining == 0) return copied == 0 ? -1 : copied; } int toCopy = Math.min(len - copied, remaining); FastByteOperations.copy(buffer, position, b, off + copied, toCopy); buffer.position(position + toCopy); copied += toCopy; } return copied; } @DontInline protected long readPrimitiveSlowly(int bytes) throws IOException { long result = 0; for (int i = 0; i < bytes; i++) result = (result << 8) | (readByte() & 0xFFL); return result; } @Override public int skipBytes(int n) throws IOException { if (n < 0) return 0; int requested = n; int position = buffer.position(), limit = buffer.limit(), remaining; while ((remaining = limit - position) < n) { n -= remaining; buffer.position(limit); reBuffer(); position = buffer.position(); limit = buffer.limit(); if (position == limit) return requested - n; } buffer.position(position + n); return requested; } @Override public boolean readBoolean() throws IOException { return readByte() != 0; } @Override public byte readByte() throws IOException { if (!buffer.hasRemaining()) { reBuffer(); if (!buffer.hasRemaining()) throw new EOFException(); } return buffer.get(); } @Override public int readUnsignedByte() throws IOException { return readByte() & 0xff; } 
@Override public short readShort() throws IOException { if (buffer.remaining() >= 2) return buffer.getShort(); else return (short) readPrimitiveSlowly(2); } @Override public int readUnsignedShort() throws IOException { return readShort() & 0xFFFF; } @Override public char readChar() throws IOException { if (buffer.remaining() >= 2) return buffer.getChar(); else return (char) readPrimitiveSlowly(2); } @Override public int readInt() throws IOException { if (buffer.remaining() >= 4) return buffer.getInt(); else return (int) readPrimitiveSlowly(4); } @Override public long readLong() throws IOException { if (buffer.remaining() >= 8) return buffer.getLong(); else return readPrimitiveSlowly(8); } public long readVInt() throws IOException { return VIntCoding.decodeZigZag64(readUnsignedVInt()); } public long readUnsignedVInt() throws IOException { //If 9 bytes aren't available use the slow path in VIntCoding if (buffer.remaining() < 9) return VIntCoding.readUnsignedVInt(this); byte firstByte = buffer.get(); //Bail out early if this is one byte, necessary or it fails later if (firstByte >= 0) return firstByte; int extraBytes = VIntCoding.numberOfExtraBytesToRead(firstByte); int position = buffer.position(); int extraBits = extraBytes * 8; long retval = buffer.getLong(position); if (buffer.order() == ByteOrder.LITTLE_ENDIAN) retval = Long.reverseBytes(retval); buffer.position(position + extraBytes); // truncate the bytes we read in excess of those we needed retval >>>= 64 - extraBits; // remove the non-value bits from the first byte firstByte &= VIntCoding.firstByteValueMask(extraBytes); // shift the first byte up to its correct position retval |= (long) firstByte << extraBits; return retval; } @Override public float readFloat() throws IOException { if (buffer.remaining() >= 4) return buffer.getFloat(); else return Float.intBitsToFloat((int)readPrimitiveSlowly(4)); } @Override public double readDouble() throws IOException { if (buffer.remaining() >= 8) return 
buffer.getDouble(); else return Double.longBitsToDouble(readPrimitiveSlowly(8)); } @Override public String readLine() throws IOException { throw new UnsupportedOperationException(); } @Override public String readUTF() throws IOException { return DataInputStream.readUTF(this); } @Override public int read() throws IOException { try { return readUnsignedByte(); } catch (EOFException ex) { return -1; } } }