package org.apache.cassandra.io.sstable.format.big;

import com.google.common.util.concurrent.RateLimiter;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.columniterator.SSTableIterator;
import org.apache.cassandra.db.columniterator.SSTableReversedIterator;
import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SkippingReason;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener.SelectionReason;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

public class BigTableReader extends SSTableReader
{
    private static final Logger logger = LoggerFactory.getLogger(BigTableReader.class);

    BigTableReader(Descriptor desc, Set<Component> components, CFMetaData metadata, Long maxDataAge, StatsMetadata sstableMetadata, OpenReason openReason, SerializationHeader header)
    {
        super(desc, components, metadata, maxDataAge, sstableMetadata, openReason, header);
    }
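
    /**
     * Returns an UnfilteredRowIterator over the partition with the given key, restricted to the requested
     * slices and columns. If the key is not present in this sstable, an empty iterator is returned.
     */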
    public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift, SSTableReadsListener listener)
    {
        RowIndexEntry rie = getPosition(key, SSTableReader.Operator.EQ, listener);
        return iterator(null, key, rie, slices, selectedColumns, reversed, isForThrift);
    }
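
    /**
     * Variant of the above that takes a pre-resolved RowIndexEntry and, optionally, an already-open data file
     * handle. A null indexEntry means the key is not present in this sstable and yields an empty iterator.
     */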
    public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed, boolean isForThrift)
    {
        if (indexEntry == null)
            return UnfilteredRowIterators.noRowsIterator(metadata, key, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, reversed);

        return reversed
             ? new SSTableReversedIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile)
             : new SSTableIterator(this, file, key, indexEntry, slices, selectedColumns, isForThrift, ifile);
    }

    @Override
    public ISSTableScanner getScanner(ColumnFilter columns,
                                      DataRange dataRange,
                                      RateLimiter limiter,
                                      boolean isForThrift,
                                      SSTableReadsListener listener)
    {
        return BigTableScanner.getScanner(this, columns, dataRange, limiter, isForThrift, listener);
    }
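
    /**
     * Direct I/O SSTableScanner over an iterator of bounds.
     *
     * @param boundsIterator the keys to cover
     * @return A Scanner for seeking over the rows of the SSTable.
     */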
    public ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> boundsIterator)
    {
        return BigTableScanner.getScanner(this, boundsIterator);
    }
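
    /**
     * Direct I/O SSTableScanner over the full sstable.
     *
     * @return A Scanner for reading the full SSTable.
     */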
    public ISSTableScanner getScanner(RateLimiter limiter)
    {
        return BigTableScanner.getScanner(this, limiter);
    }
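
    /**
     * Direct I/O SSTableScanner over a defined collection of ranges of tokens.
     *
     * @param ranges the ranges of keys to cover
     * @return A Scanner for seeking over the rows of the SSTable.
     */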
    public ISSTableScanner getScanner(Collection<Range<Token>> ranges, RateLimiter limiter)
    {
        if (ranges != null)
            return BigTableScanner.getScanner(this, ranges, limiter);
        else
            return getScanner(limiter);
    }

    @SuppressWarnings("resource")
    @Override
    public UnfilteredRowIterator simpleIterator(FileDataInput dfile, DecoratedKey key, RowIndexEntry position, boolean tombstoneOnly)
    {
        return SSTableIdentityIterator.create(this, dfile, position, key, tombstoneOnly);
    }
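
    /**
     * @param key The key to apply as the rhs to the given Operator. A 'fake' key is allowed to
     * allow key selection by token bounds, but only if op != EQ
     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
     * @param updateCacheAndStats true if updating stats and cache
     * @param listener the listener used to handle internal read events
     * @return The index entry corresponding to the key, or null if the key is not present
     */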
    @Override
    protected RowIndexEntry getPosition(PartitionPosition key,
                                        Operator op,
                                        boolean updateCacheAndStats,
                                        boolean permitMatchPastLast,
                                        SSTableReadsListener listener)
    {
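        // first, check the bloom filter: if it reports the key cannot be present, skip this sstable entirely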
        if (op == Operator.EQ)
        {
            assert key instanceof DecoratedKey;
            if (!bf.isPresent((DecoratedKey)key))
            {
                listener.onSSTableSkipped(this, SkippingReason.BLOOM_FILTER);
                Tracing.trace("Bloom filter allows skipping sstable {}", descriptor.generation);
                return null;
            }
        }
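
        // next, try the key cache (only makes sense when the key is an actual row key)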
        if ((op == Operator.EQ || op == Operator.GE) && (key instanceof DecoratedKey))
        {
            DecoratedKey decoratedKey = (DecoratedKey)key;
            KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, decoratedKey.getKey());
            RowIndexEntry cachedPosition = getCachedPosition(cacheKey, updateCacheAndStats);
            if (cachedPosition != null)
            {
                listener.onSSTableSelected(this, cachedPosition, SelectionReason.KEY_CACHE_HIT);
                Tracing.trace("Key cache hit for sstable {}", descriptor.generation);
                return cachedPosition;
            }
        }
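
        // next, check the sstable's min and max keys to see whether the requested key can be present at all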
        boolean skip = false;
        if (key.compareTo(first) < 0)
        {
            if (op == Operator.EQ)
                skip = true;
            else
                key = first;

            op = Operator.EQ;
        }
        else
        {
            int l = last.compareTo(key);
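            // l <= 0  => we may be looking past the end of the file; we then narrow our behaviour to:
            //             1) skipping if strictly greater for GE and EQ;
            //             2) skipping if equal and searching GT, and we aren't permitting matching past last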
            skip = l <= 0 && (l < 0 || (!permitMatchPastLast && op == Operator.GT));
        }
        if (skip)
        {
            if (op == Operator.EQ && updateCacheAndStats)
                bloomFilterTracker.addFalsePositive();
            listener.onSSTableSkipped(this, SkippingReason.MIN_MAX_KEYS);
            Tracing.trace("Check against min and max keys allows skipping sstable {}", descriptor.generation);
            return null;
        }

        int binarySearchResult = indexSummary.binarySearch(key);
        long sampledPosition = getIndexScanPositionFromBinarySearchResult(binarySearchResult, indexSummary);
        int sampledIndex = getIndexSummaryIndexFromBinarySearchResult(binarySearchResult);

        int effectiveInterval = indexSummary.getEffectiveIndexIntervalAfterIndex(sampledIndex);

        if (ifile == null)
            return null;
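
        // Scan the on-disk partition index, starting at the nearest position sampled by the index summary.
        // The check against the effective index interval is there to exit the loop in the EQ case when the key
        // is not present (a bloom filter false positive). For non-EQ operators, the first key of the next index
        // interval may also need to be checked, since the searched key can be greater than the last key of the
        // checked interval but smaller than the first key of the next one (in which case the position of that
        // first key must be returned).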
        int i = 0;
        String path = null;
        try (FileDataInput in = ifile.createReader(sampledPosition))
        {
            path = in.getPath();
            while (!in.isEOF())
            {
                i++;

                ByteBuffer indexKey = ByteBufferUtil.readWithShortLength(in);

                boolean opSatisfied; // did we find an appropriate position for the op requested
                boolean exactMatch; // is the current position an exact match for the key, suitable for caching
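
                // Compare raw key bytes when possible (EQ within the effective index interval) for performance;
                // otherwise fall back to comparing decorated keys.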
                if (op == Operator.EQ && i <= effectiveInterval)
                {
                    opSatisfied = exactMatch = indexKey.equals(((DecoratedKey) key).getKey());
                }
                else
                {
                    DecoratedKey indexDecoratedKey = decorateKey(indexKey);
                    int comparison = indexDecoratedKey.compareTo(key);
                    int v = op.apply(comparison);
                    opSatisfied = (v == 0);
                    exactMatch = (comparison == 0);
                    if (v < 0)
                    {
                        listener.onSSTableSkipped(this, SkippingReason.PARTITION_INDEX_LOOKUP);
                        Tracing.trace("Partition index lookup allows skipping sstable {}", descriptor.generation);
                        return null;
                    }
                }

                if (opSatisfied)
                {
                    // read the data-file position from the index entry
                    RowIndexEntry indexEntry = rowIndexEntrySerializer.deserialize(in, in.getFilePointer());
                    if (exactMatch && updateCacheAndStats)
                    {
                        assert key instanceof DecoratedKey; // an exact match is only possible for a true row key
                        DecoratedKey decoratedKey = (DecoratedKey)key;

                        if (logger.isTraceEnabled())
                        {
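                            // expensive sanity check: verify that the key stored at the data position matches the requested key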
                            try (FileDataInput fdi = dfile.createReader(indexEntry.position))
                            {
                                DecoratedKey keyInDisk = decorateKey(ByteBufferUtil.readWithShortLength(fdi));
                                if (!keyInDisk.equals(key))
                                    throw new AssertionError(String.format("%s != %s in %s", keyInDisk, key, fdi.getPath()));
                            }
                        }

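                        // cache the exact match for the key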
                        cacheKey(decoratedKey, indexEntry);
                    }
                    if (op == Operator.EQ && updateCacheAndStats)
                        bloomFilterTracker.addTruePositive();
                    listener.onSSTableSelected(this, indexEntry, SelectionReason.INDEX_ENTRY_FOUND);
                    Tracing.trace("Partition index with {} entries found for sstable {}", indexEntry.columnsIndexCount(), descriptor.generation);
                    return indexEntry;
                }

                RowIndexEntry.Serializer.skip(in, descriptor.version);
            }
        }
        catch (IOException e)
        {
            markSuspect();
            throw new CorruptSSTableException(e, path);
        }

        if (op == SSTableReader.Operator.EQ && updateCacheAndStats)
            bloomFilterTracker.addFalsePositive();
        listener.onSSTableSkipped(this, SkippingReason.INDEX_ENTRY_NOT_FOUND);
        Tracing.trace("Partition index lookup complete (bloom filter false positive) for sstable {}", descriptor.generation);
        return null;
    }
}