/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;

import org.apache.cassandra.utils.AbstractIterator;
import com.google.common.collect.Iterators;

import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.memory.AbstractAllocator;

/**
 * Data structure holding the range tombstones of a ColumnFamily.
 * <p>
 * This is essentially a sorted list of non-overlapping (tombstone) ranges.
 * <p>
 * A range tombstone has 4 elements: the start and end of the range covered,
 * and the deletion infos (markedAt timestamp and local deletion time). The
 * markedAt timestamp is what defines the priority of 2 overlapping tombstones.
 * That is, given 2 tombstones {@code [0, 10]@t1 and [5, 15]@t2, then if t2 > t1} (t1 and t2
 * being the tombstones' markedAt values), the 2nd tombstone takes precedence over
 * the first one on [5, 10]. If such tombstones are added to a RangeTombstoneList,
 * the range tombstone list will store them as [[0, 5]@t1, [5, 15]@t2].
 * <p>
 * The only use of the local deletion time is to know when a given tombstone can
 * be purged, which will be done by the purge() method.
 */
public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurableMemory
{
    private static long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneList(null, 0));

    private final ClusteringComparator comparator;

    // Note: we don't want to use a List for the markedAts and delTimes to avoid boxing. We could
    // use a List for starts and ends, but having arrays everywhere is almost simpler.
    private ClusteringBound[] starts;
    private ClusteringBound[] ends;
    private long[] markedAts;
    private int[] delTimes;

    private long boundaryHeapSize;
    private int size;

    private RangeTombstoneList(ClusteringComparator comparator, ClusteringBound[] starts, ClusteringBound[] ends, long[] markedAts, int[] delTimes, long boundaryHeapSize, int size)
    {
        assert starts.length == ends.length && starts.length == markedAts.length && starts.length == delTimes.length;
        this.comparator = comparator;
        this.starts = starts;
        this.ends = ends;
        this.markedAts = markedAts;
        this.delTimes = delTimes;
        this.size = size;
        this.boundaryHeapSize = boundaryHeapSize;
    }

    public RangeTombstoneList(ClusteringComparator comparator, int capacity)
    {
        this(comparator, new ClusteringBound[capacity], new ClusteringBound[capacity], new long[capacity], new int[capacity], 0, 0);
    }

    public boolean isEmpty()
    {
        return size == 0;
    }

    public int size()
    {
        return size;
    }

    public ClusteringComparator comparator()
    {
        return comparator;
    }

    public RangeTombstoneList copy()
    {
        return new RangeTombstoneList(comparator,
                                      Arrays.copyOf(starts, size),
                                      Arrays.copyOf(ends, size),
                                      Arrays.copyOf(markedAts, size),
                                      Arrays.copyOf(delTimes, size),
                                      boundaryHeapSize, size);
    }

    public RangeTombstoneList copy(AbstractAllocator allocator)
    {
        RangeTombstoneList copy = new RangeTombstoneList(comparator,
                                                         new ClusteringBound[size],
                                                         new ClusteringBound[size],
                                                         Arrays.copyOf(markedAts, size),
                                                         Arrays.copyOf(delTimes, size),
                                                         boundaryHeapSize, size);

        for (int i = 0; i < size; i++)
        {
            copy.starts[i] = clone(starts[i], allocator);
            copy.ends[i] = clone(ends[i], allocator);
        }

        return copy;
    }

    private static ClusteringBound clone(ClusteringBound bound, AbstractAllocator allocator)
    {
        ByteBuffer[] values = new ByteBuffer[bound.size()];
        for (int i = 0; i < values.length; i++)
            values[i] = allocator.clone(bound.get(i));
        return new ClusteringBound(bound.kind(), values);
    }

    public void add(RangeTombstone tombstone)
    {
        add(tombstone.deletedSlice().start(),
            tombstone.deletedSlice().end(),
            tombstone.deletionTime().markedForDeleteAt(),
            tombstone.deletionTime().localDeletionTime());
    }
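    /*
     * A comment-only usage sketch of the overlap resolution described in the class javadoc above.
     * It assumes a table with a single int clustering column; the factory calls used here
     * (new ClusteringComparator(Int32Type.instance), ClusteringBound.inclusiveStartOf/inclusiveEndOf,
     * Int32Type.instance.decompose) reflect the 3.x-era APIs and may differ in other versions;
     * t1, t2 and nowInSec are placeholder timestamps.
     *
     *   ClusteringComparator comparator = new ClusteringComparator(Int32Type.instance);
     *   RangeTombstoneList list = new RangeTombstoneList(comparator, 4);
     *
     *   // [0, 10]@t1, then the overlapping [5, 15]@t2 with t2 > t1:
     *   list.add(ClusteringBound.inclusiveStartOf(Int32Type.instance.decompose(0)),
     *            ClusteringBound.inclusiveEndOf(Int32Type.instance.decompose(10)),
     *            t1, nowInSec);
     *   list.add(ClusteringBound.inclusiveStartOf(Int32Type.instance.decompose(5)),
     *            ClusteringBound.inclusiveEndOf(Int32Type.instance.decompose(15)),
     *            t2, nowInSec);
     *
     *   // The list now holds two non-overlapping ranges, roughly [0, 5)@t1 and [5, 15]@t2:
     *   // the higher markedAt (t2) wins on the overlapping part [5, 10].
     *   for (RangeTombstone rt : list)
     *       System.out.println(rt);
     */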
    /**
     * Adds a new range tombstone.
     *
     * This method will be faster if the new tombstone sorts after all the currently existing ones (this is a common use case),
     * but it doesn't assume it.
     */
    public void add(ClusteringBound start, ClusteringBound end, long markedAt, int delTime)
    {
        if (isEmpty())
        {
            addInternal(0, start, end, markedAt, delTime);
            return;
        }

        int c = comparator.compare(ends[size-1], start);

        // Fast path if we add in sorted order
        if (c <= 0)
        {
            addInternal(size, start, end, markedAt, delTime);
        }
        else
        {
            // Note: insertFrom expects i to be the insertion point in terms of interval ends
            int pos = Arrays.binarySearch(ends, 0, size, start, comparator);
            insertFrom((pos >= 0 ? pos+1 : -pos-1), start, end, markedAt, delTime);
        }
        boundaryHeapSize += start.unsharedHeapSize() + end.unsharedHeapSize();
    }
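    /*
     * A small sketch of the two paths above (illustrative only; startOf(x)/endOf(x) stand for
     * ClusteringBound instances built for a single clustering value x, and t/now are placeholder
     * timestamps):
     *
     *   list.add(startOf(0),  endOf(10), t, now);     // empty list: addInternal at index 0
     *   list.add(startOf(20), endOf(30), t, now);     // sorts after the last end: fast append path
     *   list.add(startOf(5),  endOf(25), t + 1, now); // overlaps existing ranges: binary search + insertFrom
     */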
    /**
     * Adds all the range tombstones of {@code tombstones} to this RangeTombstoneList.
     */
    public void addAll(RangeTombstoneList tombstones)
    {
        if (tombstones.isEmpty())
            return;

        if (isEmpty())
        {
            copyArrays(tombstones, this);
            return;
        }

        /*
         * We basically have 2 techniques we can use here: either we repeatedly call add() on tombstones values,
         * or we do a merge of both (sorted) lists. If this list is big enough compared to the one we add, then
         * calling add() will be faster, otherwise it's merging that will be faster.
         *
         * Let's note that during memtable updates, it might not be uncommon that a new update has only a few range
         * tombstones, while the CF we're adding it to (the one in the memtable) has many. In that case, using add() is
         * likely going to be faster.
         *
         * In other cases however, like when diffing responses from multiple nodes, the tombstone lists we "merge" will
         * likely be similarly sized, so using add() might be a bit inefficient.
         *
         * Roughly speaking (this ignores the fact that updating an element is not exactly constant but that's not a big
         * deal), if n is the size of this list and m is tombstones size, merging is O(n+m) while using add() is O(m*log(n)).
         *
         * But let's not crank up a logarithm computation for that. Long story short, merging will be a bad choice only
         * if this list size is a lot bigger than the other one, so let's keep it simple.
         */
        if (size > 10 * tombstones.size)
        {
            for (int i = 0; i < tombstones.size; i++)
                add(tombstones.starts[i], tombstones.ends[i], tombstones.markedAts[i], tombstones.delTimes[i]);
        }
        else
        {
            int i = 0;
            int j = 0;
            while (i < size && j < tombstones.size)
            {
                if (comparator.compare(tombstones.starts[j], ends[i]) < 0)
                {
                    insertFrom(i, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
                    j++;
                }
                else
                {
                    i++;
                }
            }
            // Adds the remaining ones from tombstones if any (note that addInternal will increment size if relevant).
            for (; j < tombstones.size; j++)
                addInternal(size, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]);
        }
    }
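    /*
     * Sketch of the two strategies above (illustrative only; both lists are assumed to use the
     * same comparator):
     *
     *   RangeTombstoneList big = ...;   // e.g. the list already in the memtable, with many ranges
     *   RangeTombstoneList small = ...; // e.g. an incoming update carrying a few ranges
     *
     *   // big.size > 10 * small.size: each of small's ranges is inserted via add(), O(m*log(n)).
     *   big.addAll(small);
     *
     *   // Two similarly sized lists go through the linear merge branch instead, O(n+m).
     *   RangeTombstoneList a = ..., b = ...;
     *   a.addAll(b);
     */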
    /**
     * Returns whether the given name/timestamp pair is deleted by one of the tombstones
     * of this RangeTombstoneList.
     */
    public boolean isDeleted(Clustering clustering, Cell cell)
    {
        int idx = searchInternal(clustering, 0, size);
        // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346.
        return idx >= 0 && (cell.isCounterCell() || markedAts[idx] >= cell.timestamp());
    }
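    /*
     * Illustration of the check above (comment-only sketch; clustering, regularCell and counterCell
     * are placeholders for values obtained elsewhere):
     *
     *   // A regular cell is deleted only if a tombstone covers its clustering AND that tombstone's
     *   // markedAt is >= the cell's timestamp.
     *   boolean gone = list.isDeleted(clustering, regularCell);
     *
     *   // A counter cell covered by a tombstone is deleted regardless of its timestamp
     *   // (see CASSANDRA-7346).
     *   boolean counterGone = list.isDeleted(clustering, counterCell);
     */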
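    /*
     * Comment-only sketch of the lookups documented below (clustering is a placeholder Clustering value):
     *
     *   DeletionTime dt = list.searchDeletionTime(clustering);
     *   if (dt != null)
     *   {
     *       // clustering is covered by exactly one range; dt carries its markedAt and local deletion time
     *   }
     *
     *   RangeTombstone rt = list.search(clustering); // same lookup, but returns the covering range itself
     */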
    /**
     * Returns the DeletionTime for the tombstone overlapping {@code name} (there can't be more than one),
     * or null if {@code name} is not covered by any tombstone.
     */
    public DeletionTime searchDeletionTime(Clustering name)
    {
        int idx = searchInternal(name, 0, size);
        return idx < 0 ? null : new DeletionTime(markedAts[idx], delTimes[idx]);
    }

    public RangeTombstone search(Clustering name)
    {
        int idx = searchInternal(name, 0, size);
        return idx < 0 ? null : rangeTombstone(idx);
    }

    /*
     * Returns the index of the range covering name if name is covered. If the returned idx is negative,
     * no range covers name and -idx-1 is the index of the first range whose start is greater than name.
     *
     * Note that bounds are not in the range if they fall on its boundary.
     */
    private int searchInternal(ClusteringPrefix name, int startIdx, int endIdx)
    {
        if (isEmpty())
            return -1;

        int pos = Arrays.binarySearch(starts, startIdx, endIdx, name, comparator);
        if (pos >= 0)
        {
            // Equality only happens for bounds (as used by forward/reverseIterator), and bounds are equal only if they
            // are the same or complementary, in either case the bound itself is not part of the range.
            return -pos - 1;
        }
        else
        {
            // We potentially intersect the range before our "insertion point"
            int idx = -pos-2;
            if (idx < 0)
                return -1;

            return comparator.compare(name, ends[idx]) < 0 ? idx : -idx-2;
        }
    }

    public int dataSize()
    {
        int dataSize = TypeSizes.sizeof(size);
        for (int i = 0; i < size; i++)
        {
            dataSize += starts[i].dataSize() + ends[i].dataSize();
            dataSize += TypeSizes.sizeof(markedAts[i]);
            dataSize += TypeSizes.sizeof(delTimes[i]);
        }
        return dataSize;
    }

    public long maxMarkedAt()
    {
        long max = Long.MIN_VALUE;
        for (int i = 0; i < size; i++)
            max = Math.max(max, markedAts[i]);
        return max;
    }

    public void collectStats(EncodingStats.Collector collector)
    {
        for (int i = 0; i < size; i++)
        {
            collector.updateTimestamp(markedAts[i]);
            collector.updateLocalDeletionTime(delTimes[i]);
        }
    }

    public void updateAllTimestamp(long timestamp)
    {
        for (int i = 0; i < size; i++)
            markedAts[i] = timestamp;
    }

    private RangeTombstone rangeTombstone(int idx)
    {
        return new RangeTombstone(Slice.make(starts[idx], ends[idx]), new DeletionTime(markedAts[idx], delTimes[idx]));
    }

    private RangeTombstone rangeTombstoneWithNewStart(int idx, ClusteringBound newStart)
    {
        return new RangeTombstone(Slice.make(newStart, ends[idx]), new DeletionTime(markedAts[idx], delTimes[idx]));
    }

    private RangeTombstone rangeTombstoneWithNewEnd(int idx, ClusteringBound newEnd)
    {
        return new RangeTombstone(Slice.make(starts[idx], newEnd), new DeletionTime(markedAts[idx], delTimes[idx]));
    }

    private RangeTombstone rangeTombstoneWithNewBounds(int idx, ClusteringBound newStart, ClusteringBound newEnd)
    {
        return new RangeTombstone(Slice.make(newStart, newEnd), new DeletionTime(markedAts[idx], delTimes[idx]));
    }

    public Iterator<RangeTombstone> iterator()
    {
        return iterator(false);
    }

    public Iterator<RangeTombstone> iterator(boolean reversed)
    {
        return reversed
             ? new AbstractIterator<RangeTombstone>()
               {
                   private int idx = size - 1;

                   protected RangeTombstone computeNext()
                   {
                       if (idx < 0)
                           return endOfData();
                       return rangeTombstone(idx--);
                   }
               }
             : new AbstractIterator<RangeTombstone>()
               {
                   private int idx;

                   protected RangeTombstone computeNext()
                   {
                       if (idx >= size)
                           return endOfData();
                       return rangeTombstone(idx++);
                   }
               };
    }

    public Iterator<RangeTombstone> iterator(final Slice slice, boolean reversed)
    {
        return reversed ? reverseIterator(slice) : forwardIterator(slice);
    }

    private Iterator<RangeTombstone> forwardIterator(final Slice slice)
    {
        int startIdx = slice.start() == ClusteringBound.BOTTOM ? 0 : searchInternal(slice.start(), 0, size);
        final int start = startIdx < 0 ? -startIdx-1 : startIdx;

        if (start >= size)
            return Collections.emptyIterator();

        int finishIdx = slice.end() == ClusteringBound.TOP ? size - 1 : searchInternal(slice.end(), start, size);
        // if stopIdx is the first range after 'slice.end()' we care only until the previous range
        final int finish = finishIdx < 0 ? -finishIdx-2 : finishIdx;

        if (start > finish)
            return Collections.emptyIterator();

        if (start == finish)
        {
            // We want to make sure the ranges are strictly included within the queried slice as this
            // makes it easier to combine things when iterating over successive slices.
            ClusteringBound s = comparator.compare(starts[start], slice.start()) < 0 ? slice.start() : starts[start];
            ClusteringBound e = comparator.compare(slice.end(), ends[start]) < 0 ? slice.end() : ends[start];
            return Iterators.<RangeTombstone>singletonIterator(rangeTombstoneWithNewBounds(start, s, e));
        }

        return new AbstractIterator<RangeTombstone>()
        {
            private int idx = start;

            protected RangeTombstone computeNext()
            {
                if (idx >= size || idx > finish)
                    return endOfData();

                // We want to make sure the ranges are strictly included within the queried slice as this
                // makes it easier to combine things when iterating over successive slices. This means that
                // for the first and last range we might have to "cut" the range returned.
                if (idx == start && comparator.compare(starts[idx], slice.start()) < 0)
                    return rangeTombstoneWithNewStart(idx++, slice.start());
                if (idx == finish && comparator.compare(slice.end(), ends[idx]) < 0)
                    return rangeTombstoneWithNewEnd(idx++, slice.end());
                return rangeTombstone(idx++);
            }
        };
    }

    private Iterator<RangeTombstone> reverseIterator(final Slice slice)
    {
        int startIdx = slice.end() == ClusteringBound.TOP ? size - 1 : searchInternal(slice.end(), 0, size);
        // if startIdx is the first range after 'slice.end()' we care only until the previous range
        final int start = startIdx < 0 ? -startIdx-2 : startIdx;

        if (start < 0)
            return Collections.emptyIterator();

        int finishIdx = slice.start() == ClusteringBound.BOTTOM ? 0 : searchInternal(slice.start(), 0, start + 1); // include same as finish
        // if stopIdx is the first range after 'slice.end()' we care only until the previous range
        final int finish = finishIdx < 0 ? -finishIdx-1 : finishIdx;

        if (start < finish)
            return Collections.emptyIterator();

        if (start == finish)
        {
            // We want to make sure the ranges are strictly included within the queried slice as this
            // makes it easier to combine things when iterating over successive slices.
            ClusteringBound s = comparator.compare(starts[start], slice.start()) < 0 ? slice.start() : starts[start];
            ClusteringBound e = comparator.compare(slice.end(), ends[start]) < 0 ? slice.end() : ends[start];
            return Iterators.<RangeTombstone>singletonIterator(rangeTombstoneWithNewBounds(start, s, e));
        }

        return new AbstractIterator<RangeTombstone>()
        {
            private int idx = start;

            protected RangeTombstone computeNext()
            {
                if (idx < 0 || idx < finish)
                    return endOfData();

                // We want to make sure the ranges are strictly included within the queried slice as this
                // makes it easier to combine things when iterating over successive slices. This means that
                // for the first and last range we might have to "cut" the range returned.
                if (idx == start && comparator.compare(slice.end(), ends[idx]) < 0)
                    return rangeTombstoneWithNewEnd(idx--, slice.end());
                if (idx == finish && comparator.compare(starts[idx], slice.start()) < 0)
                    return rangeTombstoneWithNewStart(idx--, slice.start());
                return rangeTombstone(idx--);
            }
        };
    }

    @Override
    public boolean equals(Object o)
    {
        if (!(o instanceof RangeTombstoneList))
            return false;
        RangeTombstoneList that = (RangeTombstoneList)o;
        if (size != that.size)
            return false;

        for (int i = 0; i < size; i++)
        {
            if (!starts[i].equals(that.starts[i]))
                return false;
            if (!ends[i].equals(that.ends[i]))
                return false;
            if (markedAts[i] != that.markedAts[i])
                return false;
            if (delTimes[i] != that.delTimes[i])
                return false;
        }
        return true;
    }

    @Override
    public final int hashCode()
    {
        int result = size;
        for (int i = 0; i < size; i++)
        {
            result += starts[i].hashCode() + ends[i].hashCode();
            result += (int)(markedAts[i] ^ (markedAts[i] >>> 32));
            result += delTimes[i];
        }
        return result;
    }

    private static void copyArrays(RangeTombstoneList src, RangeTombstoneList dst)
    {
        dst.grow(src.size);
        System.arraycopy(src.starts, 0, dst.starts, 0, src.size);
        System.arraycopy(src.ends, 0, dst.ends, 0, src.size);
        System.arraycopy(src.markedAts, 0, dst.markedAts, 0, src.size);
        System.arraycopy(src.delTimes, 0, dst.delTimes, 0, src.size);
        dst.size = src.size;
        dst.boundaryHeapSize = src.boundaryHeapSize;
    }

    /*
     * Inserts a new element starting at index i. This method assumes that:
     *    ends[i-1] <= start < ends[i]
     * (note that start can be equal to ends[i-1] in the case where we have a boundary, i.e. for instance
     * ends[i-1] is the exclusive end of X and start is the inclusive start of X).
     *
     * A RangeTombstoneList is a list of ranges [s_0, e_0]...[s_n, e_n] such that:
     *   - s_i is a start bound and e_i is an end bound
     *   - s_i < e_i
     *   - e_i <= s_i+1
     * Basically, ranges are non overlapping and in order.
     */
    private void insertFrom(int i, ClusteringBound start, ClusteringBound end, long markedAt, int delTime)
    {
        while (i < size)
        {
            assert start.isStart() && end.isEnd();
            assert i == 0 || comparator.compare(ends[i-1], start) <= 0;
            assert comparator.compare(start, ends[i]) < 0;

            if (Slice.isEmpty(comparator, start, end))
                return;

            // Do we overwrite the current element?
            if (markedAt > markedAts[i])
            {
                // We do overwrite.

                // First deal with what might come before the newly added one.
                if (comparator.compare(starts[i], start) < 0)
                {
                    ClusteringBound newEnd = start.invert();
                    if (!Slice.isEmpty(comparator, starts[i], newEnd))
                    {
                        addInternal(i, starts[i], newEnd, markedAts[i], delTimes[i]);
                        i++;
                        setInternal(i, start, ends[i], markedAts[i], delTimes[i]);
                    }
                }

                // now, start <= starts[i]

                // Does the new element stop before the current one?
                int endCmp = comparator.compare(end, starts[i]);
                if (endCmp < 0)
                {
                    // Here start <= starts[i] and end < starts[i].
                    // This means the new element is entirely before the current one.
                    addInternal(i, start, end, markedAt, delTime);
                    return;
                }

                // Do we overwrite the current element fully?
                int cmp = comparator.compare(ends[i], end);
                if (cmp <= 0)
                {
                    // We do overwrite fully:
                    // update the current element until its end and continue on with the next element (with the new inserted start == current end).

                    // If we're on the last element, or if we stop before the next start, we set the current element and are done.
                    // Note that the comparison below is inclusive: if an end equals a start, this means they form a boundary, or
                    // in other words that they are for the same element but one is inclusive while the other exclusive, in which
                    // case we know we're good with the next element.
                    if (i == size-1 || comparator.compare(end, starts[i+1]) <= 0)
                    {
                        setInternal(i, start, end, markedAt, delTime);
                        return;
                    }

                    setInternal(i, start, starts[i+1].invert(), markedAt, delTime);
                    start = starts[i+1];
                    i++;
                }
                else
                {
                    // We don't overwrite fully. Insert the new interval, and then update the now next
                    // one to reflect the not overwritten parts. We're then done.
                    addInternal(i, start, end, markedAt, delTime);
                    i++;
                    ClusteringBound newStart = end.invert();
                    if (!Slice.isEmpty(comparator, newStart, ends[i]))
                    {
                        setInternal(i, newStart, ends[i], markedAts[i], delTimes[i]);
                    }
                    return;
                }
            }
            else
            {
                // we don't overwrite the current element

                // If the new interval starts before the current one, insert that new interval
                if (comparator.compare(start, starts[i]) < 0)
                {
                    // If we stop before the start of the current element, just insert the new interval and we're done;
                    // otherwise insert until the beginning of the current element
                    if (comparator.compare(end, starts[i]) <= 0)
                    {
                        addInternal(i, start, end, markedAt, delTime);
                        return;
                    }
                    ClusteringBound newEnd = starts[i].invert();
                    if (!Slice.isEmpty(comparator, start, newEnd))
                    {
                        addInternal(i, start, newEnd, markedAt, delTime);
                        i++;
                    }
                }

                // After that, we're overwritten on the current element but might have
                // some residual parts after ...

                // ... unless we don't extend beyond it.
                if (comparator.compare(end, ends[i]) <= 0)
                    return;

                start = ends[i].invert();
                i++;
            }
        }

        // If we got there, then just insert the remainder at the end
        addInternal(i, start, end, markedAt, delTime);
    }

    private int capacity()
    {
        return starts.length;
    }

    /*
     * Adds the new tombstone at index i, growing and/or moving elements to make room for it.
     */
    private void addInternal(int i, ClusteringBound start, ClusteringBound end, long markedAt, int delTime)
    {
        assert i >= 0;

        if (size == capacity())
            growToFree(i);
        else if (i < size)
            moveElements(i);

        setInternal(i, start, end, markedAt, delTime);
        size++;
    }

    /*
     * Grow the arrays, leaving index i "free" in the process.
     */
    private void growToFree(int i)
    {
        int newLength = (capacity() * 3) / 2 + 1;
        grow(i, newLength);
    }

    /*
     * Grow the arrays to match newLength capacity.
     */
    private void grow(int newLength)
    {
        if (capacity() < newLength)
            grow(-1, newLength);
    }

    private void grow(int i, int newLength)
    {
        starts = grow(starts, size, newLength, i);
        ends = grow(ends, size, newLength, i);
        markedAts = grow(markedAts, size, newLength, i);
        delTimes = grow(delTimes, size, newLength, i);
    }

    private static ClusteringBound[] grow(ClusteringBound[] a, int size, int newLength, int i)
    {
        if (i < 0 || i >= size)
            return Arrays.copyOf(a, newLength);

        ClusteringBound[] newA = new ClusteringBound[newLength];
        System.arraycopy(a, 0, newA, 0, i);
        System.arraycopy(a, i, newA, i+1, size - i);
        return newA;
    }

    private static long[] grow(long[] a, int size, int newLength, int i)
    {
        if (i < 0 || i >= size)
            return Arrays.copyOf(a, newLength);

        long[] newA = new long[newLength];
        System.arraycopy(a, 0, newA, 0, i);
        System.arraycopy(a, i, newA, i+1, size - i);
        return newA;
    }

    private static int[] grow(int[] a, int size, int newLength, int i)
    {
        if (i < 0 || i >= size)
            return Arrays.copyOf(a, newLength);

        int[] newA = new int[newLength];
        System.arraycopy(a, 0, newA, 0, i);
        System.arraycopy(a, i, newA, i+1, size - i);
        return newA;
    }

    /*
     * Move elements so that index i is "free", assuming the arrays have at least one free slot at the end.
     */
    private void moveElements(int i)
    {
        if (i >= size)
            return;

        System.arraycopy(starts, i, starts, i+1, size - i);
        System.arraycopy(ends, i, ends, i+1, size - i);
        System.arraycopy(markedAts, i, markedAts, i+1, size - i);
        System.arraycopy(delTimes, i, delTimes, i+1, size - i);

        // we set starts[i] to null to indicate the position is now empty, so that we update boundaryHeapSize
        // when we set it
        starts[i] = null;
    }

    private void setInternal(int i, ClusteringBound start, ClusteringBound end, long markedAt, int delTime)
    {
        if (starts[i] != null)
            boundaryHeapSize -= starts[i].unsharedHeapSize() + ends[i].unsharedHeapSize();

        starts[i] = start;
        ends[i] = end;
        markedAts[i] = markedAt;
        delTimes[i] = delTime;
        boundaryHeapSize += start.unsharedHeapSize() + end.unsharedHeapSize();
    }

    @Override
    public long unsharedHeapSize()
    {
        return EMPTY_SIZE
             + boundaryHeapSize
             + ObjectSizes.sizeOfArray(starts)
             + ObjectSizes.sizeOfArray(ends)
             + ObjectSizes.sizeOfArray(markedAts)
             + ObjectSizes.sizeOfArray(delTimes);
    }
}