/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.utils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

import com.google.common.collect.Iterators;
import org.apache.cassandra.io.util.FileUtils;

import com.google.common.annotations.VisibleForTesting;

@SuppressWarnings("resource")
public class RangeIntersectionIterator
{
    /**
     * Selects which intersection implementation the {@link Builder} produces.
     * The declaration order of the constants is significant and must be kept.
     */
    protected enum Strategy
    {
        /** Always build a {@code BounceIntersectionIterator}. */
        BOUNCE,

        /** Always build a {@code LookupIntersectionIterator}. */
        LOOKUP,

        /** Choose between lookup and bounce based on the size ratio reported by the builder statistics. */
        ADAPTIVE
    }

    /**
     * Creates an intersection builder that uses the {@link Strategy#ADAPTIVE} strategy.
     *
     * @param <K> The type used to sort ranges.
     * @param <D> The container type produced by the resulting iterator.
     *
     * @return a new builder configured with the adaptive strategy.
     */
    public static <K extends Comparable<K>, D extends CombinedValue<K>> Builder<K, D> builder()
    {
        final Strategy defaultStrategy = Strategy.ADAPTIVE;
        return builder(defaultStrategy);
    }

    /**
     * Creates an intersection builder bound to an explicitly chosen strategy.
     *
     * @param strategy the strategy the builder should use when constructing the iterator.
     * @param <K> The type used to sort ranges.
     * @param <D> The container type produced by the resulting iterator.
     *
     * @return a new builder configured with the given strategy.
     */
    @VisibleForTesting
    protected static <K extends Comparable<K>, D extends CombinedValue<K>> Builder<K, D> builder(Strategy strategy)
    {
        Builder<K, D> created = new Builder<>(strategy);
        return created;
    }

    /**
     * Builder of intersection iterators; the concrete iterator type is picked
     * according to the {@link Strategy} supplied at construction time.
     *
     * @param <K> The type used to sort ranges.
     * @param <D> The container type produced by the resulting iterator.
     */
    public static class Builder<K extends Comparable<K>, D extends CombinedValue<K>> extends RangeIterator.Builder<K, D>
    {
        // at or below this size ratio the adaptive strategy prefers lookup over bounce
        private static final double ADAPTIVE_LOOKUP_RATIO = 0.01d;

        private final Strategy strategy;

        public Builder(Strategy strategy)
        {
            super(IteratorType.INTERSECTION);
            this.strategy = strategy;
        }

        /**
         * Produces the intersection iterator for the ranges collected so far.
         *
         * @return an empty iterator when the statistics report the ranges as disjoint,
         *         the sole range when exactly one was added, otherwise a lookup or
         *         bounce intersection iterator depending on the configured strategy.
         *
         * @throws IllegalStateException if the configured strategy is not recognized.
         */
        protected RangeIterator<K, D> buildIterator()
        {
            // a disjoint set of ranges (or an intersection with an empty set)
            // can never produce results, so short-circuit with an empty iterator.
            if (statistics.isDisjoint())
                return new EmptyRangeIterator<>();

            // nothing to intersect with - hand the single range back as is
            if (rangeCount() == 1)
                return ranges.poll();

            final boolean preferLookup;
            switch (strategy)
            {
                case LOOKUP:
                    preferLookup = true;
                    break;

                case BOUNCE:
                    preferLookup = false;
                    break;

                case ADAPTIVE:
                    preferLookup = statistics.sizeRatio() <= ADAPTIVE_LOOKUP_RATIO;
                    break;

                default:
                    throw new IllegalStateException("Unknown strategy: " + strategy);
            }

            return preferLookup
                    ? new LookupIntersectionIterator<>(statistics, ranges)
                    : new BounceIntersectionIterator<>(statistics, ranges);
        }
    }

    /**
     * Common base of the intersection iterators: holds the queue of ranges being
     * intersected and closes every range still present in that queue on {@link #close()}.
     *
     * @param <K> The type used to sort ranges.
     * @param <D> The container type produced by the iterator.
     */
    private static abstract class AbstractIntersectionIterator<K extends Comparable<K>, D extends CombinedValue<K>> extends RangeIterator<K, D>
    {
        protected final PriorityQueue<RangeIterator<K, D>> ranges;

        private AbstractIntersectionIterator(Builder.Statistics<K, D> statistics, PriorityQueue<RangeIterator<K, D>> ranges)
        {
            super(statistics);
            this.ranges = ranges;
        }

        /**
         * Quietly closes each range currently held in the queue.
         */
        public void close() throws IOException
        {
            Iterator<RangeIterator<K, D>> remaining = ranges.iterator();
            while (remaining.hasNext())
                FileUtils.closeQuietly(remaining.next());
        }
    }

    
    // Iterator which performs intersection of multiple ranges by using the bouncing (merge-join) technique
    // to identify common elements in the given ranges. The aforementioned "bounce" works as follows: the range
    // queue is polled for the range with the smallest current token (main loop), and that token is used to
    // RangeIterator.skipTo(Comparable) the other ranges. If the token produced by RangeIterator.skipTo(Comparable)
    // is equal to the current "candidate" token, both get merged together and the same operation is repeated for
    // the next range from the queue; if the returned token is not equal to the candidate, the candidate's range
    // gets put back into the queue and the main loop is repeated until the next intersection token is found or
    // at least one iterator runs out of tokens. This technique is very efficient at jumping over gaps in the ranges.
    // Type parameters:
    //   • <K> – The type used to sort ranges.
    //   • <D> – The container type which is going to be returned by Iterator.next().
/** * Iterator which performs intersection of multiple ranges by using bouncing (merge-join) technique to identify * common elements in the given ranges. Aforementioned "bounce" works as follows: range queue is poll'ed for the * range with the smallest current token (main loop), that token is used to {@link RangeIterator#skipTo(Comparable)} * other ranges, if token produced by {@link RangeIterator#skipTo(Comparable)} is equal to current "candidate" token, * both get merged together and the same operation is repeated for next range from the queue, if returned token * is not equal than candidate, candidate's range gets put back into the queue and the main loop gets repeated until * next intersection token is found or at least one iterator runs out of tokens. * * This technique is every efficient to jump over gaps in the ranges. * * @param <K> The type used to sort ranges. * @param <D> The container type which is going to be returned by {@link Iterator#next()}. */
@VisibleForTesting protected static class BounceIntersectionIterator<K extends Comparable<K>, D extends CombinedValue<K>> extends AbstractIntersectionIterator<K, D> { private BounceIntersectionIterator(Builder.Statistics<K, D> statistics, PriorityQueue<RangeIterator<K, D>> ranges) { super(statistics, ranges); } protected D computeNext() { while (!ranges.isEmpty()) { RangeIterator<K, D> head = ranges.poll(); // jump right to the beginning of the intersection or return next element if (head.getCurrent().compareTo(getMinimum()) < 0) head.skipTo(getMinimum()); D candidate = head.hasNext() ? head.next() : null; if (candidate == null || candidate.get().compareTo(getMaximum()) > 0) { ranges.add(head); return endOfData(); } List<RangeIterator<K, D>> processed = new ArrayList<>(); boolean intersectsAll = true, exhausted = false; while (!ranges.isEmpty()) { RangeIterator<K, D> range = ranges.poll(); // found a range which doesn't overlap with one (or possibly more) other range(s) if (!isOverlapping(head, range)) { exhausted = true; intersectsAll = false; break; } D point = range.skipTo(candidate.get()); if (point == null) // other range is exhausted { exhausted = true; intersectsAll = false; break; } processed.add(range); if (candidate.get().equals(point.get())) { candidate.merge(point); // advance skipped range to the next element if any Iterators.getNext(range, null); } else { intersectsAll = false; break; } } ranges.add(head); for (RangeIterator<K, D> range : processed) ranges.add(range); if (exhausted) return endOfData(); if (intersectsAll) return candidate; } return endOfData(); } protected void performSkipTo(K nextToken) { List<RangeIterator<K, D>> skipped = new ArrayList<>(); while (!ranges.isEmpty()) { RangeIterator<K, D> range = ranges.poll(); range.skipTo(nextToken); skipped.add(range); } for (RangeIterator<K, D> range : skipped) ranges.add(range); } }
    // Iterator which performs a linear scan over a primary range (the smallest of the ranges) and an O(log(n))
    // lookup into secondary ranges using values from the primary iterator. This technique is efficient when one
    // of the intersection ranges is much smaller than the others, e.g. ratio 0.01d (default); in such a situation
    // scan + lookup is more efficient compared to the "bounce" merge because the "bounce" distance is never
    // going to be big.
    // Type parameters:
    //   • <K> – The type used to sort ranges.
    //   • <D> – The container type which is going to be returned by Iterator.next().
/** * Iterator which performs a linear scan over a primary range (the smallest of the ranges) * and O(log(n)) lookup into secondary ranges using values from the primary iterator. * This technique is efficient when one of the intersection ranges is smaller than others * e.g. ratio 0.01d (default), in such situation scan + lookup is more efficient comparing * to "bounce" merge because "bounce" distance is never going to be big. * * @param <K> The type used to sort ranges. * @param <D> The container type which is going to be returned by {@link Iterator#next()}. */
@VisibleForTesting protected static class LookupIntersectionIterator<K extends Comparable<K>, D extends CombinedValue<K>> extends AbstractIntersectionIterator<K, D> { private final RangeIterator<K, D> smallestIterator; private LookupIntersectionIterator(Builder.Statistics<K, D> statistics, PriorityQueue<RangeIterator<K, D>> ranges) { super(statistics, ranges); smallestIterator = statistics.minRange; if (smallestIterator.getCurrent().compareTo(getMinimum()) < 0) smallestIterator.skipTo(getMinimum()); } protected D computeNext() { while (smallestIterator.hasNext()) { D candidate = smallestIterator.next(); K token = candidate.get(); boolean intersectsAll = true; for (RangeIterator<K, D> range : ranges) { // avoid checking against self, much cheaper than changing queue comparator // to compare based on the size and re-populating such queue. if (range.equals(smallestIterator)) continue; // found a range which doesn't overlap with one (or possibly more) other range(s) if (!isOverlapping(smallestIterator, range)) return endOfData(); D point = range.skipTo(token); if (point == null) // one of the iterators is exhausted return endOfData(); if (!point.get().equals(token)) { intersectsAll = false; break; } candidate.merge(point); } if (intersectsAll) return candidate; } return endOfData(); } protected void performSkipTo(K nextToken) { smallestIterator.skipTo(nextToken); } } }