/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.utils;

import java.util.*;

Merges sorted input iterators which individually contain unique items.
/** Merges sorted input iterators which individually contain unique items. */
public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements IMergeIterator<In, Out> { protected final Reducer<In,Out> reducer; protected final List<? extends Iterator<In>> iterators; protected MergeIterator(List<? extends Iterator<In>> iters, Reducer<In, Out> reducer) { this.iterators = iters; this.reducer = reducer; } public static <In, Out> MergeIterator<In, Out> get(List<? extends Iterator<In>> sources, Comparator<? super In> comparator, Reducer<In, Out> reducer) { if (sources.size() == 1) { return reducer.trivialReduceIsTrivial() ? new TrivialOneToOne<>(sources, reducer) : new OneToOne<>(sources, reducer); } return new ManyToOne<>(sources, comparator, reducer); } public Iterable<? extends Iterator<In>> iterators() { return iterators; } public void close() { for (Iterator<In> iterator : this.iterators) { try { if (iterator instanceof AutoCloseable) ((AutoCloseable)iterator).close(); } catch (Exception e) { throw new RuntimeException(e); } } reducer.close(); }
A MergeIterator that consumes multiple input values per output value. The most straightforward way to implement this is to use a PriorityQueue of iterators, poll it to find the next item to consume, then add the iterator back after advancing. This is not very efficient as poll and add in all cases require at least log(size) comparisons (usually more than 2*log(size)) per consumed item, even if the input is suitable for fast iteration. The implementation below makes use of the fact that replacing the top element in a binary heap can be done much more efficiently than separately removing it and placing it back, especially in the cases where the top iterator is to be used again very soon (e.g. when there are large sections of the output where only a limited number of input iterators overlap, which is normally the case in many practically useful situations, e.g. levelled compaction). To further improve this particular scenario, we also use a short sorted section at the start of the queue. The heap is laid out as this (for SORTED_SECTION_SIZE == 2): 0 | 1 | 2 / \ 3 4 / \ / \ 5 6 7 8 .. .. .. .. Where each line is a <= relationship. In the sorted section we can advance with a single comparison per level, while advancing a level within the heap requires two (so that we can find the lighter element to pop up). The sorted section adds a constant overhead when data is uniformly distributed among the iterators, but may up to halve the iteration time when one iterator is dominant over sections of the merged data (as is the case with non-overlapping iterators). The iterator is further complicated by the need to avoid advancing the input iterators until an output is actually requested. To achieve this consume walks the heap to find equal items without advancing the iterators, and advance moves them and restores the heap structure before any items can be consumed. To avoid having to do additional comparisons in consume to identify the equal items, we keep track of equality between children and their parents in the heap. More precisely, the lines in the diagram above define the following relationship: parent <= child && (parent == child) == child.equalParent We can track, make use of and update the equalParent field without any additional comparisons. For more formal definitions and proof of correctness, see CASSANDRA-8915.
/** * A MergeIterator that consumes multiple input values per output value. * * The most straightforward way to implement this is to use a {@code PriorityQueue} of iterators, {@code poll} it to * find the next item to consume, then {@code add} the iterator back after advancing. This is not very efficient as * {@code poll} and {@code add} in all cases require at least {@code log(size)} comparisons (usually more than * {@code 2*log(size)}) per consumed item, even if the input is suitable for fast iteration. * * The implementation below makes use of the fact that replacing the top element in a binary heap can be done much * more efficiently than separately removing it and placing it back, especially in the cases where the top iterator * is to be used again very soon (e.g. when there are large sections of the output where only a limited number of * input iterators overlap, which is normally the case in many practically useful situations, e.g. levelled * compaction). To further improve this particular scenario, we also use a short sorted section at the start of the * queue. * * The heap is laid out as this (for {@code SORTED_SECTION_SIZE == 2}): * 0 * | * 1 * | * 2 * / \ * 3 4 * / \ / \ * 5 6 7 8 * .. .. .. .. * Where each line is a <= relationship. * * In the sorted section we can advance with a single comparison per level, while advancing a level within the heap * requires two (so that we can find the lighter element to pop up). * The sorted section adds a constant overhead when data is uniformly distributed among the iterators, but may up * to halve the iteration time when one iterator is dominant over sections of the merged data (as is the case with * non-overlapping iterators). * * The iterator is further complicated by the need to avoid advancing the input iterators until an output is * actually requested. To achieve this {@code consume} walks the heap to find equal items without advancing the * iterators, and {@code advance} moves them and restores the heap structure before any items can be consumed. * * To avoid having to do additional comparisons in consume to identify the equal items, we keep track of equality * between children and their parents in the heap. More precisely, the lines in the diagram above define the * following relationship: * parent <= child && (parent == child) == child.equalParent * We can track, make use of and update the equalParent field without any additional comparisons. * * For more formal definitions and proof of correctness, see CASSANDRA-8915. */
static final class ManyToOne<In,Out> extends MergeIterator<In,Out> { protected final Candidate<In>[] heap;
Number of non-exhausted iterators.
/** Number of non-exhausted iterators. */
int size;
Position of the deepest, right-most child that needs advancing before we can start consuming. Because advancing changes the values of the items of each iterator, the parent-chain from any position in this range that needs advancing is not in correct order. The trees rooted at any position that does not need advancing, however, retain their prior-held binary heap property.
/** * Position of the deepest, right-most child that needs advancing before we can start consuming. * Because advancing changes the values of the items of each iterator, the parent-chain from any position * in this range that needs advancing is not in correct order. The trees rooted at any position that does * not need advancing, however, retain their prior-held binary heap property. */
int needingAdvance;
The number of elements to keep in order before the binary heap starts, exclusive of the top heap element.
/** * The number of elements to keep in order before the binary heap starts, exclusive of the top heap element. */
static final int SORTED_SECTION_SIZE = 4; public ManyToOne(List<? extends Iterator<In>> iters, Comparator<? super In> comp, Reducer<In, Out> reducer) { super(iters, reducer); @SuppressWarnings("unchecked") Candidate<In>[] heap = new Candidate[iters.size()]; this.heap = heap; size = 0; for (int i = 0; i < iters.size(); i++) { Candidate<In> candidate = new Candidate<>(i, iters.get(i), comp); heap[size++] = candidate; } needingAdvance = size; } protected final Out computeNext() { advance(); return consume(); }
Advance all iterators that need to be advanced and place them into suitable positions in the heap. By walking the iterators backwards we know that everything after the point being processed already forms correctly ordered subheaps, thus we can build a subheap rooted at the current position by only sinking down the newly advanced iterator. Because all parents of a consumed iterator are also consumed there is no way that we can process one consumed iterator but skip over its parent. The procedure is the same as the one used for the initial building of a heap in the heapsort algorithm and has a maximum number of comparisons (2 * log(size) + SORTED_SECTION_SIZE / 2) multiplied by the number of iterators whose items were consumed at the previous step, but is also at most linear in the size of the heap if the number of consumed elements is high (as it is in the initial heap construction). With non- or lightly-overlapping iterators the procedure finishes after just one (resp. a couple of) comparisons.
/** * Advance all iterators that need to be advanced and place them into suitable positions in the heap. * * By walking the iterators backwards we know that everything after the point being processed already forms * correctly ordered subheaps, thus we can build a subheap rooted at the current position by only sinking down * the newly advanced iterator. Because all parents of a consumed iterator are also consumed there is no way * that we can process one consumed iterator but skip over its parent. * * The procedure is the same as the one used for the initial building of a heap in the heapsort algorithm and * has a maximum number of comparisons {@code (2 * log(size) + SORTED_SECTION_SIZE / 2)} multiplied by the * number of iterators whose items were consumed at the previous step, but is also at most linear in the size of * the heap if the number of consumed elements is high (as it is in the initial heap construction). With non- or * lightly-overlapping iterators the procedure finishes after just one (resp. a couple of) comparisons. */
private void advance() { // Turn the set of candidates into a heap. for (int i = needingAdvance - 1; i >= 0; --i) { Candidate<In> candidate = heap[i]; /** * needingAdvance runs to the maximum index (and deepest-right node) that may need advancing; * since the equal items that were consumed at-once may occur in sub-heap "veins" of equality, * not all items above this deepest-right position may have been consumed; these already form * valid sub-heaps and can be skipped-over entirely */ if (candidate.needsAdvance()) replaceAndSink(candidate.advance(), i); } }
Consume all items that sort like the current top of the heap. As we cannot advance the iterators to let equivalent items pop up, we walk the heap to find them and mark them as needing advance. This relies on the equalParent flag to avoid doing any comparisons.
/** * Consume all items that sort like the current top of the heap. As we cannot advance the iterators to let * equivalent items pop up, we walk the heap to find them and mark them as needing advance. * * This relies on the equalParent flag to avoid doing any comparisons. */
private Out consume() { if (size == 0) return endOfData(); reducer.onKeyChange(); assert !heap[0].equalParent; heap[0].consume(reducer); final int size = this.size; final int sortedSectionSize = Math.min(size, SORTED_SECTION_SIZE); int i; consume: { for (i = 1; i < sortedSectionSize; ++i) { if (!heap[i].equalParent) break consume; heap[i].consume(reducer); } i = Math.max(i, consumeHeap(i) + 1); } needingAdvance = i; return reducer.getReduced(); }
Recursively consume all items equal to equalItem in the binary subheap rooted at position idx.
Returns:the largest equal index found in this search.
/** * Recursively consume all items equal to equalItem in the binary subheap rooted at position idx. * * @return the largest equal index found in this search. */
private int consumeHeap(int idx) { if (idx >= size || !heap[idx].equalParent) return -1; heap[idx].consume(reducer); int nextIdx = (idx << 1) - (SORTED_SECTION_SIZE - 1); return Math.max(idx, Math.max(consumeHeap(nextIdx), consumeHeap(nextIdx + 1))); }
Replace an iterator in the heap with the given position and move it down the heap until it finds its proper position, pulling lighter elements up the heap. Whenever an equality is found between two elements that form a new parent-child relationship, the child's equalParent flag is set to true if the elements are equal.
/** * Replace an iterator in the heap with the given position and move it down the heap until it finds its proper * position, pulling lighter elements up the heap. * * Whenever an equality is found between two elements that form a new parent-child relationship, the child's * equalParent flag is set to true if the elements are equal. */
private void replaceAndSink(Candidate<In> candidate, int currIdx) { if (candidate == null) { // Drop iterator by replacing it with the last one in the heap. candidate = heap[--size]; heap[size] = null; // not necessary but helpful for debugging } // The new element will be top of its heap, at this point there is no parent to be equal to. candidate.equalParent = false; final int size = this.size; final int sortedSectionSize = Math.min(size - 1, SORTED_SECTION_SIZE); int nextIdx; // Advance within the sorted section, pulling up items lighter than candidate. while ((nextIdx = currIdx + 1) <= sortedSectionSize) { if (!heap[nextIdx].equalParent) // if we were greater then an (or were the) equal parent, we are >= the child { int cmp = candidate.compareTo(heap[nextIdx]); if (cmp <= 0) { heap[nextIdx].equalParent = cmp == 0; heap[currIdx] = candidate; return; } } heap[currIdx] = heap[nextIdx]; currIdx = nextIdx; } // If size <= SORTED_SECTION_SIZE, nextIdx below will be no less than size, // because currIdx == sortedSectionSize == size - 1 and nextIdx becomes // (size - 1) * 2) - (size - 1 - 1) == size. // Advance in the binary heap, pulling up the lighter element from the two at each level. while ((nextIdx = (currIdx * 2) - (sortedSectionSize - 1)) + 1 < size) { if (!heap[nextIdx].equalParent) { if (!heap[nextIdx + 1].equalParent) { // pick the smallest of the two children int siblingCmp = heap[nextIdx + 1].compareTo(heap[nextIdx]); if (siblingCmp < 0) ++nextIdx; // if we're smaller than this, we are done, and must only restore the heap and equalParent properties int cmp = candidate.compareTo(heap[nextIdx]); if (cmp <= 0) { if (cmp == 0) { heap[nextIdx].equalParent = true; if (siblingCmp == 0) // siblingCmp == 0 => nextIdx is the left child heap[nextIdx + 1].equalParent = true; } heap[currIdx] = candidate; return; } if (siblingCmp == 0) { // siblingCmp == 0 => nextIdx is still the left child // if the two siblings were equal, and we are inserting something greater, we will // pull up the left one; this means the right gets an equalParent heap[nextIdx + 1].equalParent = true; } } else ++nextIdx; // descend down the path where we found the equal child } heap[currIdx] = heap[nextIdx]; currIdx = nextIdx; } // our loop guard ensures there are always two siblings to process; typically when we exit the loop we will // be well past the end of the heap and this next condition will match... if (nextIdx >= size) { heap[currIdx] = candidate; return; } // ... but sometimes we will have one last child to compare against, that has no siblings if (!heap[nextIdx].equalParent) { int cmp = candidate.compareTo(heap[nextIdx]); if (cmp <= 0) { heap[nextIdx].equalParent = cmp == 0; heap[currIdx] = candidate; return; } } heap[currIdx] = heap[nextIdx]; heap[nextIdx] = candidate; } } // Holds and is comparable by the head item of an iterator it owns protected static final class Candidate<In> implements Comparable<Candidate<In>> { private final Iterator<? extends In> iter; private final Comparator<? super In> comp; private final int idx; private In item; private In lowerBound; boolean equalParent; public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp) { this.iter = iter; this.comp = comp; this.idx = idx; this.lowerBound = iter instanceof IteratorWithLowerBound ? ((IteratorWithLowerBound<In>)iter).lowerBound() : null; }
Returns:this if our iterator had an item, and it is now available, otherwise null
/** @return this if our iterator had an item, and it is now available, otherwise null */
protected Candidate<In> advance() { if (lowerBound != null) { item = lowerBound; return this; } if (!iter.hasNext()) return null; item = iter.next(); return this; } public int compareTo(Candidate<In> that) { assert this.item != null && that.item != null; int ret = comp.compare(this.item, that.item); if (ret == 0 && (this.isLowerBound() ^ that.isLowerBound())) { // if the items are equal and one of them is a lower bound (but not the other one) // then ensure the lower bound is less than the real item so we can safely // skip lower bounds when consuming return this.isLowerBound() ? -1 : 1; } return ret; } private boolean isLowerBound() { return item == lowerBound; } public void consume(Reducer reducer) { if (isLowerBound()) { item = null; lowerBound = null; } else { reducer.reduce(idx, item); item = null; } } public boolean needsAdvance() { return item == null; } }
Accumulator that collects values of type A, and outputs a value of type B.
/** Accumulator that collects values of type A, and outputs a value of type B. */
public static abstract class Reducer<In,Out> {
Returns:true if Out is the same as In for the case of a single source iterator
/** * @return true if Out is the same as In for the case of a single source iterator */
public boolean trivialReduceIsTrivial() { return false; }
combine this object with the previous ones. intermediate state is up to your implementation.
/** * combine this object with the previous ones. * intermediate state is up to your implementation. */
public abstract void reduce(int idx, In current);
Returns:The last object computed by reduce
/** @return The last object computed by reduce */
protected abstract Out getReduced();
Called at the beginning of each new key, before any reduce is called. To be overridden by implementing classes.
/** * Called at the beginning of each new key, before any reduce is called. * To be overridden by implementing classes. */
protected void onKeyChange() {}
May be overridden by implementations that require cleaning up after use
/** * May be overridden by implementations that require cleaning up after use */
public void close() {} } private static class OneToOne<In, Out> extends MergeIterator<In, Out> { private final Iterator<In> source; public OneToOne(List<? extends Iterator<In>> sources, Reducer<In, Out> reducer) { super(sources, reducer); source = sources.get(0); } protected Out computeNext() { if (!source.hasNext()) return endOfData(); reducer.onKeyChange(); reducer.reduce(0, source.next()); return reducer.getReduced(); } } private static class TrivialOneToOne<In, Out> extends MergeIterator<In, Out> { private final Iterator<In> source; public TrivialOneToOne(List<? extends Iterator<In>> sources, Reducer<In, Out> reducer) { super(sources, reducer); source = sources.get(0); } @SuppressWarnings("unchecked") protected Out computeNext() { if (!source.hasNext()) return endOfData(); return (Out) source.next(); } } }