/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db.rows;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
/**
 * An iterator that merges a source of rows with the range tombstones and partition level deletion of a given partition.
 * <p>
 * This is used by our {@code Partition} implementations to produce an {@code UnfilteredRowIterator} by merging the rows
 * and deletion infos that are kept separate. It also has two additional roles:
 * 1) it makes sure the rows returned only include the columns selected for the resulting iterator.
 * 2) it (optionally) removes any data that can be shadowed (see the comment on 'removeShadowedData' below for more details)
 */
public class RowAndDeletionMergeIterator extends AbstractUnfilteredRowIterator
{
// For some of our Partition implementations, we can't guarantee that the deletion information (partition level
// deletion and range tombstones) doesn't shadow data in the rows. If that is the case, this class also takes
// care of skipping such shadowed data (since it is the contract of an UnfilteredRowIterator that it doesn't
// shadow its own data). Sometimes, however, we know this can't happen, in which case we can skip that step.
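// For example, a row whose cells all carry timestamps at or below the timestamp of a covering range tombstone
// (or of the partition level deletion) is entirely shadowed by that deletion and must not be emitted.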
private final boolean removeShadowedData;
private final Comparator<Clusterable> comparator;
private final ColumnFilter selection;
private final Iterator<Row> rows;
private Row nextRow;
private final Iterator<RangeTombstone> ranges;
private RangeTombstone nextRange;
// The currently open tombstone. Note that unless this is null, there is no point in checking nextRange.
private RangeTombstone openRange;
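// Illustrative construction only (the local names below, e.g. 'key', 'partitionDeletion', 'rowIterator' and
// 'rangeIterator', are hypothetical, not part of this class): a Partition implementation keeping its rows and
// its deletion info separately could build its unfiltered iterator roughly as
//
//   new RowAndDeletionMergeIterator(metadata, key, partitionDeletion,
//                                   selection, staticRow, reversed, stats,
//                                   rowIterator, rangeIterator,
//                                   true);
//
// passing false as the last argument when the deletion info is known not to shadow any of the row data.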
public RowAndDeletionMergeIterator(CFMetaData metadata,
DecoratedKey partitionKey,
DeletionTime partitionLevelDeletion,
ColumnFilter selection,
Row staticRow,
boolean isReversed,
EncodingStats stats,
Iterator<Row> rows,
Iterator<RangeTombstone> ranges,
boolean removeShadowedData)
{
super(metadata, partitionKey, partitionLevelDeletion, selection.fetchedColumns(), staticRow, isReversed, stats);
this.comparator = isReversed ? metadata.comparator.reversed() : metadata.comparator;
this.selection = selection;
this.removeShadowedData = removeShadowedData;
this.rows = rows;
this.ranges = ranges;
}
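// Computes the next unfiltered to return by merging the row and range tombstone iterators in clustering order.
// For example, with rows at clusterings 1, 3 and 5 and a single range tombstone covering [2, 4], this yields:
// row(1), an open marker at 2, row(3) (when removeShadowedData is set, filtered against the tombstone's
// deletion time and skipped entirely if fully shadowed), a close marker at 4, then row(5).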
private Unfiltered computeNextInternal()
{
while (true)
{
updateNextRow();
if (nextRow == null)
{
if (openRange != null)
return closeOpenedRange();
updateNextRange();
return nextRange == null ? endOfData() : openRange();
}
// We have a next row
if (openRange == null)
{
// We have no currently open tombstone range. So check if we have a next range and if it sorts before this row.
// If it does, the opening of that range should go first. Otherwise, the row goes first.
updateNextRange();
if (nextRange != null && comparator.compare(openBound(nextRange), nextRow.clustering()) < 0)
return openRange();
Row row = consumeNextRow();
// it's possible for the row to be fully shadowed by the current range tombstone
if (row != null)
return row;
}
else
{
// We have both a next row and a currently opened tombstone. Check which goes first between the range closing and the row.
if (comparator.compare(closeBound(openRange), nextRow.clustering()) < 0)
return closeOpenedRange();
Row row = consumeNextRow();
if (row != null)
return row;
}
}
}
/**
 * RangeTombstoneList doesn't correctly merge multiple superseded rts, or overlapping rts with the
 * same ts. This causes it to emit noop boundary markers (boundary markers whose open and close deletion
 * times are equal, and which thus carry no information), which can cause unneeded read repairs and
 * repair over streaming. This should technically be fixed in RangeTombstoneList. However, fixing
 * it isn't trivial and that class is already so complicated that the fix would have a good chance
 * of adding a worse bug. So we just swallow the noop boundary markers here. See CASSANDRA-14894
 */
private static boolean shouldSkip(Unfiltered unfiltered)
{
if (unfiltered == null || !unfiltered.isRangeTombstoneMarker())
return false;
RangeTombstoneMarker marker = (RangeTombstoneMarker) unfiltered;
if (!marker.isBoundary())
return false;
DeletionTime open = marker.openDeletionTime(false);
DeletionTime close = marker.closeDeletionTime(false);
return open.equals(close);
}
@Override
protected Unfiltered computeNext()
{
while (true)
{
Unfiltered next = computeNextInternal();
if (shouldSkip(next))
continue;
return next;
}
}
private void updateNextRow()
{
if (nextRow == null && rows.hasNext())
nextRow = rows.next();
}
private void updateNextRange()
{
while (nextRange == null && ranges.hasNext())
{
nextRange = ranges.next();
if (removeShadowedData && partitionLevelDeletion().supersedes(nextRange.deletionTime()))
nextRange = null;
}
}
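// Consumes and returns the next row, filtered down to the selected columns. When removeShadowedData is set,
// the row is also filtered against the currently active deletion (the open range tombstone if there is one,
// the partition level deletion otherwise); a row that is entirely shadowed filters down to null.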
private Row consumeNextRow()
{
Row row = nextRow;
nextRow = null;
if (!removeShadowedData)
return row.filter(selection, metadata());
DeletionTime activeDeletion = openRange == null ? partitionLevelDeletion() : openRange.deletionTime();
return row.filter(selection, activeDeletion, false, metadata());
}
private RangeTombstone consumeNextRange()
{
RangeTombstone range = nextRange;
nextRange = null;
return range;
}
private RangeTombstone consumeOpenRange()
{
RangeTombstone range = openRange;
openRange = null;
return range;
}
private ClusteringBound openBound(RangeTombstone range)
{
return range.deletedSlice().open(isReverseOrder());
}
private ClusteringBound closeBound(RangeTombstone range)
{
return range.deletedSlice().close(isReverseOrder());
}
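// Closes the currently open range tombstone. If the close falls exactly where the next range opens, a single
// boundary marker is emitted (and that next range becomes the open one) rather than a close immediately
// followed by an open.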
private RangeTombstoneMarker closeOpenedRange()
{
// Check if that close is actually a boundary between markers
updateNextRange();
RangeTombstoneMarker marker;
if (nextRange != null && comparator.compare(closeBound(openRange), openBound(nextRange)) == 0)
{
marker = RangeTombstoneBoundaryMarker.makeBoundary(isReverseOrder(), closeBound(openRange), openBound(nextRange), openRange.deletionTime(), nextRange.deletionTime());
openRange = consumeNextRange();
}
else
{
RangeTombstone toClose = consumeOpenRange();
marker = new RangeTombstoneBoundMarker(closeBound(toClose), toClose.deletionTime());
}
return marker;
}
private RangeTombstoneMarker openRange()
{
assert openRange == null && nextRange != null;
openRange = consumeNextRange();
return new RangeTombstoneBoundMarker(openBound(openRange), openRange.deletionTime());
}
}