package org.apache.cassandra.db.partitions;
import java.util.function.Predicate;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.Transformation;
public abstract class PurgeFunction extends Transformation<UnfilteredRowIterator>
{
private final boolean isForThrift;
private final DeletionPurger purger;
private final int nowInSec;
private final boolean enforceStrictLiveness;
private boolean isReverseOrder;
public PurgeFunction(boolean isForThrift,
int nowInSec,
int gcBefore,
int oldestUnrepairedTombstone,
boolean onlyPurgeRepairedTombstones,
boolean enforceStrictLiveness)
{
this.isForThrift = isForThrift;
this.nowInSec = nowInSec;
this.purger = (timestamp, localDeletionTime) ->
!(onlyPurgeRepairedTombstones && localDeletionTime >= oldestUnrepairedTombstone)
&& localDeletionTime < gcBefore
&& getPurgeEvaluator().test(timestamp);
this.enforceStrictLiveness = enforceStrictLiveness;
}
protected abstract Predicate<Long> getPurgeEvaluator();
protected void onNewPartition(DecoratedKey partitionKey)
{
}
protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey)
{
}
protected void updateProgress()
{
}
@Override
protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
{
onNewPartition(partition.partitionKey());
isReverseOrder = partition.isReverseOrder();
UnfilteredRowIterator purged = Transformation.apply(partition, this);
if (!isForThrift && purged.isEmpty())
{
onEmptyPartitionPostPurge(purged.partitionKey());
purged.close();
return null;
}
return purged;
}
@Override
protected DeletionTime applyToDeletion(DeletionTime deletionTime)
{
return purger.shouldPurge(deletionTime) ? DeletionTime.LIVE : deletionTime;
}
@Override
protected Row applyToStatic(Row row)
{
updateProgress();
return row.purge(purger, nowInSec, enforceStrictLiveness);
}
@Override
protected Row applyToRow(Row row)
{
updateProgress();
return row.purge(purger, nowInSec, enforceStrictLiveness);
}
@Override
protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
{
updateProgress();
boolean reversed = isReverseOrder;
if (marker.isBoundary())
{
RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker;
boolean shouldPurgeClose = purger.shouldPurge(boundary.closeDeletionTime(reversed));
boolean shouldPurgeOpen = purger.shouldPurge(boundary.openDeletionTime(reversed));
if (shouldPurgeClose)
{
if (shouldPurgeOpen)
return null;
return boundary.createCorrespondingOpenMarker(reversed);
}
return shouldPurgeOpen
? boundary.createCorrespondingCloseMarker(reversed)
: marker;
}
else
{
return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : marker;
}
}
}