package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.primitives.Ints;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.io.sstable.format.SSTableFlushObserver;
import org.apache.cassandra.io.sstable.format.Version;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.SequentialWriter;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
 * Writes one partition through a {@link SequentialWriter} and, while doing so, collects the
 * {@link IndexInfo} entries describing consecutive blocks of the written unfiltereds.
 *
 * <p>Index entries are first kept as objects in a list. As soon as their serialized size plus
 * one int per entry exceeds {@link DatabaseDescriptor#getColumnIndexCacheSize()}, the instance
 * switches to keeping them in serialized form inside a {@link DataOutputBuffer}.
 *
 * <p>An instance is reusable: call {@link #reset()} before each {@link #buildRowIndex}.
 */
public class ColumnIndex
{
    // ---- collaborators, fixed at construction --------------------------------------------

    private final SerializationHeader header;
    private final SequentialWriter writer;
    // Messaging version corresponding to the sstable version; handed to UnfilteredSerializer.
    private final int version;
    private final Collection<SSTableFlushObserver> observers;
    private final ISerializer<IndexInfo> idxSerializer;

    // ---- index state ---------------------------------------------------------------------

    // Serialized index entries; non-null only once the cache-size threshold has been exceeded.
    private DataOutputBuffer buffer;
    // Buffer retained by reset() so that a later partition can reuse it instead of allocating.
    private DataOutputBuffer reusableBuffer;
    // Index entries kept as objects while the threshold has not been exceeded.
    private final List<IndexInfo> indexSamples = new ArrayList<>();
    // Running sum of idxSerializer.serializedSize() over the entries accounted for in list mode.
    private int indexSamplesSerializedSize;
    // Offset of each serialized index entry; only the first columnIndexCount slots are meaningful.
    private int[] indexOffsets;
    /** Number of index blocks added since the last {@link #reset()}. */
    public int columnIndexCount;

    // ---- write positions -----------------------------------------------------------------

    // Writer position captured by reset(); every other position below is relative to it.
    private long initialPosition;
    /** Bytes written by the partition header, or {@code -1} until {@link #buildRowIndex} has written it. */
    public long headerLength;
    // Relative position at which the current index block started.
    private long startPosition;
    // Relative position at which the previously written unfiltered started.
    private long previousRowStart;
    // Number of unfiltereds written since the last reset().
    private int written;

    // ---- current index block -------------------------------------------------------------

    // Clustering of the first unfiltered in the current block; null means "no block in progress".
    private ClusteringPrefix firstClustering;
    // Clustering of the most recently written unfiltered.
    private ClusteringPrefix lastClustering;
    // Deletion time of the range tombstone left open by the last marker seen, or null if none.
    private DeletionTime openMarker;

    /**
     * @param header              serialization header used for every row written
     * @param writer              destination of the partition data
     * @param version             sstable version; only its corresponding messaging version is kept
     * @param observers           observers notified of every static row and unfiltered written
     * @param indexInfoSerializer serializer used to size and serialize {@link IndexInfo} entries
     */
    public ColumnIndex(SerializationHeader header,
                       SequentialWriter writer,
                       Version version,
                       Collection<SSTableFlushObserver> observers,
                       ISerializer<IndexInfo> indexInfoSerializer)
    {
        this.header = header;
        this.writer = writer;
        this.version = version.correspondingMessagingVersion();
        this.observers = observers;
        this.idxSerializer = indexInfoSerializer;
    }

    /**
     * Prepares this instance for the next partition: captures the current writer position as the
     * new origin and clears all per-partition state. A buffer used for the previous partition is
     * kept aside for reuse. The {@code indexOffsets} array is intentionally left allocated.
     */
    public void reset()
    {
        // positions
        initialPosition = writer.position();
        headerLength = -1;
        startPosition = -1;
        previousRowStart = 0;

        // counters and collected samples
        written = 0;
        columnIndexCount = 0;
        indexSamplesSerializedSize = 0;
        indexSamples.clear();

        // current block
        firstClustering = null;
        lastClustering = null;
        openMarker = null;

        // keep the previous buffer around so reuseOrAllocateBuffer() can hand it out again
        if (buffer != null)
            reusableBuffer = buffer;
        buffer = null;
    }

    /**
     * Writes the whole partition (header, every unfiltered, end-of-partition marker) to the
     * writer and builds the index entries along the way.
     *
     * @param iterator the partition to write
     * @throws IOException if writing to the underlying writer or index buffer fails
     */
    public void buildRowIndex(UnfilteredRowIterator iterator) throws IOException
    {
        writePartitionHeader(iterator);
        headerLength = writer.position() - initialPosition;

        while (iterator.hasNext())
        {
            add(iterator.next());
        }

        finish();
    }

    /**
     * Writes the partition key, the partition level deletion and, when the header declares
     * static columns, the static row (which is also reported to the observers).
     */
    private void writePartitionHeader(UnfilteredRowIterator iterator) throws IOException
    {
        ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
        DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);

        if (!header.hasStatic())
            return;

        Row staticRow = iterator.staticRow();
        UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
        notifyObservers(staticRow);
    }

    /** Reports a just-written static row or unfiltered to every registered observer. */
    private void notifyObservers(Unfiltered unfiltered)
    {
        if (observers.isEmpty())
            return;

        for (SSTableFlushObserver observer : observers)
            observer.nextUnfilteredCluster(unfiltered);
    }

    /** @return the writer position relative to the position captured by {@link #reset()} */
    private long currentPosition()
    {
        return writer.position() - initialPosition;
    }

    /**
     * Size accounted for the index while it is held as a list: the summed serialized size of
     * the entries plus one int per index block.
     */
    private int inMemoryIndexSize()
    {
        return indexSamplesSerializedSize + columnIndexCount * TypeSizes.sizeof(0);
    }

    /**
     * @return the content of the serialized index buffer, or {@code null} if the index never
     *         switched to buffer mode for the current partition
     */
    public ByteBuffer buffer()
    {
        if (buffer == null)
            return null;

        return buffer.buffer();
    }

    /**
     * @return the index entries collected as objects, or {@code null} when their accounted size
     *         exceeds {@link DatabaseDescriptor#getColumnIndexCacheSize()}
     */
    public List<IndexInfo> indexSamples()
    {
        if (inMemoryIndexSize() > DatabaseDescriptor.getColumnIndexCacheSize())
            return null;

        return indexSamples;
    }

    /**
     * @return a copy of the offsets of the index entries added since the last reset (one per
     *         index block), or {@code null} if no offsets array has ever been allocated
     */
    public int[] offsets()
    {
        if (indexOffsets == null)
            return null;

        return Arrays.copyOf(indexOffsets, columnIndexCount);
    }

    /**
     * Closes the index block in progress: creates its {@link IndexInfo}, records its offset and
     * stores it either in the sample list or in the serialized buffer.
     */
    private void addIndexBlock() throws IOException
    {
        IndexInfo block = new IndexInfo(firstClustering,
                                        lastClustering,
                                        startPosition,
                                        currentPosition() - startPosition,
                                        openMarker);

        // must run before columnIndexCount is bumped and before the sizes below are updated
        recordNextOffset();
        columnIndexCount++;

        if (buffer == null)
        {
            indexSamplesSerializedSize += idxSerializer.serializedSize(block);

            if (inMemoryIndexSize() <= DatabaseDescriptor.getColumnIndexCacheSize())
            {
                indexSamples.add(block);
            }
            else
            {
                // threshold crossed: switch to buffer mode and replay the entries kept so far
                buffer = reuseOrAllocateBuffer();
                for (IndexInfo earlier : indexSamples)
                    idxSerializer.serialize(earlier, buffer);
            }
        }

        // deliberately not an 'else': the branch above may have just created the buffer,
        // in which case the current block still has to be appended to it
        if (buffer != null)
            idxSerializer.serialize(block, buffer);

        // the next unfiltered added starts a new block
        firstClustering = null;
    }

    /**
     * Stores the offset of the index entry about to be added in slot {@code columnIndexCount},
     * allocating or growing {@code indexOffsets} (in steps of 10) as needed.
     */
    private void recordNextOffset()
    {
        if (indexOffsets == null)
        {
            // a fresh array is zero-filled, so nothing is written on this path
            indexOffsets = new int[10];
            return;
        }

        if (columnIndexCount >= indexOffsets.length)
            indexOffsets = Arrays.copyOf(indexOffsets, indexOffsets.length + 10);

        int offset;
        if (columnIndexCount == 0)
            offset = 0; // the array may be left over from a previous partition
        else if (buffer != null)
            offset = Ints.checkedCast(buffer.position());
        else
            offset = indexSamplesSerializedSize;

        indexOffsets[columnIndexCount] = offset;
    }

    /**
     * @return the buffer retained by {@link #reset()}, cleared, if there is one; otherwise a new
     *         buffer sized at twice {@link DatabaseDescriptor#getColumnIndexCacheSize()}
     */
    private DataOutputBuffer reuseOrAllocateBuffer()
    {
        if (reusableBuffer == null)
            return new DataOutputBuffer(DatabaseDescriptor.getColumnIndexCacheSize() * 2);

        DataOutputBuffer recycled = reusableBuffer;
        recycled.clear();
        return recycled;
    }

    /**
     * Writes one unfiltered, notifies the observers, updates the current block's bounds and
     * closes the block once it has reached {@link DatabaseDescriptor#getColumnIndexSize()}.
     */
    private void add(Unfiltered unfiltered) throws IOException
    {
        long unfilteredStart = currentPosition();

        // first unfiltered of a new index block
        if (firstClustering == null)
        {
            firstClustering = unfiltered.clustering();
            startPosition = unfilteredStart;
        }

        // the serializer is given the distance back to the start of the previous unfiltered
        long distanceToPrevious = unfilteredStart - previousRowStart;
        UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, distanceToPrevious, version);

        notifyObservers(unfiltered);

        lastClustering = unfiltered.clustering();
        previousRowStart = unfilteredStart;
        written++;

        if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
        {
            // NOTE(review): the 'false' arguments presumably mean "not reversed" - confirm
            // against RangeTombstoneMarker.
            RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
            if (marker.isOpen(false))
                openMarker = marker.openDeletionTime(false);
            else
                openMarker = null;
        }

        long blockSize = currentPosition() - startPosition;
        if (blockSize >= DatabaseDescriptor.getColumnIndexSize())
            addIndexBlock();
    }

    /**
     * Writes the end-of-partition marker, closes a block still in progress and, in buffer mode,
     * passes the recorded offsets to {@code RowIndexEntry.Serializer.serializeOffsets} together
     * with the buffer. Does nothing beyond the marker when no unfiltered was written.
     */
    private void finish() throws IOException
    {
        UnfilteredSerializer.serializer.writeEndOfPartition(writer);

        if (written == 0)
            return;

        // a non-null firstClustering means unfiltereds were written since the last block closed
        if (firstClustering != null)
            addIndexBlock();

        if (buffer != null)
            RowIndexEntry.Serializer.serializeOffsets(buffer, indexOffsets, columnIndexCount);

        assert columnIndexCount > 0 && headerLength >= 0;
    }

    /**
     * @return in buffer mode, the limit of the serialized index buffer; otherwise the accounted
     *         size of the sample list (serialized entry sizes plus one int per index block)
     */
    public int indexInfoSerializedSize()
    {
        if (buffer != null)
            return buffer.buffer().limit();

        return inMemoryIndexSize();
    }
}