package org.apache.cassandra.db.compaction;
import java.util.*;
import java.util.Map.Entry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.utils.Pair;
import static com.google.common.collect.Iterables.filter;
public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
{
private static final Logger logger = LoggerFactory.getLogger(SizeTieredCompactionStrategy.class);

/**
 * Orders bucket/hotness pairs so that the maximum element is the bucket we most want
 * to compact: primary key is the bucket's total hotness (right of the pair), and ties
 * are broken by the bucket's average on-disk size, preferring the larger average.
 */
private static final Comparator<Pair<List<SSTableReader>, Double>> bucketsByHotnessComparator = (o1, o2) ->
{
    int byHotness = Double.compare(o1.right, o2.right);
    if (byHotness != 0)
        return byHotness;
    // Break hotness ties by average on-disk size of the bucket's sstables.
    return Long.compare(averageBytesOnDisk(o1.left), averageBytesOnDisk(o2.left));
};

// Mean bytes-on-disk of the given sstables; callers pass non-empty buckets,
// so the division by size() is safe.
private static long averageBytesOnDisk(List<SSTableReader> sstables)
{
    long total = 0;
    for (SSTableReader sstable : sstables)
        total += sstable.bytesOnDisk();
    return total / sstables.size();
}
// Strategy tunables parsed from the options map (bucket_high, bucket_low, min_sstable_size).
protected SizeTieredCompactionStrategyOptions sizeTieredOptions;
// Latest pending-compaction estimate; written by getNextBackgroundSSTables() and read by
// getEstimatedRemainingTasks() — volatile because reader and writer may be different threads.
protected volatile int estimatedRemainingTasks;
// The sstables this strategy instance currently manages; mutated only under "synchronized"
// (see addSSTable/removeSSTable and getNextBackgroundSSTables).
@VisibleForTesting
protected final Set<SSTableReader> sstables = new HashSet<>();

/**
 * @param cfs     the column family store this strategy selects compactions for
 * @param options user-supplied compaction options; strategy-specific keys are parsed
 *                into a SizeTieredCompactionStrategyOptions instance
 */
public SizeTieredCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
{
    super(cfs, options);
    this.estimatedRemainingTasks = 0;
    this.sizeTieredOptions = new SizeTieredCompactionStrategyOptions(options);
}
/**
 * Selects the sstables for the next background compaction.
 *
 * Preference order: (1) the hottest size-tiered bucket that meets the minimum
 * compaction threshold; (2) failing that, the single largest sstable whose
 * tombstones appear worth dropping. Side effect: refreshes
 * {@code estimatedRemainingTasks} and reports it to the compaction logger.
 *
 * @param gcBefore gc-grace cutoff used by the tombstone-drop heuristic
 * @return the sstables to compact next, or an empty list if there is nothing to do
 */
private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
    int minThreshold = cfs.getMinimumCompactionThreshold();
    int maxThreshold = cfs.getMaximumCompactionThreshold();

    // Only sstables owned by this strategy that are neither already compacting nor suspect.
    // NOTE(review): this Iterable is lazy — it is re-evaluated by the tombstone pass below.
    Iterable<SSTableReader> candidates = filterSuspectSSTables(filter(cfs.getUncompactingSSTables(), sstables::contains));

    List<List<SSTableReader>> buckets = getBuckets(createSSTableAndLengthPairs(candidates), sizeTieredOptions.bucketHigh, sizeTieredOptions.bucketLow, sizeTieredOptions.minSSTableSize);
    logger.trace("Compaction buckets are {}", buckets);
    // Publish the new pending-task estimate before choosing a bucket.
    estimatedRemainingTasks = getEstimatedCompactionsByTasks(cfs, buckets);
    cfs.getCompactionStrategyManager().compactionLogger.pending(this, estimatedRemainingTasks);
    List<SSTableReader> mostInteresting = mostInterestingBucket(buckets, minThreshold, maxThreshold);
    if (!mostInteresting.isEmpty())
        return mostInteresting;

    // No bucket qualified: fall back to a single-sstable compaction to purge tombstones,
    // picking the largest such sstable.
    List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
    for (SSTableReader sstable : candidates)
    {
        if (worthDroppingTombstones(sstable, gcBefore))
            sstablesWithTombstones.add(sstable);
    }
    if (sstablesWithTombstones.isEmpty())
        return Collections.emptyList();

    return Collections.singletonList(Collections.max(sstablesWithTombstones, SSTableReader.sizeComparator));
}
/**
 * Picks the bucket most worth compacting: each bucket is trimmed to at most
 * {@code maxThreshold} of its hottest sstables, buckets smaller than
 * {@code minThreshold} are discarded, and the hottest remaining bucket
 * (per {@code bucketsByHotnessComparator}) wins.
 *
 * @return the winning (trimmed) bucket, or an empty list if no bucket qualifies
 */
public static List<SSTableReader> mostInterestingBucket(List<List<SSTableReader>> buckets, int minThreshold, int maxThreshold)
{
    Pair<List<SSTableReader>, Double> best = null;
    for (List<SSTableReader> bucket : buckets)
    {
        Pair<List<SSTableReader>, Double> trimmed = trimToThresholdWithHotness(bucket, maxThreshold);
        if (trimmed == null || trimmed.left.size() < minThreshold)
            continue;
        // Strictly-greater comparison keeps the earliest bucket on ties,
        // matching Collections.max over the pruned list.
        if (best == null || bucketsByHotnessComparator.compare(trimmed, best) > 0)
            best = trimmed;
    }
    return best == null ? Collections.emptyList() : best.left;
}
/**
 * Sorts the given bucket hottest-first (note: the list is sorted in place), truncates
 * it to at most {@code maxThreshold} sstables, and returns the truncated bucket paired
 * with the sum of its members' hotness. The returned list is a subList view over the
 * caller's bucket.
 */
@VisibleForTesting
static Pair<List<SSTableReader>, Double> trimToThresholdWithHotness(List<SSTableReader> bucket, int maxThreshold)
{
    // Snapshot hotness once so every comparison during the sort sees a stable value.
    final Map<SSTableReader, Double> hotnessSnapshot = getHotnessMap(bucket);
    // Reversed operand order sorts descending (hottest first).
    bucket.sort((o1, o2) -> Double.compare(hotnessSnapshot.get(o2), hotnessSnapshot.get(o1)));

    // Keep only the hottest maxThreshold sstables.
    List<SSTableReader> prunedBucket = bucket.subList(0, Math.min(bucket.size(), maxThreshold));

    // Total hotness of what survived the trim.
    double bucketHotness = 0.0;
    for (SSTableReader sstable : prunedBucket)
        bucketHotness += hotness(sstable);

    return Pair.create(prunedBucket, bucketHotness);
}
// Builds a point-in-time snapshot of each sstable's hotness so that a subsequent sort
// compares consistent values even if the live read meters keep ticking.
private static Map<SSTableReader, Double> getHotnessMap(Collection<SSTableReader> sstables)
{
    Map<SSTableReader, Double> snapshot = new HashMap<>(sstables.size());
    sstables.forEach(sstable -> snapshot.put(sstable, hotness(sstable)));
    return snapshot;
}
/**
 * Hotness of an sstable: its two-hour read rate normalized by estimated key count.
 * Returns 0.0 when the sstable has no read meter.
 */
private static double hotness(SSTableReader sstr)
{
    if (sstr.getReadMeter() == null)
        return 0.0;
    return sstr.getReadMeter().twoHourRate() / sstr.estimatedKeys();
}
/**
 * Returns the next background compaction task, or null if there is nothing to do
 * or the chosen sstables could not be marked compacting.
 *
 * Retries candidate selection until tryModify succeeds; if the exact same candidate
 * set comes back after a failed tryModify, gives up (another thread holds references
 * to those sstables and retrying immediately would just spin).
 *
 * @param gcBefore gc-grace cutoff passed through to the created CompactionTask
 */
@SuppressWarnings("resource")
public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
{
    List<SSTableReader> previousCandidate = null;
    while (true)
    {
        List<SSTableReader> hottestBucket = getNextBackgroundSSTables(gcBefore);

        if (hottestBucket.isEmpty())
            return null;

        if (hottestBucket.equals(previousCandidate))
        {
            // Bug fix: the literals previously concatenated as "per se,unless" — the
            // trailing space restores a readable message.
            logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se, " +
                        "unless it happens frequently, in which case it must be reported. Will retry later.",
                        hottestBucket);
            return null;
        }

        LifecycleTransaction transaction = cfs.getTracker().tryModify(hottestBucket, OperationType.COMPACTION);
        if (transaction != null)
            return new CompactionTask(cfs, transaction, gcBefore);
        previousCandidate = hottestBucket;
    }
}
/**
 * Builds a maximal ("compact everything") task over all non-suspect sstables this
 * strategy tracks, or null if there is nothing to compact or the sstables could not
 * be marked compacting.
 *
 * @param splitOutput when true, use a SplittingCompactionTask for the output
 */
@SuppressWarnings("resource")
public synchronized Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore, boolean splitOutput)
{
    Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
    if (Iterables.isEmpty(filteredSSTables))
        return null;

    LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
    if (txn == null)
        return null;

    CompactionTask task = splitOutput
                        ? new SplittingCompactionTask(cfs, txn, gcBefore)
                        : new CompactionTask(cfs, txn, gcBefore);
    return Arrays.<AbstractCompactionTask>asList(task);
}
/**
 * Builds a user-defined compaction task over exactly the given sstables, or null if
 * they could not be marked compacting (typically because a background compaction
 * grabbed them first).
 */
@SuppressWarnings("resource")
public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, final int gcBefore)
{
    assert !sstables.isEmpty();

    LifecycleTransaction transaction = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
    if (transaction != null)
        return new CompactionTask(cfs, transaction, gcBefore).setUserDefined(true);

    logger.trace("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
    return null;
}
/**
 * @return the number of compactions estimated to still be pending, as computed by
 *         the most recent call to getNextBackgroundSSTables()
 */
public int getEstimatedRemainingTasks()
{
    return estimatedRemainingTasks;
}
/**
 * Pairs each sstable with its on-disk length, in iteration order — the input shape
 * expected by {@code getBuckets}.
 */
public static List<Pair<SSTableReader, Long>> createSSTableAndLengthPairs(Iterable<SSTableReader> sstables)
{
    List<Pair<SSTableReader, Long>> pairs = new ArrayList<>(Iterables.size(sstables));
    for (SSTableReader reader : sstables)
        pairs.add(Pair.create(reader, reader.onDiskLength()));
    return pairs;
}
/**
 * Groups items of similar size into buckets: an item joins an existing bucket when its
 * size lies strictly between the bucket's running average times {@code bucketLow} and
 * times {@code bucketHigh}; items smaller than {@code minSSTableSize} are lumped
 * together regardless of ratio. Joining a bucket updates its running average.
 *
 * @param files          (item, size-in-bytes) pairs, typically from createSSTableAndLengthPairs
 * @param bucketHigh     upper ratio bound for joining a bucket
 * @param bucketLow      lower ratio bound for joining a bucket
 * @param minSSTableSize sizes below this are bucketed together unconditionally
 * @return the buckets, in no particular order
 */
public static <T> List<List<T>> getBuckets(Collection<Pair<T, Long>> files, double bucketHigh, double bucketLow, long minSSTableSize)
{
    // Sort ascending by size so each new item is at least as large as every existing
    // bucket member, keeping bucket averages non-decreasing as items are added.
    List<Pair<T, Long>> sortedFiles = new ArrayList<Pair<T, Long>>(files);
    Collections.sort(sortedFiles, new Comparator<Pair<T, Long>>()
    {
        public int compare(Pair<T, Long> p1, Pair<T, Long> p2)
        {
            return p1.right.compareTo(p2.right);
        }
    });

    // Buckets keyed by their current (integer) average size.
    Map<Long, List<T>> buckets = new HashMap<Long, List<T>>();

    outer:
    for (Pair<T, Long> pair: sortedFiles)
    {
        long size = pair.right;

        // Look for an existing bucket this item fits into.
        // NOTE(review): HashMap iteration order is unspecified, so when an item could fit
        // more than one bucket, which bucket it joins is effectively arbitrary.
        for (Entry<Long, List<T>> entry : buckets.entrySet())
        {
            List<T> bucket = entry.getValue();
            long oldAverageSize = entry.getKey();
            if ((size > (oldAverageSize * bucketLow) && size < (oldAverageSize * bucketHigh))
                || (size < minSSTableSize && oldAverageSize < minSSTableSize))
            {
                // Re-key the bucket under its new average. The remove/put during entrySet
                // iteration is safe only because we leave the loop immediately afterwards
                // via "continue outer" (no further next() on the iterator).
                buckets.remove(oldAverageSize);
                long totalSize = bucket.size() * oldAverageSize;
                long newAverageSize = (totalSize + size) / (bucket.size() + 1);
                bucket.add(pair.left);
                buckets.put(newAverageSize, bucket);
                continue outer;
            }
        }

        // No existing bucket fits; start a new one keyed by this item's size.
        ArrayList<T> bucket = new ArrayList<T>();
        bucket.add(pair.left);
        buckets.put(size, bucket);
    }

    return new ArrayList<List<T>>(buckets.values());
}
/**
 * Estimates how many compaction tasks the given buckets represent: each bucket at or
 * above the minimum threshold contributes ceil(size / maxThreshold) tasks.
 */
public static int getEstimatedCompactionsByTasks(ColumnFamilyStore cfs, List<List<SSTableReader>> tasks)
{
    int estimate = 0;
    for (List<SSTableReader> bucket : tasks)
    {
        if (bucket.size() < cfs.getMinimumCompactionThreshold())
            continue;
        estimate += Math.ceil((double) bucket.size() / cfs.getMaximumCompactionThreshold());
    }
    return estimate;
}
/**
 * @return Long.MAX_VALUE — this strategy imposes no per-sstable size cap
 */
public long getMaxSSTableBytes()
{
    return Long.MAX_VALUE;
}
/**
 * Validates user-supplied compaction options, delegating to the base strategy and to
 * SizeTieredCompactionStrategyOptions, then removes the threshold keys handled at the
 * CompactionParams level.
 *
 * @return the options that no validator recognized (callers treat leftovers as errors)
 * @throws ConfigurationException if any recognized option has an invalid value
 */
public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
{
    Map<String, String> unrecognized = AbstractCompactionStrategy.validateOptions(options);
    unrecognized = SizeTieredCompactionStrategyOptions.validateOptions(options, unrecognized);

    // min/max threshold are consumed by CompactionParams, not by this strategy.
    unrecognized.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
    unrecognized.remove(CompactionParams.Option.MAX_THRESHOLD.toString());

    return unrecognized;
}
/**
 * @return true — STCS always opts in to defragmentation
 *         (presumably so reads can trigger re-writing fragmented partitions;
 *         confirm intent at the call site in the compaction manager)
 */
@Override
public boolean shouldDefragment()
{
    return true;
}
/**
 * Starts tracking the given sstable for compaction consideration.
 * Synchronized against concurrent add/remove and candidate selection.
 */
@Override
public synchronized void addSSTable(SSTableReader added)
{
    sstables.add(added);
}

/**
 * Stops tracking the given sstable (e.g. once it has been compacted away).
 */
@Override
public synchronized void removeSSTable(SSTableReader sstable)
{
    sstables.remove(sstable);
}
/**
 * Human-readable identifier including the current min/max compaction thresholds.
 */
@Override
public String toString()
{
    int minThreshold = cfs.getMinimumCompactionThreshold();
    int maxThreshold = cfs.getMaximumCompactionThreshold();
    return String.format("SizeTieredCompactionStrategy[%s/%s]", minThreshold, maxThreshold);
}
/**
 * A compaction task used by getMaximalTask(splitOutput=true) that delegates output
 * writing to SplittingSizeTieredCompactionWriter — which, per its name, splits the
 * compaction output into multiple size-tiered sstables rather than one large one.
 */
private static class SplittingCompactionTask extends CompactionTask
{
    public SplittingCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int gcBefore)
    {
        super(cfs, txn, gcBefore);
    }

    // Override the writer factory to produce split output instead of a single sstable.
    @Override
    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
                                                          Directories directories,
                                                          LifecycleTransaction txn,
                                                          Set<SSTableReader> nonExpiredSSTables)
    {
        return new SplittingSizeTieredCompactionWriter(cfs, directories, txn, nonExpiredSSTables);
    }
}
}