/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.columniterator;

import java.io.IOException;
import java.util.*;

import com.google.common.base.Verify;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileHandle;
import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.utils.btree.BTree;

/**
 * A Cell Iterator in reversed clustering order over SSTable.
 * <p>
 * Slices are handed out from the last one to the first one (see {@link #nextSliceIndex()}). The actual reading is
 * delegated to a {@code ReverseIndexedReader} when the row index entry is indexed and to a plain
 * {@code ReverseReader} otherwise (see {@link #createReaderInternal}). Both readers first load unfiltereds from disk
 * into an in-memory buffer and then iterate over that buffer in reverse.
 */
public class SSTableReversedIterator extends AbstractSSTableIterator
{
    /**
     * The index of the slice being processed.
     * <p>
     * More precisely, this counts how many slices have been handed out so far; {@link #nextSliceIndex()} converts it
     * into a position counted from the end of {@code slices}.
     */
    private int slice;

    /**
     * Creates the iterator; all arguments are passed unchanged to the superclass constructor.
     */
    public SSTableReversedIterator(SSTableReader sstable,
                                   FileDataInput file,
                                   DecoratedKey key,
                                   RowIndexEntry indexEntry,
                                   Slices slices,
                                   ColumnFilter columns,
                                   boolean isForThrift,
                                   FileHandle ifile)
    {
        super(sstable, file, key, indexEntry, slices, columns, isForThrift, ifile);
    }

    /**
     * Creates the reader used for this partition.
     *
     * @param indexEntry the row index entry; if {@code indexEntry.isIndexed()} an index-aware reader is created.
     * @param file the data file input handed to the reader.
     * @param shouldCloseFile forwarded as-is to the reader constructor.
     * @return a {@code ReverseIndexedReader} for indexed entries, a {@code ReverseReader} otherwise.
     */
    protected Reader createReaderInternal(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile)
    {
        return indexEntry.isIndexed()
             ? new ReverseIndexedReader(indexEntry, file, shouldCloseFile)
             : new ReverseReader(file, shouldCloseFile);
    }

    /**
     * @return always {@code true}: this iterator returns data in reverse order.
     */
    public boolean isReverseOrder()
    {
        return true;
    }

    /**
     * Advances the slice counter and returns the index (within {@code slices}) of the slice to process next.
     * The first call returns {@code slices.size() - 1}, the second {@code slices.size() - 2}, and so on.
     */
    protected int nextSliceIndex()
    {
        int next = slice;
        slice++;
        return slices.size() - (next + 1);
    }

    /**
     * @return whether fewer slices have been handed out by {@link #nextSliceIndex()} than {@code slices} contains.
     */
    protected boolean hasMoreSlices()
    {
        return slice < slices.size();
    }

    /**
     * Reader for the non-indexed case: on the first slice it loads the partition from disk into {@link #buffer}
     * (up to the end of that slice) and then serves every slice from that buffer.
     */
    private class ReverseReader extends Reader
    {
        // Holds what was last loaded from disk; created lazily by the first read.
        protected ReusablePartitionData buffer;
        // Reversed iterator over the buffer for the current slice; null until setForSlice has been called once.
        protected Iterator<Unfiltered> iterator;

        // Set in loadFromDisk() and used in setIterator to handle range tombstones extending over multiple index
        // blocks. See loadFromDisk for details. Note that those are always false for non-indexed readers.
        protected boolean skipFirstIteratedItem;
        protected boolean skipLastIteratedItem;

        // The last unfiltered returned by nextInternal(); consulted by loadFromDisk for legacy sstables.
        protected Unfiltered mostRecentlyEmitted = null;

        private ReverseReader(FileDataInput file, boolean shouldCloseFile)
        {
            super(file, shouldCloseFile);
        }

        /**
         * Creates the reusable buffer, sized from a rough estimate of the number of rows it will hold.
         *
         * @param blocksCount the number of blocks the partition is read in; the per-partition row estimate is
         * divided by it.
         * @return a new, empty {@code ReusablePartitionData}. Its initial capacity is 1 if the table has no regular
         * columns or no clustering columns, the computed estimate (at least 1) otherwise, or the default of 16 if
         * computing the estimate threw {@code IllegalStateException}.
         */
        protected ReusablePartitionData createBuffer(int blocksCount)
        {
            int estimatedRowCount = 16;
            int columnCount = metadata().partitionColumns().regulars.size();
            if (columnCount == 0 || metadata().clusteringColumns().isEmpty())
            {
                estimatedRowCount = 1;
            }
            else
            {
                try
                {
                    // To avoid wasted resizing we guess-estimate the number of rows we're likely to read. For that
                    // we use the stats on the number of rows per partition for that sstable.
                    // FIXME: so far we only keep stats on cells, so to get a rough estimate on the number of rows,
                    // we divide by the number of regular columns the table has. We should fix once we collect the
                    // stats on rows
                    int estimatedRowsPerPartition = (int)(sstable.getEstimatedColumnCount().percentile(0.75) / columnCount);
                    estimatedRowCount = Math.max(estimatedRowsPerPartition / blocksCount, 1);
                }
                catch (IllegalStateException e)
                {
                    // The EstimatedHistogram mean() method can throw this (if it overflows). While such overflow
                    // shouldn't happen, it's not worth taking the risk of letting the exception bubble up.
                    // NOTE(review): the call above is percentile(), not mean(); presumably it can fail the same
                    // way -- confirm. Either way we deliberately fall back to the default estimate set above.
                }
            }
            return new ReusablePartitionData(metadata(), partitionKey(), columns(), estimatedRowCount);
        }

        /**
         * Positions this reader on the given slice. Data is only read from disk on the first call (when the buffer
         * does not exist yet); later calls merely create a new iterator over the existing buffer.
         *
         * @param slice the slice to iterate over next.
         */
        public void setForSlice(Slice slice) throws IOException
        {
            // If we have read the data, just create the iterator for the slice. Otherwise, read the data.
            if (buffer == null)
            {
                buffer = createBuffer(1);
                // Note that we can reuse that buffer between slices (we could alternatively re-read from disk
                // every time, but that feels more wasteful) so we want to include everything from the beginning.
                // We can stop at the slice end however since any following slice will be before that.
                loadFromDisk(null, slice.end(), false, false, null, null);
            }
            setIterator(slice);
        }

        /**
         * Creates {@link #iterator} over the built buffer restricted to {@code slice}, then applies the
         * {@code skipFirstIteratedItem}/{@code skipLastIteratedItem} flags computed by the last loadFromDisk call.
         */
        protected void setIterator(Slice slice)
        {
            assert buffer != null;
            // NOTE(review): the trailing 'true' is presumably the "reversed" flag of unfilteredIterator -- confirm.
            iterator = buffer.built.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);

            // Nothing to skip on an empty iterator.
            if (!iterator.hasNext())
                return;

            if (skipFirstIteratedItem)
                iterator.next();

            if (skipLastIteratedItem)
                iterator = new SkipLastIterator(iterator);
        }

        /**
         * @return whether the current buffer iterator has more elements. If no slice was ever set, the reader is
         * first set up on {@code Slice.ALL}.
         */
        protected boolean hasNextInternal() throws IOException
        {
            // If we've never called setForSlice, we're reading everything
            if (iterator == null)
                setForSlice(Slice.ALL);

            return iterator.hasNext();
        }

        /**
         * Returns the next unfiltered of the current iterator and remembers it in {@link #mostRecentlyEmitted}.
         *
         * @throws NoSuchElementException if {@code hasNext()} returns false.
         */
        protected Unfiltered nextInternal() throws IOException
        {
            if (!hasNext())
                throw new NoSuchElementException();
            Unfiltered next = iterator.next();
            mostRecentlyEmitted = next;
            return next;
        }

        /**
         * Tells loadFromDisk whether to stop reading before the requested end or the end of the data is reached.
         *
         * @return always {@code false} here; the indexed reader overrides this to stop at the end of the current
         * index block.
         */
        protected boolean stopReadingDisk() throws IOException
        {
            return false;
        }

        // checks if left prefix precedes right prefix (strictly, according to the table comparator)
        private boolean precedes(ClusteringPrefix left, ClusteringPrefix right)
        {
            return metadata().comparator.compare(left, right) < 0;
        }

        /**
         * Reads the unfiltered from disk and load them into the reader buffer. It stops reading when either the
         * partition is fully read, or when stopReadingDisk() returns true.
         * <p>
         * The buffer is reset first and built last, and both skip flags are recomputed on every call.
         * NOTE(review): {@code openMarker}, {@code updateOpenMarker} and {@code deserializer} are inherited and not
         * visible in this file; the comments below describe how they are used here, not how they are implemented.
         *
         * @param start if non-null, everything that compares lower than or equal to it is skipped (rows are skipped
         * outright, markers are read so the open marker stays up to date). Must be null if {@code hasNextBlock}.
         * @param end if non-null, reading stops at the first unfiltered that does not compare strictly lower.
         * @param hasPreviousBlock whether a block was already read for the current slice; if so, a marker still open
         * at the end of this read is buffered but flagged to be skipped on iteration.
         * @param hasNextBlock whether more blocks will be read for the current slice; if so, a marker open at the
         * start of this read is buffered but flagged to be skipped on iteration.
         * @param currentFirstName first name of the current index block, or null; only used for the legacy
         * range tombstone detection below.
         * @param nextLastName last name of the index block that will be read next (the previous one on disk), or
         * null; only used for the legacy format handling below.
         */
        protected void loadFromDisk(ClusteringBound start,
                                    ClusteringBound end,
                                    boolean hasPreviousBlock,
                                    boolean hasNextBlock,
                                    ClusteringPrefix currentFirstName,
                                    ClusteringPrefix nextLastName) throws IOException
        {
            // start != null means it's the block covering the beginning of the slice, so it has to be the last block for this slice.
            assert start == null || !hasNextBlock;

            buffer.reset();
            skipFirstIteratedItem = false;
            skipLastIteratedItem = false;

            // Whether the next unfiltered read would be the first one of this block.
            boolean isFirst = true;

            // If the start might be in this block, skip everything that comes before it.
            if (start != null)
            {
                while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0 && !stopReadingDisk())
                {
                    isFirst = false;
                    if (deserializer.nextIsRow())
                        deserializer.skipNext();
                    else
                        updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
                }
            }

            // If we have an open marker, it's either one from what we just skipped or it's one that open in the next (or
            // one of the next) index block (if openMarker == openMarkerAtStartOfBlock).
            if (openMarker != null)
            {
                // We have to feed a marker to the buffer, because that marker is likely to be close later and ImmutableBTreePartition
                // doesn't take kindly to marker that comes without their counterpart. If that's the last block we're gonna read (for
                // the current slice at least) it's easy because we'll want to return that open marker at the end of the data in this
                // block anyway, so we have nothing more to do than adding it to the buffer.
                // If it's not the last block however, in which case we know we'll have start == null, it means this marker is really
                // open in a next block and so while we do need to add it the buffer for the reason mentioned above, we don't
                // want to "return" it just yet, we'll wait until we reach it in the next blocks. That's why we trigger
                // skipLastIteratedItem in that case (this is first item of the block, but we're iterating in reverse order
                // so it will be last returned by the iterator).
                ClusteringBound markerStart = start == null ? ClusteringBound.BOTTOM : start;
                buffer.add(new RangeTombstoneBoundMarker(markerStart, openMarker));
                if (hasNextBlock)
                    skipLastIteratedItem = true;
            }

            // Now deserialize everything until we reach our requested end (if we have one)
            // See SSTableIterator.ForwardRead.computeNext() for why this is a strict inequality below: this is the same
            // reasoning here.
            while (deserializer.hasNext()
                   && (end == null || deserializer.compareNextTo(end) < 0)
                   && !stopReadingDisk())
            {
                Unfiltered unfiltered = deserializer.readNext();

                if (isFirst && openMarker == null
                    && currentFirstName != null && nextLastName != null
                    && (precedes(currentFirstName, nextLastName) || precedes(unfiltered.clustering(), currentFirstName)))
                {
                    // Range tombstones spanning multiple index blocks when reading legacy sstables need special handling.
                    // Pre-3.0, the column index didn't encode open markers. Instead, open range tombstones were rewritten
                    // at the start of index blocks they at least partially covered. These rewritten RTs found at the
                    // beginning of index blocks need to be handled as though they were an open marker, otherwise iterator
                    // validation will fail and/or some rows will be excluded from the result. These rewritten RTs can be
                    // detected based on their relation to the current index block and the next one depending on what wrote
                    // the sstable. For sstables coming from a memtable flush, a rewritten RT will have a clustering value
                    // less than the first name of its index block. For sstables coming from compaction, the index block
                    // first name will be the RT open bound, which will be less than the last name of the next block. So,
                    // here we compare the first name of this block to the last name of the next block to detect the
                    // compaction case, and clustering value of the unfiltered we just read to the index block's first name
                    // to detect the flush case.
                    Verify.verify(!sstable.descriptor.version.storeRows());
                    Verify.verify(openMarker == null);
                    Verify.verify(!skipLastIteratedItem);
                    Verify.verify(unfiltered.isRangeTombstoneMarker());
                    buffer.add(unfiltered);
                    if (hasNextBlock)
                        skipLastIteratedItem = true;
                    // Note that isFirst is left untouched by this branch.
                }
                else if (isFirst && nextLastName != null && !precedes(nextLastName, unfiltered.clustering()))
                {
                    // When dealing with old format sstable, we have the problem that a row can span 2 index block, i.e. it can
                    // start at the end of a block and end at the beginning of the next one. That's not a problem per se for
                    // UnfilteredDeserializer.OldFormatSerializer, since it always read rows entirely, even if they span index
                    // blocks, but as we're reading index block in reverse we must be careful to not read the end of the row at
                    // beginning of a block before we're reading the beginning of that row. So what we do is that if we detect
                    // that the row starting this block is also the row ending the next one we'll read (previous on disk), then
                    // we'll skip that first result and let it be read with the next block.
                    Verify.verify(!sstable.descriptor.version.storeRows());
                    isFirst = false;
                }
                else if (unfiltered.isEmpty())
                {
                    // Empty unfiltereds are not buffered.
                    isFirst = false;
                }
                else
                {
                    buffer.add(unfiltered);
                    isFirst = false;
                }

                // Whichever branch was taken above, markers must be tracked so that the open marker stays accurate.
                if (unfiltered.isRangeTombstoneMarker())
                    updateOpenMarker((RangeTombstoneMarker)unfiltered);
            }

            if (!sstable.descriptor.version.storeRows()
                && deserializer.hasNext()
                && (end == null || deserializer.compareNextTo(end) < 0))
            {
                // Range tombstone start and end bounds are stored together in legacy sstables. When we read one, we
                // stash the closing bound until we reach the appropriate place to emit it, which is immediately before
                // the next unfiltered with a greater clustering.
                // If SSTRI considers the block exhausted before encountering such a clustering though, this end marker
                // will never be emitted. So here we just check if there's a closing bound left in the deserializer.
                // If there is, we compare it against the most recently emitted unfiltered (i.e.: the last unfiltered
                // that this RT would enclose). And we have to do THAT comparison because the last name field on the
                // current index block will be whatever was written at the end of the index block (i.e. the last name
                // physically in the block), not the closing bound of the range tombstone (i.e. the last name logically
                // in the block). If all this indicates that there is indeed a range tombstone we're missing, we add it
                // to the buffer and update the open marker field.
                Unfiltered unfiltered = deserializer.readNext();
                RangeTombstoneMarker marker = unfiltered.isRangeTombstoneMarker() ? (RangeTombstoneMarker) unfiltered : null;
                if (marker != null && marker.isClose(false)
                    && (mostRecentlyEmitted == null || precedes(marker.clustering(), mostRecentlyEmitted.clustering())))
                {
                    buffer.add(marker);
                    updateOpenMarker(marker);
                }
            }

            // If we have an open marker, we should close it before finishing
            if (openMarker != null)
            {
                // This is the reverse problem than the one at the start of the block. Namely, if it's the first block
                // we deserialize for the slice (the one covering the slice end basically), then it's easy, we just want
                // to add the close marker to the buffer and return it normally.
                // If it's not our first block (for the slice) however, it means that marker closed in a previously read
                // block and we have already returned it. So while we should still add it to the buffer for the sake of
                // not breaking ImmutableBTreePartition, we should skip it when returning from the iterator, hence the
                // skipFirstIteratedItem (this is the last item of the block, but we're iterating in reverse order so it will
                // be the first returned by the iterator).
                ClusteringBound markerEnd = end == null ? ClusteringBound.TOP : end;
                buffer.add(new RangeTombstoneBoundMarker(markerEnd, openMarker));
                if (hasPreviousBlock)
                    skipFirstIteratedItem = true;
            }

            buffer.build();
        }
    }

    /**
     * Reader for the indexed case: instead of loading the whole partition at once, it loads one index block at a
     * time into the buffer, walking the blocks that cover the current slice from the highest index to the lowest.
     */
    private class ReverseIndexedReader extends ReverseReader
    {
        private final IndexState indexState;

        // The slice we're currently iterating over
        private Slice slice;
        // The last index block to consider for the slice
        private int lastBlockIdx;

        private ReverseIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile)
        {
            super(file, shouldCloseFile);
            this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, true, ifile);
        }

        /**
         * Closes the reader and then the index state it owns.
         */
        @Override
        public void close() throws IOException
        {
            super.close();
            this.indexState.close();
        }

        /**
         * Positions this reader on the given slice: works out the range of index blocks to visit
         * ({@code startIdx} down to {@code lastBlockIdx}) and loads the first of them. If the index state is
         * already done, or the slice falls entirely outside the indexed blocks, the iterator is simply set to an
         * empty one.
         *
         * @param slice the slice to iterate over next.
         */
        @Override
        public void setForSlice(Slice slice) throws IOException
        {
            this.slice = slice;

            // if our previous slicing already got us past the beginning of the sstable, we're done
            if (indexState.isDone())
            {
                iterator = Collections.emptyIterator();
                return;
            }

            // Find the first index block we'll need to read for the slice.
            int startIdx = indexState.findBlockIndex(slice.end(), indexState.currentBlockIdx());
            if (startIdx < 0)
            {
                iterator = Collections.emptyIterator();
                indexState.setToBlock(startIdx);
                return;
            }

            lastBlockIdx = indexState.findBlockIndex(slice.start(), startIdx);

            // If the last block to look (in reverse order) is after the very last block, we have nothing for that slice
            if (lastBlockIdx >= indexState.blocksCount())
            {
                assert startIdx >= indexState.blocksCount();
                iterator = Collections.emptyIterator();
                return;
            }

            // If we start (in reverse order) after the very last block, just read from the last one.
            if (startIdx >= indexState.blocksCount())
                startIdx = indexState.blocksCount() - 1;

            // Note that even if we were already set on the proper block (which would happen if the previous slice
            // requested ended on the same block this one start), we can't reuse it because when reading the previous
            // slice we've only read that block from the previous slice start. Re-reading also handles
            // skipFirstIteratedItem/skipLastIteratedItem that we would need to handle otherwise.
            indexState.setToBlock(startIdx);

            readCurrentBlock(false, startIdx != lastBlockIdx);
        }

        /**
         * @return whether there is more data for the current slice, loading preceding index blocks (down to
         * {@code lastBlockIdx}) as long as the currently loaded one yields nothing.
         */
        @Override
        protected boolean hasNextInternal() throws IOException
        {
            if (super.hasNextInternal())
                return true;

            while (true)
            {
                // We have nothing more for our current block, move the next one (so the one before on disk).
                int nextBlockIdx = indexState.currentBlockIdx() - 1;
                if (nextBlockIdx < 0 || nextBlockIdx < lastBlockIdx)
                    return false;

                // The slice start can only be in the last block we read for the slice (lastBlockIdx), which is why
                // we tell readCurrentBlock whether more blocks follow.
                indexState.setToBlock(nextBlockIdx);
                readCurrentBlock(true, nextBlockIdx != lastBlockIdx);

                // If an indexed block only contains data for a dropped column, the iterator will be empty, even
                // though we may still have data to read in subsequent blocks

                // also, for pre-3.0 storage formats, index blocks that only contain a single row and that row crosses
                // index boundaries, the iterator will be empty even though we haven't read everything we're intending
                // to read. In that case, we want to read the next index block. This shouldn't be possible in 3.0+
                // formats (see next comment)
                // NOTE(review): no further comment follows here; the reference above looks stale.
                if (!iterator.hasNext() && nextBlockIdx > lastBlockIdx)
                {
                    continue;
                }

                return iterator.hasNext();
            }
        }

        /**
         * Reads the current block, the last one we've set.
         * <p>
         * Creates the buffer on first use, loads the block through {@code loadFromDisk} and then sets the iterator
         * for the current slice. For sstables whose version does not store rows, and unless this is block 0, the
         * first name of the current block and the last name of the block before it are passed along as well.
         *
         * @param hasPreviousBlock is whether we have already read a previous block for the current slice.
         * @param hasNextBlock is whether we have more blocks to read for the current slice.
         */
        private void readCurrentBlock(boolean hasPreviousBlock, boolean hasNextBlock) throws IOException
        {
            if (buffer == null)
                buffer = createBuffer(indexState.blocksCount());

            int currentBlock = indexState.currentBlockIdx();

            // The slice start (resp. slice end) is only meaningful on the last (resp. first) block read (since again,
            // we read blocks in reverse order).
            boolean canIncludeSliceStart = !hasNextBlock;
            boolean canIncludeSliceEnd = !hasPreviousBlock;

            ClusteringPrefix currentFirstName = null;
            ClusteringPrefix nextLastName = null;
            if (!sstable.descriptor.version.storeRows() && currentBlock > 0)
            {
                currentFirstName = indexState.index(currentBlock).firstName;
                nextLastName = indexState.index(currentBlock - 1).lastName;
            }

            loadFromDisk(canIncludeSliceStart ? slice.start() : null,
                         canIncludeSliceEnd ? slice.end() : null,
                         hasPreviousBlock,
                         hasNextBlock,
                         currentFirstName,
                         nextLastName
            );
            setIterator(slice);
        }

        /**
         * @return whether the index state reports that we are past the current block, which makes loadFromDisk
         * stop reading.
         */
        @Override
        protected boolean stopReadingDisk() throws IOException
        {
            return indexState.isPastCurrentBlock();
        }
    }

    /**
     * A buffer of rows and range tombstone markers that can be filled, built into an
     * {@link ImmutableBTreePartition} and then reset to be filled again.
     * <p>
     * The expected call sequence is {@code reset()}, any number of {@code add(...)}, then {@code build()}: the
     * deletion builder only exists between {@code reset()} and {@code build()}, so adding a marker outside of that
     * window would fail.
     */
    private class ReusablePartitionData
    {
        private final CFMetaData metadata;
        private final DecoratedKey partitionKey;
        private final PartitionColumns columns;

        // Only non-null between reset() and build().
        private MutableDeletionInfo.Builder deletionBuilder;
        private MutableDeletionInfo deletionInfo;
        private BTree.Builder<Row> rowBuilder;
        // The result of the last build(); null after reset() until build() is called again.
        private ImmutableBTreePartition built;

        private ReusablePartitionData(CFMetaData metadata,
                                      DecoratedKey partitionKey,
                                      PartitionColumns columns,
                                      int initialRowCapacity)
        {
            this.metadata = metadata;
            this.partitionKey = partitionKey;
            this.columns = columns;
            this.rowBuilder = BTree.builder(metadata.comparator, initialRowCapacity);
        }

        /**
         * Adds an unfiltered to the buffer: rows go to the row builder, anything else is treated as a range
         * tombstone marker and goes to the deletion builder.
         */
        public void add(Unfiltered unfiltered)
        {
            if (unfiltered.isRow())
                rowBuilder.add((Row)unfiltered);
            else
                deletionBuilder.add((RangeTombstoneMarker)unfiltered);
        }

        /**
         * Discards the previously built partition and prepares the builders for a new round of {@code add} calls.
         */
        public void reset()
        {
            built = null;
            rowBuilder.reuse();
            deletionBuilder = MutableDeletionInfo.builder(partitionLevelDeletion, metadata().comparator, false);
        }

        /**
         * Builds the partition from what was added since the last {@code reset()}. The built partition always uses
         * an empty static row and no encoding stats.
         */
        public void build()
        {
            deletionInfo = deletionBuilder.build();
            built = new ImmutableBTreePartition(metadata, partitionKey, columns, Rows.EMPTY_STATIC_ROW, rowBuilder.build(),
                                                deletionInfo, EncodingStats.NO_STATS);
            deletionBuilder = null;
        }
    }

    /**
     * Wraps an iterator and returns all of its elements except the last one.
     */
    private static class SkipLastIterator extends AbstractIterator<Unfiltered>
    {
        private final Iterator<Unfiltered> iterator;

        private SkipLastIterator(Iterator<Unfiltered> iterator)
        {
            this.iterator = iterator;
        }

        protected Unfiltered computeNext()
        {
            if (!iterator.hasNext())
                return endOfData();

            Unfiltered next = iterator.next();
            // Only return the element if another one follows it: the very last element is thus dropped.
            return iterator.hasNext() ? next : endOfData();
        }
    }
}