package org.apache.cassandra.io.sstable.metadata;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import com.google.common.base.Preconditions;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.IntervalSet;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.MurmurHash;
import org.apache.cassandra.utils.StreamingHistogram;
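/**
 * Collects partition-level statistics (timestamps, TTLs, tombstone drop times,
 * partition sizes, cell counts, clustering bounds, key cardinality) while an
 * sstable is being written, and assembles them into the sstable's metadata
 * components via {@link #finalizeMetadata}.
 * <p>
 * Illustrative use (simplified; actual callers are the sstable writers):
 * <pre>{@code
 * MetadataCollector collector = new MetadataCollector(comparator);
 * collector.addKey(partitionKey);               // feeds the cardinality estimator
 * collector.updateClusteringValues(clustering); // tracks min/max clustering
 * collector.update(cell);                       // timestamps, TTLs, deletion times
 * collector.addPartitionSizeInBytes(size);
 * Map<MetadataType, MetadataComponent> components =
 *     collector.finalizeMetadata(partitionerName, fpChance, repairedAt, header);
 * }</pre>
 */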
public class MetadataCollector implements PartitionStatisticsCollector
{
public static final double NO_COMPRESSION_RATIO = -1.0;
private static final ByteBuffer[] EMPTY_CLUSTERING = new ByteBuffer[0];
static EstimatedHistogram defaultCellPerPartitionCountHistogram()
{
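        // EH of 114 can track a max value of 2395318855, i.e., > 2B cells per partition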
return new EstimatedHistogram(114);
}
static EstimatedHistogram defaultPartitionSizeHistogram()
{
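        // EH of 150 can track a max value of 1697806495183, i.e., > 1.5PB per partition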
return new EstimatedHistogram(150);
}
static StreamingHistogram.StreamingHistogramBuilder defaultTombstoneDropTimeHistogramBuilder()
{
return new StreamingHistogram.StreamingHistogramBuilder(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE, SSTable.TOMBSTONE_HISTOGRAM_SPOOL_SIZE, SSTable.TOMBSTONE_HISTOGRAM_TTL_ROUND_SECONDS);
}
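    /**
     * Stats used when an sstable has no (or an unreadable) stats component.
     * The bounds are deliberately the widest possible so that, absent real
     * statistics, the sstable is never incorrectly excluded from reads or
     * compactions.
     */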
public static StatsMetadata defaultStatsMetadata()
{
return new StatsMetadata(defaultPartitionSizeHistogram(),
defaultCellPerPartitionCountHistogram(),
IntervalSet.empty(),
Long.MIN_VALUE,
Long.MAX_VALUE,
Integer.MAX_VALUE,
Integer.MAX_VALUE,
0,
Integer.MAX_VALUE,
NO_COMPRESSION_RATIO,
defaultTombstoneDropTimeHistogramBuilder().build(),
0,
Collections.<ByteBuffer>emptyList(),
Collections.<ByteBuffer>emptyList(),
true,
ActiveRepairService.UNREPAIRED_SSTABLE,
-1,
-1);
}
protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram();
protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram();
protected IntervalSet<CommitLogPosition> commitLogIntervals = IntervalSet.empty();
protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker();
protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME);
protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL);
protected double compressionRatio = NO_COMPRESSION_RATIO;
protected StreamingHistogram.StreamingHistogramBuilder estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogramBuilder();
protected int sstableLevel;
private ClusteringPrefix minClustering = null;
private ClusteringPrefix maxClustering = null;
protected boolean hasLegacyCounterShards = false;
protected long totalColumnsSet;
protected long totalRows;
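    // HyperLogLog++ (p = 13, sp = 25) estimating the number of distinct partition keys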
protected ICardinality cardinality = new HyperLogLogPlus(13, 25);
private final ClusteringComparator comparator;
public MetadataCollector(ClusteringComparator comparator)
{
this.comparator = comparator;
}
public MetadataCollector(Iterable<SSTableReader> sstables, ClusteringComparator comparator, int level)
{
this(comparator);
IntervalSet.Builder<CommitLogPosition> intervals = new IntervalSet.Builder<>();
for (SSTableReader sstable : sstables)
{
intervals.addAll(sstable.getSSTableMetadata().commitLogIntervals);
}
commitLogIntervals(intervals.build());
sstableLevel(level);
}
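    /**
     * Offers the Murmur hash of the partition key to the cardinality estimator,
     * which tracks the approximate number of distinct keys in the sstable.
     */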
public MetadataCollector addKey(ByteBuffer key)
{
long hashed = MurmurHash.hash2_64(key, key.position(), key.remaining(), 0);
cardinality.offerHashed(hashed);
return this;
}
public MetadataCollector addPartitionSizeInBytes(long partitionSize)
{
estimatedPartitionSize.add(partitionSize);
return this;
}
public MetadataCollector addCellPerPartitionCount(long cellCount)
{
estimatedCellPerPartitionCount.add(cellCount);
return this;
}
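    /**
     * Merges a pre-existing tombstone drop-time histogram (e.g., from an sstable
     * being compacted) into the one being built for the new sstable.
     */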
public MetadataCollector mergeTombstoneHistogram(StreamingHistogram histogram)
{
estimatedTombstoneDropTime.merge(histogram);
return this;
}
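    /**
     * Records the ratio of compressed to uncompressed size; remains at
     * NO_COMPRESSION_RATIO (-1.0) for sstables written without compression.
     */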
public MetadataCollector addCompressionRatio(long compressed, long uncompressed)
{
        compressionRatio = (double) compressed / uncompressed;
return this;
}
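    /**
     * Feeds the timestamp, TTL and local deletion time of a row's liveness info
     * into the min/max trackers; the overloads below do the same for cells and
     * deletion times.
     */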
public void update(LivenessInfo newInfo)
{
if (newInfo.isEmpty())
return;
updateTimestamp(newInfo.timestamp());
updateTTL(newInfo.ttl());
updateLocalDeletionTime(newInfo.localExpirationTime());
}
public void update(Cell cell)
{
updateTimestamp(cell.timestamp());
updateTTL(cell.ttl());
updateLocalDeletionTime(cell.localDeletionTime());
}
public void update(DeletionTime dt)
{
if (!dt.isLive())
{
updateTimestamp(dt.markedForDeleteAt());
updateLocalDeletionTime(dt.localDeletionTime());
}
}
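    /**
     * Accumulates the number of columns set per row, from which the
     * columns-per-row statistics in StatsMetadata are derived.
     */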
public void updateColumnSetPerRow(long columnSetInRow)
{
totalColumnsSet += columnSetInRow;
++totalRows;
}
private void updateTimestamp(long newTimestamp)
{
timestampTracker.update(newTimestamp);
}
private void updateLocalDeletionTime(int newLocalDeletionTime)
{
localDeletionTimeTracker.update(newLocalDeletionTime);
if (newLocalDeletionTime != Cell.NO_DELETION_TIME)
estimatedTombstoneDropTime.update(newLocalDeletionTime);
}
private void updateTTL(int newTTL)
{
ttlTracker.update(newTTL);
}
public MetadataCollector commitLogIntervals(IntervalSet<CommitLogPosition> commitLogIntervals)
{
this.commitLogIntervals = commitLogIntervals;
return this;
}
public MetadataCollector sstableLevel(int sstableLevel)
{
this.sstableLevel = sstableLevel;
return this;
}
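    /**
     * Tracks the smallest and largest clustering prefixes seen so far,
     * according to the table's clustering comparator.
     */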
public MetadataCollector updateClusteringValues(ClusteringPrefix clustering)
{
minClustering = minClustering == null || comparator.compare(clustering, minClustering) < 0 ? clustering : minClustering;
maxClustering = maxClustering == null || comparator.compare(clustering, maxClustering) > 0 ? clustering : maxClustering;
return this;
}
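    // Null-tolerant helpers: a null buffer means "no value", so min/max fall back
    // to the non-null argument, and maybeMinimize copies a value into a right-sized
    // buffer so we don't retain a larger backing array than needed.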
private static ByteBuffer maybeMinimize(ByteBuffer buffer)
{
return buffer == null ? null : ByteBufferUtil.minimalBufferFor(buffer);
}
private static ByteBuffer min(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
{
if (b1 == null)
return b2;
if (b2 == null)
return b1;
if (comparator.compare(b1, b2) >= 0)
return b2;
return b1;
}
private static ByteBuffer max(ByteBuffer b1, ByteBuffer b2, AbstractType<?> comparator)
{
if (b1 == null)
return b2;
if (b2 == null)
return b1;
if (comparator.compare(b1, b2) >= 0)
return b1;
return b2;
}
public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards)
{
this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards;
}
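    /**
     * Assembles the collected statistics into the sstable's metadata components:
     * VALIDATION (partitioner and bloom filter FP chance), STATS, COMPACTION
     * (partition-key cardinality) and HEADER (serialization header).
     */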
public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, SerializationHeader header)
{
Preconditions.checkState((minClustering == null && maxClustering == null)
|| comparator.compare(maxClustering, minClustering) >= 0);
ByteBuffer[] minValues = minClustering != null ? minClustering.getRawValues() : EMPTY_CLUSTERING;
ByteBuffer[] maxValues = maxClustering != null ? maxClustering.getRawValues() : EMPTY_CLUSTERING;
Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
components.put(MetadataType.STATS, new StatsMetadata(estimatedPartitionSize,
estimatedCellPerPartitionCount,
commitLogIntervals,
timestampTracker.min(),
timestampTracker.max(),
localDeletionTimeTracker.min(),
localDeletionTimeTracker.max(),
ttlTracker.min(),
ttlTracker.max(),
compressionRatio,
estimatedTombstoneDropTime.build(),
sstableLevel,
makeList(minValues),
makeList(maxValues),
hasLegacyCounterShards,
repairedAt,
totalColumnsSet,
totalRows));
components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality));
components.put(MetadataType.HEADER, header.toComponent());
return components;
}
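    /**
     * Converts raw clustering values to a list, stopping at the first null:
     * a clustering prefix may not have values for every clustering column, so
     * the resulting list can be shorter than the input array.
     */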
private static List<ByteBuffer> makeList(ByteBuffer[] values)
{
        List<ByteBuffer> l = new ArrayList<>(values.length);
for (int i = 0; i < values.length; i++)
if (values[i] == null)
break;
else
l.add(values[i]);
return l;
}
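    /**
     * Tracks the minimum and maximum of the long values it is fed, falling back
     * to the configured defaults until the first update.
     */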
public static class MinMaxLongTracker
{
private final long defaultMin;
private final long defaultMax;
private boolean isSet = false;
private long min;
private long max;
public MinMaxLongTracker()
{
this(Long.MIN_VALUE, Long.MAX_VALUE);
}
public MinMaxLongTracker(long defaultMin, long defaultMax)
{
this.defaultMin = defaultMin;
this.defaultMax = defaultMax;
}
public void update(long value)
{
if (!isSet)
{
min = max = value;
isSet = true;
}
else
{
if (value < min)
min = value;
if (value > max)
max = value;
}
}
public long min()
{
return isSet ? min : defaultMin;
}
public long max()
{
return isSet ? max : defaultMax;
}
}
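    /**
     * Tracks the minimum and maximum of the int values it is fed, falling back
     * to the configured defaults until the first update.
     */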
public static class MinMaxIntTracker
{
private final int defaultMin;
private final int defaultMax;
private boolean isSet = false;
private int min;
private int max;
public MinMaxIntTracker()
{
this(Integer.MIN_VALUE, Integer.MAX_VALUE);
}
public MinMaxIntTracker(int defaultMin, int defaultMax)
{
this.defaultMin = defaultMin;
this.defaultMax = defaultMax;
}
public void update(int value)
{
if (!isSet)
{
min = max = value;
isSet = true;
}
else
{
if (value < min)
min = value;
if (value > max)
max = value;
}
}
public int min()
{
return isSet ? min : defaultMin;
}
public int max()
{
return isSet ? max : defaultMax;
}
}
}