package org.apache.cassandra.io.util;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.WritableByteChannel;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.utils.memory.MemoryUtil;
import org.apache.cassandra.utils.vint.VIntCoding;
public class BufferedDataOutputStreamPlus extends DataOutputStreamPlus
{
private static final int DEFAULT_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "nio_data_output_stream_plus_buffer_size", 1024 * 32);
protected ByteBuffer buffer;
protected boolean strictFlushing = false;
public BufferedDataOutputStreamPlus(RandomAccessFile ras)
{
this(ras.getChannel());
}
public BufferedDataOutputStreamPlus(RandomAccessFile ras, int bufferSize)
{
this(ras.getChannel(), bufferSize);
}
public BufferedDataOutputStreamPlus(FileOutputStream fos)
{
this(fos.getChannel());
}
public BufferedDataOutputStreamPlus(FileOutputStream fos, int bufferSize)
{
this(fos.getChannel(), bufferSize);
}
public BufferedDataOutputStreamPlus(WritableByteChannel wbc)
{
this(wbc, DEFAULT_BUFFER_SIZE);
}
public BufferedDataOutputStreamPlus(WritableByteChannel wbc, int bufferSize)
{
this(wbc, ByteBuffer.allocateDirect(bufferSize));
Preconditions.checkNotNull(wbc);
Preconditions.checkArgument(bufferSize >= 8, "Buffer size must be large enough to accommodate a long/double");
}
protected BufferedDataOutputStreamPlus(WritableByteChannel channel, ByteBuffer buffer)
{
super(channel);
this.buffer = buffer;
}
protected BufferedDataOutputStreamPlus(ByteBuffer buffer)
{
super();
this.buffer = buffer;
}
@Override
public void write(byte[] b) throws IOException
{
write(b, 0, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
if (b == null)
throw new NullPointerException();
if (off < 0 || off > b.length || len < 0
|| len > b.length - off)
throw new IndexOutOfBoundsException();
if (len == 0)
return;
int copied = 0;
while (copied < len)
{
if (buffer.hasRemaining())
{
int toCopy = Math.min(len - copied, buffer.remaining());
buffer.put(b, off + copied, toCopy);
copied += toCopy;
}
else
{
doFlush(len - copied);
}
}
}
private final ByteBuffer hollowBuffer = MemoryUtil.getHollowDirectByteBuffer();
@Override
public void write(ByteBuffer toWrite) throws IOException
{
if (toWrite.hasArray())
{
write(toWrite.array(), toWrite.arrayOffset() + toWrite.position(), toWrite.remaining());
}
else
{
assert toWrite.isDirect();
MemoryUtil.duplicateDirectByteBuffer(toWrite, hollowBuffer);
int toWriteRemaining = toWrite.remaining();
if (toWriteRemaining > buffer.remaining())
{
if (strictFlushing)
{
writeExcessSlow();
}
else
{
doFlush(toWriteRemaining - buffer.remaining());
while (hollowBuffer.remaining() > buffer.capacity())
channel.write(hollowBuffer);
}
}
buffer.put(hollowBuffer);
}
}
@DontInline
private void writeExcessSlow() throws IOException
{
int originalLimit = hollowBuffer.limit();
while (originalLimit - hollowBuffer.position() > buffer.remaining())
{
hollowBuffer.limit(hollowBuffer.position() + buffer.remaining());
buffer.put(hollowBuffer);
doFlush(originalLimit - hollowBuffer.position());
}
hollowBuffer.limit(originalLimit);
}
@Override
public void write(int b) throws IOException
{
if (!buffer.hasRemaining())
doFlush(1);
buffer.put((byte) (b & 0xFF));
}
@Override
public void writeBoolean(boolean v) throws IOException
{
if (!buffer.hasRemaining())
doFlush(1);
buffer.put(v ? (byte)1 : (byte)0);
}
@Override
public void writeByte(int v) throws IOException
{
write(v);
}
@Override
public void writeShort(int v) throws IOException
{
writeChar(v);
}
@Override
public void writeChar(int v) throws IOException
{
if (buffer.remaining() < 2)
writeSlow(v, 2);
else
buffer.putChar((char) v);
}
@Override
public void writeInt(int v) throws IOException
{
if (buffer.remaining() < 4)
writeSlow(v, 4);
else
buffer.putInt(v);
}
@Override
public void writeLong(long v) throws IOException
{
if (buffer.remaining() < 8)
writeSlow(v, 8);
else
buffer.putLong(v);
}
@Override
public void writeVInt(long value) throws IOException
{
writeUnsignedVInt(VIntCoding.encodeZigZag64(value));
}
@Override
public void writeUnsignedVInt(long value) throws IOException
{
int size = VIntCoding.computeUnsignedVIntSize(value);
if (size == 1)
{
write((int) value);
return;
}
write(VIntCoding.encodeVInt(value, size), 0, size);
}
@Override
public void writeFloat(float v) throws IOException
{
writeInt(Float.floatToRawIntBits(v));
}
@Override
public void writeDouble(double v) throws IOException
{
writeLong(Double.doubleToRawLongBits(v));
}
@DontInline
private void writeSlow(long bytes, int count) throws IOException
{
int origCount = count;
if (ByteOrder.BIG_ENDIAN == buffer.order())
while (count > 0) writeByte((int) (bytes >>> (8 * --count)));
else
while (count > 0) writeByte((int) (bytes >>> (8 * (origCount - count--))));
}
@Override
public void writeBytes(String s) throws IOException
{
for (int index = 0; index < s.length(); index++)
writeByte(s.charAt(index));
}
@Override
public void writeChars(String s) throws IOException
{
for (int index = 0; index < s.length(); index++)
writeChar(s.charAt(index));
}
@Override
public void writeUTF(String s) throws IOException
{
UnbufferedDataOutputStreamPlus.writeUTF(s, this);
}
@Override
public void write(Memory memory, long offset, long length) throws IOException
{
for (ByteBuffer buffer : memory.asByteBuffers(offset, length))
write(buffer);
}
@DontInline
protected void doFlush(int count) throws IOException
{
buffer.flip();
while (buffer.hasRemaining())
channel.write(buffer);
buffer.clear();
}
@Override
public void flush() throws IOException
{
doFlush(0);
}
@Override
public void close() throws IOException
{
doFlush(0);
channel.close();
FileUtils.clean(buffer);
buffer = null;
}
@Override
public <R> R applyToChannel(Function<WritableByteChannel, R> f) throws IOException
{
if (strictFlushing)
throw new UnsupportedOperationException();
flush();
return f.apply(channel);
}
public BufferedDataOutputStreamPlus order(ByteOrder order)
{
this.buffer.order(order);
return this;
}
}