package org.apache.cassandra.io.util;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.config.Config;
/**
 * A {@link BufferedDataOutputStreamPlus} that accumulates everything written to it in a heap
 * {@link ByteBuffer}, enlarging that buffer on demand rather than flushing it anywhere.
 *
 * <p>Growth policy (see {@link #calculateNewSize(long)}): the buffer at least doubles while its capacity
 * is at or below {@link #DOUBLING_THRESHOLD} MiB, and grows by at least 50% above that; in both cases it
 * grows enough to fit the pending write, capped at {@link #MAX_ARRAY_SIZE}.
 */
public class DataOutputBuffer extends BufferedDataOutputStreamPlus
{
    /**
     * Capacity threshold, expressed in MiB, above which the buffer grows by 50% instead of doubling.
     * Read from a system property, defaulting to 64.
     */
    static final long DOUBLING_THRESHOLD = Long.getLong(Config.PROPERTY_PREFIX + "DOB_DOUBLING_THRESHOLD_MB", 64);

    /**
     * Largest capacity (in bytes) a {@link #scratchBuffer} instance may keep across {@code close()} calls.
     * Read from a system property, defaulting to 1 MiB.
     */
    private static final int MAX_RECYCLE_BUFFER_SIZE = Integer.getInteger(Config.PROPERTY_PREFIX + "dob_max_recycle_bytes", 1024 * 1024);

    /** Capacity (in bytes) used by the no-arg constructor and when an oversized scratch buffer is replaced. */
    private static final int DEFAULT_INITIAL_BUFFER_SIZE = 128;

    /**
     * Per-thread reusable buffer. Its {@code close()} does not release anything; it resets the instance
     * so the next user on the same thread starts from an empty buffer.
     */
    public static final FastThreadLocal<DataOutputBuffer> scratchBuffer = new FastThreadLocal<DataOutputBuffer>()
    {
        @Override
        protected DataOutputBuffer initialValue() throws Exception
        {
            return new DataOutputBuffer()
            {
                @Override
                public void close()
                {
                    if (buffer.capacity() <= MAX_RECYCLE_BUFFER_SIZE)
                    {
                        // Small enough to keep: just rewind it for reuse.
                        buffer.clear();
                    }
                    else
                    {
                        // Grew past the recycle limit: drop it and start again from the default size.
                        buffer = ByteBuffer.allocate(DEFAULT_INITIAL_BUFFER_SIZE);
                    }
                }
            };
        }
    };

    /** Creates a buffer with the default initial capacity. */
    public DataOutputBuffer()
    {
        this(DEFAULT_INITIAL_BUFFER_SIZE);
    }

    /**
     * Creates a buffer backed by a newly allocated heap {@link ByteBuffer}.
     *
     * @param size initial capacity in bytes
     */
    public DataOutputBuffer(int size)
    {
        super(ByteBuffer.allocate(size));
    }

    /**
     * Creates a buffer that writes into the supplied {@link ByteBuffer}.
     *
     * @param buffer the initial backing buffer; it is replaced by a new heap buffer on the first expansion
     */
    public DataOutputBuffer(ByteBuffer buffer)
    {
        super(buffer);
    }

    /**
     * Not supported by this class.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void flush() throws IOException
    {
        throw new UnsupportedOperationException();
    }

    /** Upper bound on the capacity this class will ever try to allocate. */
    @VisibleForTesting
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    /**
     * Narrows {@code size} to an {@code int}, clamping it to {@link #MAX_ARRAY_SIZE}.
     *
     * @param size the desired size; must be non-negative
     * @return {@code size}, or {@link #MAX_ARRAY_SIZE} if {@code size} is larger
     * @throws IllegalArgumentException if {@code size} is negative
     */
    @VisibleForTesting
    static int saturatedArraySizeCast(long size)
    {
        Preconditions.checkArgument(size >= 0, "size must be non-negative: %s", size);
        return (int) Math.min(MAX_ARRAY_SIZE, size);
    }

    /**
     * Narrows {@code size} to an {@code int}, rejecting values outside {@code [0, MAX_ARRAY_SIZE]}.
     *
     * @param size the desired size
     * @return {@code size} as an {@code int}
     * @throws IllegalArgumentException if {@code size} is negative or exceeds {@link #MAX_ARRAY_SIZE}
     */
    @VisibleForTesting
    static int checkedArraySizeCast(long size)
    {
        Preconditions.checkArgument(size >= 0, "size must be non-negative: %s", size);
        Preconditions.checkArgument(size <= MAX_ARRAY_SIZE, "size %s exceeds maximum array size %s", size, MAX_ARRAY_SIZE);
        return (int) size;
    }

    /**
     * Makes room for more data by enlarging the buffer; nothing is written out.
     *
     * @param count number of additional bytes to make room for
     */
    @Override
    protected void doFlush(int count) throws IOException
    {
        expandToFit(count);
    }

    /**
     * Returns the capacity of the current backing buffer. All growth calculations read the capacity
     * through this method.
     */
    @VisibleForTesting
    long capacity()
    {
        return buffer.capacity();
    }

    /**
     * Clamps a proposed capacity to {@link #MAX_ARRAY_SIZE} and verifies that it is actually an increase.
     *
     * @param newSize the proposed capacity; must be non-negative
     * @return the clamped capacity
     * @throws IllegalArgumentException if {@code newSize} is negative
     * @throws RuntimeException if the clamped capacity is not larger than the current capacity, i.e. the
     *         buffer cannot grow any further
     */
    @VisibleForTesting
    long validateReallocation(long newSize)
    {
        int saturatedSize = saturatedArraySizeCast(newSize);
        long currentCapacity = capacity();
        if (saturatedSize <= currentCapacity)
            throw new RuntimeException("Cannot grow buffer: requested size " + newSize
                                       + " (capped at " + saturatedSize + ") does not exceed current capacity "
                                       + currentCapacity);
        return saturatedSize;
    }

    /**
     * Computes the capacity the buffer should grow to so that {@code count} more bytes fit.
     *
     * @param count number of additional bytes required
     * @return the new capacity, already clamped and validated by {@link #validateReallocation(long)}
     */
    @VisibleForTesting
    long calculateNewSize(long count)
    {
        long capacity = capacity();
        // capacity and count are both longs, so the arithmetic below is done in 64 bits.
        long newSize = capacity + count;

        // Large buffers grow by 50%, smaller ones double; either way never less than capacity + count.
        if (capacity > 1024L * 1024L * DOUBLING_THRESHOLD)
            newSize = Math.max((capacity * 3L) / 2L, newSize);
        else
            newSize = Math.max(capacity * 2L, newSize);

        return validateReallocation(newSize);
    }

    /**
     * Replaces the backing buffer with a larger heap buffer containing the bytes written so far.
     * Does nothing when {@code count} is zero or negative.
     *
     * @param count number of additional bytes to make room for
     */
    protected void expandToFit(long count)
    {
        if (count <= 0)
            return;

        ByteBuffer newBuffer = ByteBuffer.allocate(checkedArraySizeCast(calculateNewSize(count)));
        // Flip the old buffer so that exactly the written bytes [0, position) are copied across.
        buffer.flip();
        newBuffer.put(buffer);
        buffer = newBuffer;
    }

    /** Returns a channel that appends to this buffer, growing it as it goes. */
    @Override
    protected WritableByteChannel newDefaultChannel()
    {
        return new GrowingChannel();
    }

    /** Discards everything written so far by clearing the backing buffer; its capacity is unchanged. */
    public void clear()
    {
        buffer.clear();
    }

    /** Channel view of the enclosing buffer: every write first expands the buffer, then copies into it. */
    @VisibleForTesting
    final class GrowingChannel implements WritableByteChannel
    {
        /**
         * Appends all remaining bytes of {@code src} to the enclosing buffer.
         *
         * @return the number of bytes consumed from {@code src}
         */
        @Override
        public int write(ByteBuffer src) throws IOException
        {
            int count = src.remaining();
            expandToFit(count);
            buffer.put(src);
            return count;
        }

        /** This channel has no closed state; it always reports open. */
        @Override
        public boolean isOpen()
        {
            return true;
        }

        /** No-op. */
        @Override
        public void close()
        {
        }
    }

    /** No-op: this class itself does nothing on close. */
    @Override
    public void close()
    {
    }

    /**
     * Returns a read view over the bytes written so far, leaving this instance usable.
     * Equivalent to {@code buffer(true)}.
     */
    public ByteBuffer buffer()
    {
        return buffer(true);
    }

    /**
     * Returns the bytes written so far as a flipped {@link ByteBuffer}.
     *
     * @param duplicate if {@code true}, a duplicate sharing the same content is returned and this
     *                  instance remains usable; if {@code false}, the backing buffer itself is handed
     *                  over and this instance's buffer reference is set to {@code null}, so this
     *                  instance must not be used afterwards
     * @return a buffer positioned at 0 with its limit at the number of bytes written
     */
    public ByteBuffer buffer(boolean duplicate)
    {
        if (!duplicate)
        {
            // Transfer ownership: the caller gets the real buffer and we drop our reference to it.
            ByteBuffer buf = buffer;
            buf.flip();
            buffer = null;
            return buf;
        }

        ByteBuffer result = buffer.duplicate();
        result.flip();
        return result;
    }

    /**
     * Returns the array backing the buffer, without copying. The array is the whole backing store, so
     * only its first {@link #getLength()} bytes are written data.
     */
    public byte[] getData()
    {
        assert buffer.arrayOffset() == 0;
        return buffer.array();
    }

    /** Returns the number of bytes written so far (the backing buffer's position). */
    public int getLength()
    {
        return buffer.position();
    }

    /** Always {@code true}: this class can report its position. */
    public boolean hasPosition()
    {
        return true;
    }

    /** Returns the number of bytes written so far; same value as {@link #getLength()}. */
    public long position()
    {
        return getLength();
    }

    /** Returns a new heap {@link ByteBuffer} wrapping a copy of the bytes written so far. */
    public ByteBuffer asNewBuffer()
    {
        return ByteBuffer.wrap(toByteArray());
    }

    /** Returns a newly allocated array holding a copy of exactly the bytes written so far. */
    public byte[] toByteArray()
    {
        // Read through a duplicate view so the writer's own position is not disturbed.
        ByteBuffer view = buffer();
        byte[] result = new byte[view.remaining()];
        view.get(result);
        return result;
    }
}