/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.index.sasi.utils;

import java.io.Closeable;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

import com.google.common.annotations.VisibleForTesting;

public abstract class RangeIterator<K extends Comparable<K>, T extends CombinedValue<K>> extends AbstractIterator<T> implements Closeable
{
    private final K min, max;
    private final long count;
    private K current;

    protected RangeIterator(Builder.Statistics<K, T> statistics)
    {
        this(statistics.min, statistics.max, statistics.tokenCount);
    }

    public RangeIterator(RangeIterator<K, T> range)
    {
        this(range == null ? null : range.min, range == null ? null : range.max, range == null ? -1 : range.count);
    }

    public RangeIterator(K min, K max, long count)
    {
        if (min == null || max == null || count == 0)
            assert min == null && max == null && (count == 0 || count == -1);

        this.min = min;
        this.current = min;
        this.max = max;
        this.count = count;
    }

    public final K getMinimum()
    {
        return min;
    }

    public final K getCurrent()
    {
        return current;
    }

    public final K getMaximum()
    {
        return max;
    }

    public final long getCount()
    {
        return count;
    }

    
When called, this iterators current position should be skipped forwards until finding either: 1) an element equal to or bigger than next 2) the end of the iterator
Params:
  • nextToken – value to skip the iterator forward until matching
Returns:The next current token after the skip was performed
/** * When called, this iterators current position should * be skipped forwards until finding either: * 1) an element equal to or bigger than next * 2) the end of the iterator * * @param nextToken value to skip the iterator forward until matching * * @return The next current token after the skip was performed */
public final T skipTo(K nextToken) { if (min == null || max == null) return endOfData(); if (current.compareTo(nextToken) >= 0) return next == null ? recomputeNext() : next; if (max.compareTo(nextToken) < 0) return endOfData(); performSkipTo(nextToken); return recomputeNext(); } protected abstract void performSkipTo(K nextToken); protected T recomputeNext() { return tryToComputeNext() ? peek() : endOfData(); } protected boolean tryToComputeNext() { boolean hasNext = super.tryToComputeNext(); current = hasNext ? next.get() : getMaximum(); return hasNext; } public static abstract class Builder<K extends Comparable<K>, D extends CombinedValue<K>> { public enum IteratorType { UNION, INTERSECTION } @VisibleForTesting protected final Statistics<K, D> statistics; @VisibleForTesting protected final PriorityQueue<RangeIterator<K, D>> ranges; public Builder(IteratorType type) { statistics = new Statistics<>(type); ranges = new PriorityQueue<>(16, (Comparator<RangeIterator<K, D>>) (a, b) -> a.getCurrent().compareTo(b.getCurrent())); } public K getMinimum() { return statistics.min; } public K getMaximum() { return statistics.max; } public long getTokenCount() { return statistics.tokenCount; } public int rangeCount() { return ranges.size(); } public Builder<K, D> add(RangeIterator<K, D> range) { if (range == null) return this; if (range.getCount() > 0) ranges.add(range); statistics.update(range); return this; } public Builder<K, D> add(List<RangeIterator<K, D>> ranges) { if (ranges == null || ranges.isEmpty()) return this; ranges.forEach(this::add); return this; } public final RangeIterator<K, D> build() { if (rangeCount() == 0) return new EmptyRangeIterator<>(); else return buildIterator(); } public static class EmptyRangeIterator<K extends Comparable<K>, D extends CombinedValue<K>> extends RangeIterator<K, D> { EmptyRangeIterator() { super(null, null, 0); } public D computeNext() { return endOfData(); } protected void performSkipTo(K nextToken) { } public void close() { } } 
protected abstract RangeIterator<K, D> buildIterator(); public static class Statistics<K extends Comparable<K>, D extends CombinedValue<K>> { protected final IteratorType iteratorType; protected K min, max; protected long tokenCount; // iterator with the least number of items protected RangeIterator<K, D> minRange; // iterator with the most number of items protected RangeIterator<K, D> maxRange; // tracks if all of the added ranges overlap, which is useful in case of intersection, // as it gives direct answer as to such iterator is going to produce any results. private boolean isOverlapping = true; public Statistics(IteratorType iteratorType) { this.iteratorType = iteratorType; }
Update statistics information with the given range. Updates min/max of the combined range, token count and tracks range with the least/most number of tokens.
Params:
  • range – The range to update statistics with.
/** * Update statistics information with the given range. * * Updates min/max of the combined range, token count and * tracks range with the least/most number of tokens. * * @param range The range to update statistics with. */
public void update(RangeIterator<K, D> range) { switch (iteratorType) { case UNION: min = nullSafeMin(min, range.getMinimum()); max = nullSafeMax(max, range.getMaximum()); break; case INTERSECTION: // minimum of the intersection is the biggest minimum of individual iterators min = nullSafeMax(min, range.getMinimum()); // maximum of the intersection is the smallest maximum of individual iterators max = nullSafeMin(max, range.getMaximum()); break; default: throw new IllegalStateException("Unknown iterator type: " + iteratorType); } // check if new range is disjoint with already added ranges, which means that this intersection // is not going to produce any results, so we can cleanup range storage and never added anything to it. isOverlapping &= isOverlapping(min, max, range); minRange = minRange == null ? range : min(minRange, range); maxRange = maxRange == null ? range : max(maxRange, range); tokenCount += range.getCount(); } private RangeIterator<K, D> min(RangeIterator<K, D> a, RangeIterator<K, D> b) { return a.getCount() > b.getCount() ? b : a; } private RangeIterator<K, D> max(RangeIterator<K, D> a, RangeIterator<K, D> b) { return a.getCount() > b.getCount() ? a : b; } public boolean isDisjoint() { return !isOverlapping; } public double sizeRatio() { return minRange.getCount() * 1d / maxRange.getCount(); } } } @VisibleForTesting protected static <K extends Comparable<K>, D extends CombinedValue<K>> boolean isOverlapping(RangeIterator<K, D> a, RangeIterator<K, D> b) { return isOverlapping(a.getCurrent(), a.getMaximum(), b); }
Ranges are overlapping the following cases: * When they have a common subrange: min b.current max b.max +---------|--------------+------------| b.current min max b.max |--------------+---------+------------| min b.current b.max max +----------|-------------|------------+ If either range is empty, they're disjoint.
/** * Ranges are overlapping the following cases: * * * When they have a common subrange: * * min b.current max b.max * +---------|--------------+------------| * * b.current min max b.max * |--------------+---------+------------| * * min b.current b.max max * +----------|-------------|------------+ * * * If either range is empty, they're disjoint. */
@VisibleForTesting protected static <K extends Comparable<K>, D extends CombinedValue<K>> boolean isOverlapping(K min, K max, RangeIterator<K, D> b) { return (min != null && max != null) && b.getCount() != 0 && (min.compareTo(b.getMaximum()) <= 0 && b.getCurrent().compareTo(max) <= 0); } @SuppressWarnings("unchecked") private static <T extends Comparable> T nullSafeMin(T a, T b) { if (a == null) return b; if (b == null) return a; return a.compareTo(b) > 0 ? b : a; } @SuppressWarnings("unchecked") private static <T extends Comparable> T nullSafeMax(T a, T b) { if (a == null) return b; if (b == null) return a; return a.compareTo(b) > 0 ? a : b; } }