/*
 * Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.DoublePredicate;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.LongConsumer;
import java.util.function.LongPredicate;
import java.util.function.Predicate;

Factory for instances of takeWhile and dropWhile operations that produce subsequences of their input stream.
Since:9
/**
 * Factory for instances of takeWhile and dropWhile operations
 * that produce subsequences of their input stream.
 *
 * @since 9
 */
final class WhileOps {

    // Op flags handed to every takeWhile pipeline stage created by this
    // factory: NOT_SIZED combined with IS_SHORT_CIRCUIT.
    static final int TAKE_FLAGS = StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT;

    // Op flags handed to every dropWhile pipeline stage created by this
    // factory: NOT_SIZED only (no short-circuit flag).
    static final int DROP_FLAGS = StreamOpFlag.NOT_SIZED;
Appends a "takeWhile" operation to the provided Stream.
Params:
  • upstream – a reference stream with element type T
  • predicate – the predicate that returns false to halt taking.
Type parameters:
  • <T> – the type of both input and output elements
/**
 * Appends a "takeWhile" operation to the provided Stream.
 *
 * <p>The returned stage is a stateful op created with {@code TAKE_FLAGS}.
 * When evaluated lazily in parallel it either collects through
 * {@code opEvaluateParallel} (ORDERED flag known) or wraps the source in an
 * {@code UnorderedWhileSpliterator.OfRef.Taking}. Sequentially, its sink
 * forwards elements until the predicate first returns {@code false} and
 * from then on reports that cancellation is requested.
 *
 * @param <T> the type of both input and output elements
 * @param upstream a reference stream with element type T
 * @param predicate the predicate that returns false to halt taking.
 * @return the new stream
 * @throws NullPointerException if {@code predicate} is null
 */
static <T> Stream<T> makeTakeWhileRef(AbstractPipeline<?, T, ?> upstream,
                                      Predicate<? super T> predicate) {
    Objects.requireNonNull(predicate);
    return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, TAKE_FLAGS) {
        @Override
        <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
                                                     Spliterator<P_IN> spliterator) {
            // Unordered: taking can be done lazily by a wrapping spliterator
            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                return new UnorderedWhileSpliterator.OfRef.Taking<>(
                        helper.wrapSpliterator(spliterator), false, predicate);
            }
            // Ordered: evaluate into a node and traverse the result
            Node<T> collected = opEvaluateParallel(helper, spliterator, Nodes.castingArray());
            return collected.spliterator();
        }

        @Override
        <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                          Spliterator<P_IN> spliterator,
                                          IntFunction<T[]> generator) {
            return new TakeWhileTask<>(this, helper, spliterator, generator)
                    .invoke();
        }

        @Override
        Sink<T> opWrapSink(int flags, Sink<T> sink) {
            return new Sink.ChainedReference<T, T>(sink) {
                // Stays true until the predicate rejects an element
                boolean take = true;

                @Override
                public void begin(long size) {
                    // The number of elements taken is not known up front
                    downstream.begin(-1);
                }

                @Override
                public void accept(T t) {
                    if (!take) {
                        return;
                    }
                    take = predicate.test(t);
                    if (take) {
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return take ? downstream.cancellationRequested() : true;
                }
            };
        }
    };
}
Appends a "takeWhile" operation to the provided IntStream.
Params:
  • upstream – a pipeline with element type Integer
  • predicate – the predicate that returns false to halt taking.
/**
 * Appends a "takeWhile" operation to the provided IntStream.
 *
 * <p>The returned stage is a stateful op created with {@code TAKE_FLAGS}.
 * When evaluated lazily in parallel it either collects through
 * {@code opEvaluateParallel} (ORDERED flag known) or wraps the source in an
 * {@code UnorderedWhileSpliterator.OfInt.Taking}. Sequentially, its sink
 * forwards elements until the predicate first returns {@code false} and
 * from then on reports that cancellation is requested.
 *
 * @param upstream the upstream pipeline, whose output elements are {@code Integer}
 * @param predicate the predicate that returns false to halt taking.
 * @return the new stream
 * @throws NullPointerException if {@code predicate} is null
 */
static IntStream makeTakeWhileInt(AbstractPipeline<?, Integer, ?> upstream,
                                  IntPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, TAKE_FLAGS) {
        @Override
        <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
                                                           Spliterator<P_IN> spliterator) {
            // Unordered: taking can be done lazily by a wrapping spliterator
            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                return new UnorderedWhileSpliterator.OfInt.Taking(
                        (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate);
            }
            // Ordered: evaluate into a node and traverse the result
            Node<Integer> collected = opEvaluateParallel(helper, spliterator, Integer[]::new);
            return collected.spliterator();
        }

        @Override
        <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
                                                Spliterator<P_IN> spliterator,
                                                IntFunction<Integer[]> generator) {
            return new TakeWhileTask<>(this, helper, spliterator, generator)
                    .invoke();
        }

        @Override
        Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedInt<Integer>(sink) {
                // Stays true until the predicate rejects an element
                boolean take = true;

                @Override
                public void begin(long size) {
                    // The number of elements taken is not known up front
                    downstream.begin(-1);
                }

                @Override
                public void accept(int t) {
                    if (!take) {
                        return;
                    }
                    take = predicate.test(t);
                    if (take) {
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return take ? downstream.cancellationRequested() : true;
                }
            };
        }
    };
}
Appends a "takeWhile" operation to the provided LongStream.
Params:
  • upstream – a pipeline with element type Long
  • predicate – the predicate that returns false to halt taking.
/**
 * Appends a "takeWhile" operation to the provided LongStream.
 *
 * <p>The returned stage is a stateful op created with {@code TAKE_FLAGS}.
 * When evaluated lazily in parallel it either collects through
 * {@code opEvaluateParallel} (ORDERED flag known) or wraps the source in an
 * {@code UnorderedWhileSpliterator.OfLong.Taking}. Sequentially, its sink
 * forwards elements until the predicate first returns {@code false} and
 * from then on reports that cancellation is requested.
 *
 * @param upstream the upstream pipeline, whose output elements are {@code Long}
 * @param predicate the predicate that returns false to halt taking.
 * @return the new stream
 * @throws NullPointerException if {@code predicate} is null
 */
static LongStream makeTakeWhileLong(AbstractPipeline<?, Long, ?> upstream,
                                    LongPredicate predicate) {
    Objects.requireNonNull(predicate);
    return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, TAKE_FLAGS) {
        @Override
        <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
                                                        Spliterator<P_IN> spliterator) {
            // Unordered: taking can be done lazily by a wrapping spliterator
            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                return new UnorderedWhileSpliterator.OfLong.Taking(
                        (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate);
            }
            // Ordered: evaluate into a node and traverse the result
            Node<Long> collected = opEvaluateParallel(helper, spliterator, Long[]::new);
            return collected.spliterator();
        }

        @Override
        <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
                                             Spliterator<P_IN> spliterator,
                                             IntFunction<Long[]> generator) {
            return new TakeWhileTask<>(this, helper, spliterator, generator)
                    .invoke();
        }

        @Override
        Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
            return new Sink.ChainedLong<Long>(sink) {
                // Stays true until the predicate rejects an element
                boolean take = true;

                @Override
                public void begin(long size) {
                    // The number of elements taken is not known up front
                    downstream.begin(-1);
                }

                @Override
                public void accept(long t) {
                    if (!take) {
                        return;
                    }
                    take = predicate.test(t);
                    if (take) {
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return take ? downstream.cancellationRequested() : true;
                }
            };
        }
    };
}
Appends a "takeWhile" operation to the provided DoubleStream.
Params:
  • upstream – a pipeline with element type Double
  • predicate – the predicate that returns false to halt taking.
/**
 * Appends a "takeWhile" operation to the provided DoubleStream.
 *
 * <p>The returned stage is a stateful op created with {@code TAKE_FLAGS}.
 * When evaluated lazily in parallel it either collects through
 * {@code opEvaluateParallel} (ORDERED flag known) or wraps the source in an
 * {@code UnorderedWhileSpliterator.OfDouble.Taking}. Sequentially, its sink
 * forwards elements until the predicate first returns {@code false} and
 * from then on reports that cancellation is requested.
 *
 * @param upstream the upstream pipeline, whose output elements are {@code Double}
 * @param predicate the predicate that returns false to halt taking.
 * @return the new stream
 * @throws NullPointerException if {@code predicate} is null
 */
static DoubleStream makeTakeWhileDouble(AbstractPipeline<?, Double, ?> upstream,
                                        DoublePredicate predicate) {
    Objects.requireNonNull(predicate);
    return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, TAKE_FLAGS) {
        @Override
        <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
                                                          Spliterator<P_IN> spliterator) {
            // Unordered: taking can be done lazily by a wrapping spliterator
            if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
                return new UnorderedWhileSpliterator.OfDouble.Taking(
                        (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate);
            }
            // Ordered: evaluate into a node and traverse the result
            Node<Double> collected = opEvaluateParallel(helper, spliterator, Double[]::new);
            return collected.spliterator();
        }

        @Override
        <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
                                               Spliterator<P_IN> spliterator,
                                               IntFunction<Double[]> generator) {
            return new TakeWhileTask<>(this, helper, spliterator, generator)
                    .invoke();
        }

        @Override
        Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
            return new Sink.ChainedDouble<Double>(sink) {
                // Stays true until the predicate rejects an element
                boolean take = true;

                @Override
                public void begin(long size) {
                    // The number of elements taken is not known up front
                    downstream.begin(-1);
                }

                @Override
                public void accept(double t) {
                    if (!take) {
                        return;
                    }
                    take = predicate.test(t);
                    if (take) {
                        downstream.accept(t);
                    }
                }

                @Override
                public boolean cancellationRequested() {
                    return take ? downstream.cancellationRequested() : true;
                }
            };
        }
    };
}
A specialization for the dropWhile operation that controls if elements to be dropped are counted and passed downstream.

This specialization is utilized by the TakeWhileTask for pipelines that are ordered. In such cases elements cannot be dropped until all elements have been collected.

Type parameters:
  • <T> – the type of both input and output elements
/**
 * A specialization for the dropWhile operation that controls if
 * elements to be dropped are counted and passed downstream.
 * <p>
 * This specialization is utilized by the {@link TakeWhileTask} for
 * pipelines that are ordered. In such cases elements cannot be dropped
 * until all elements have been collected.
 *
 * @param <T> the type of both input and output elements
 */
interface DropWhileOp<T> {
Accepts a Sink which will receive the results of this dropWhile operation, and return a DropWhileSink which accepts elements and which performs the dropWhile operation passing the results to the provided Sink.
Params:
  • sink – sink to which elements should be sent after processing
  • retainAndCountDroppedElements – true if elements to be dropped are counted and passed to the sink, otherwise such elements are actually dropped and not passed to the sink.
Returns:a dropWhile sink
    /**
     * Accepts a {@code Sink} which will receive the results of this
     * dropWhile operation, and returns a {@code DropWhileSink} which
     * accepts elements and performs the dropWhile operation, passing
     * the results to the provided {@code Sink}.
     *
     * @param sink sink to which elements should be sent after processing
     * @param retainAndCountDroppedElements true if elements to be dropped
     *        are counted and passed to the sink, otherwise such elements
     *        are actually dropped and not passed to the sink.
     * @return a dropWhile sink
     */
    DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements);
}
A specialization for a dropWhile sink.
Type parameters:
  • <T> – the type of both input and output elements
/**
 * A specialization for a dropWhile sink.
 *
 * @param <T> the type of both input and output elements
 */
interface DropWhileSink<T> extends Sink<T> {
Returns:the could of elements that would have been dropped and instead were passed downstream.
    /**
     * @return the count of elements that would have been dropped and
     *         instead were passed downstream.
     */
    long getDropCount();
}
Appends a "dropWhile" operation to the provided Stream.
Params:
  • upstream – a reference stream with element type T
  • predicate – the predicate that returns false to halt dropping.
Type parameters:
  • <T> – the type of both input and output elements
/** * Appends a "dropWhile" operation to the provided Stream. * * @param <T> the type of both input and output elements * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt dropping. */
static <T> Stream<T> makeDropWhileRef(AbstractPipeline<?, T, ?> upstream, Predicate<? super T> predicate) { Objects.requireNonNull(predicate); class Op extends ReferencePipeline.StatefulOp<T, T> implements DropWhileOp<T> { public Op(AbstractPipeline<?, T, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, inputShape, opFlags); } @Override <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Nodes.castingArray()) .spliterator(); } else { return new UnorderedWhileSpliterator.OfRef.Dropping<>( helper.wrapSpliterator(spliterator), false, predicate); } } @Override <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator, IntFunction<T[]> generator) { return new DropWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink<T> opWrapSink(int flags, Sink<T> sink) { return opWrapSink(sink, false); } public DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements) { class OpSink extends Sink.ChainedReference<T, T> implements DropWhileSink<T> { long dropCount; boolean take; OpSink() { super(sink); } @Override public void accept(T t) { boolean takeElement = take || (take = !predicate.test(t)); // If ordered and element is dropped increment index // for possible future truncation if (retainAndCountDroppedElements && !takeElement) dropCount++; // If ordered need to process element, otherwise // skip if element is dropped if (retainAndCountDroppedElements || takeElement) downstream.accept(t); } @Override public long getDropCount() { return dropCount; } } return new OpSink(); } } return new Op(upstream, StreamShape.REFERENCE, DROP_FLAGS); }
Appends a "dropWhile" operation to the provided IntStream.
Params:
  • upstream – a pipeline with element type Integer
  • predicate – the predicate that returns false to halt dropping.
/** * Appends a "dropWhile" operation to the provided IntStream. * * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt dropping. */
static IntStream makeDropWhileInt(AbstractPipeline<?, Integer, ?> upstream, IntPredicate predicate) { Objects.requireNonNull(predicate); class Op extends IntPipeline.StatefulOp<Integer> implements DropWhileOp<Integer> { public Op(AbstractPipeline<?, Integer, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, inputShape, opFlags); } @Override <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Integer[]::new) .spliterator(); } else { return new UnorderedWhileSpliterator.OfInt.Dropping( (Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate); } } @Override <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper, Spliterator<P_IN> spliterator, IntFunction<Integer[]> generator) { return new DropWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) { return opWrapSink(sink, false); } public DropWhileSink<Integer> opWrapSink(Sink<Integer> sink, boolean retainAndCountDroppedElements) { class OpSink extends Sink.ChainedInt<Integer> implements DropWhileSink<Integer> { long dropCount; boolean take; OpSink() { super(sink); } @Override public void accept(int t) { boolean takeElement = take || (take = !predicate.test(t)); // If ordered and element is dropped increment index // for possible future truncation if (retainAndCountDroppedElements && !takeElement) dropCount++; // If ordered need to process element, otherwise // skip if element is dropped if (retainAndCountDroppedElements || takeElement) downstream.accept(t); } @Override public long getDropCount() { return dropCount; } } return new OpSink(); } } return new Op(upstream, StreamShape.INT_VALUE, DROP_FLAGS); }
Appends a "dropWhile" operation to the provided LongStream.
Params:
  • upstream – a pipeline with element type Long
  • predicate – the predicate that returns false to halt dropping.
/** * Appends a "dropWhile" operation to the provided LongStream. * * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt dropping. */
static LongStream makeDropWhileLong(AbstractPipeline<?, Long, ?> upstream, LongPredicate predicate) { Objects.requireNonNull(predicate); class Op extends LongPipeline.StatefulOp<Long> implements DropWhileOp<Long> { public Op(AbstractPipeline<?, Long, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, inputShape, opFlags); } @Override <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Long[]::new) .spliterator(); } else { return new UnorderedWhileSpliterator.OfLong.Dropping( (Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate); } } @Override <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, IntFunction<Long[]> generator) { return new DropWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink<Long> opWrapSink(int flags, Sink<Long> sink) { return opWrapSink(sink, false); } public DropWhileSink<Long> opWrapSink(Sink<Long> sink, boolean retainAndCountDroppedElements) { class OpSink extends Sink.ChainedLong<Long> implements DropWhileSink<Long> { long dropCount; boolean take; OpSink() { super(sink); } @Override public void accept(long t) { boolean takeElement = take || (take = !predicate.test(t)); // If ordered and element is dropped increment index // for possible future truncation if (retainAndCountDroppedElements && !takeElement) dropCount++; // If ordered need to process element, otherwise // skip if element is dropped if (retainAndCountDroppedElements || takeElement) downstream.accept(t); } @Override public long getDropCount() { return dropCount; } } return new OpSink(); } } return new Op(upstream, StreamShape.LONG_VALUE, DROP_FLAGS); }
Appends a "dropWhile" operation to the provided DoubleStream.
Params:
  • upstream – a pipeline with element type Double
  • predicate – the predicate that returns false to halt dropping.
/** * Appends a "dropWhile" operation to the provided DoubleStream. * * @param upstream a reference stream with element type T * @param predicate the predicate that returns false to halt dropping. */
static DoubleStream makeDropWhileDouble(AbstractPipeline<?, Double, ?> upstream, DoublePredicate predicate) { Objects.requireNonNull(predicate); class Op extends DoublePipeline.StatefulOp<Double> implements DropWhileOp<Double> { public Op(AbstractPipeline<?, Double, ?> upstream, StreamShape inputShape, int opFlags) { super(upstream, inputShape, opFlags); } @Override <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator) { if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) { return opEvaluateParallel(helper, spliterator, Double[]::new) .spliterator(); } else { return new UnorderedWhileSpliterator.OfDouble.Dropping( (Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate); } } @Override <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper, Spliterator<P_IN> spliterator, IntFunction<Double[]> generator) { return new DropWhileTask<>(this, helper, spliterator, generator) .invoke(); } @Override Sink<Double> opWrapSink(int flags, Sink<Double> sink) { return opWrapSink(sink, false); } public DropWhileSink<Double> opWrapSink(Sink<Double> sink, boolean retainAndCountDroppedElements) { class OpSink extends Sink.ChainedDouble<Double> implements DropWhileSink<Double> { long dropCount; boolean take; OpSink() { super(sink); } @Override public void accept(double t) { boolean takeElement = take || (take = !predicate.test(t)); // If ordered and element is dropped increment index // for possible future truncation if (retainAndCountDroppedElements && !takeElement) dropCount++; // If ordered need to process element, otherwise // skip if element is dropped if (retainAndCountDroppedElements || takeElement) downstream.accept(t); } @Override public long getDropCount() { return dropCount; } } return new OpSink(); } } return new Op(upstream, StreamShape.DOUBLE_VALUE, DROP_FLAGS); } //
A spliterator supporting takeWhile and dropWhile operations over an underlying spliterator whose covered elements have no encounter order.

Concrete subclasses of this spliterator support reference and primitive types for takeWhile and dropWhile.

For the takeWhile operation if during traversal taking completes then taking is cancelled globally for the splitting and traversal of all related spliterators. Cancellation is governed by a shared AtomicBoolean instance. A spliterator in the process of taking when cancellation occurs will also be cancelled but not necessarily immediately. To reduce contention on the AtomicBoolean instance, cancellation may be acted on after a small number of additional elements have been traversed.

For the dropWhile operation if during traversal dropping completes for some, but not all elements, then it is cancelled globally for the traversal of all related spliterators (splitting is not cancelled). Cancellation is governed in the same manner as for the takeWhile operation.

Type parameters:
  • <T> – the type of elements returned by this spliterator
  • <T_SPLITR> – the type of the spliterator
/** * A spliterator supporting takeWhile and dropWhile operations over an * underlying spliterator whose covered elements have no encounter order. * <p> * Concrete subclasses of this spliterator support reference and primitive * types for takeWhile and dropWhile. * <p> * For the takeWhile operation if during traversal taking completes then * taking is cancelled globally for the splitting and traversal of all * related spliterators. * Cancellation is governed by a shared {@link AtomicBoolean} instance. A * spliterator in the process of taking when cancellation occurs will also * be cancelled but not necessarily immediately. To reduce contention on * the {@link AtomicBoolean} instance, cancellation make be acted on after * a small number of additional elements have been traversed. * <p> * For the dropWhile operation if during traversal dropping completes for * some, but not all elements, then it is cancelled globally for the * traversal of all related spliterators (splitting is not cancelled). * Cancellation is governed in the same manner as for the takeWhile * operation. * * @param <T> the type of elements returned by this spliterator * @param <T_SPLITR> the type of the spliterator */
abstract static class UnorderedWhileSpliterator<T, T_SPLITR extends Spliterator<T>> implements Spliterator<T> { // Power of two constant minus one used for modulus of count static final int CANCEL_CHECK_COUNT = (1 << 6) - 1; // The underlying spliterator final T_SPLITR s; // True if no splitting should be performed, if true then // this spliterator may be used for an underlying spliterator whose // covered elements have an encounter order // See use in stream take/dropWhile default default methods final boolean noSplitting; // True when operations are cancelled for all related spliterators // For taking, spliterators cannot split or traversed // For dropping, spliterators cannot be traversed final AtomicBoolean cancel; // True while taking or dropping should be performed when traversing boolean takeOrDrop = true; // The count of elements traversed int count; UnorderedWhileSpliterator(T_SPLITR s, boolean noSplitting) { this.s = s; this.noSplitting = noSplitting; this.cancel = new AtomicBoolean(); } UnorderedWhileSpliterator(T_SPLITR s, UnorderedWhileSpliterator<T, T_SPLITR> parent) { this.s = s; this.noSplitting = parent.noSplitting; this.cancel = parent.cancel; } @Override public long estimateSize() { return s.estimateSize(); } @Override public int characteristics() { // Size is not known return s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED); } @Override public long getExactSizeIfKnown() { return -1L; } @Override public Comparator<? super T> getComparator() { return s.getComparator(); } @Override public T_SPLITR trySplit() { @SuppressWarnings("unchecked") T_SPLITR ls = noSplitting ? null : (T_SPLITR) s.trySplit(); return ls != null ? makeSpliterator(ls) : null; } boolean checkCancelOnCount() { return count != 0 || !cancel.get(); } abstract T_SPLITR makeSpliterator(T_SPLITR s); abstract static class OfRef<T> extends UnorderedWhileSpliterator<T, Spliterator<T>> implements Consumer<T> { final Predicate<? 
super T> p; T t; OfRef(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { super(s, noSplitting); this.p = p; } OfRef(Spliterator<T> s, OfRef<T> parent) { super(s, parent); this.p = parent.p; } @Override public void accept(T t) { count = (count + 1) & CANCEL_CHECK_COUNT; this.t = t; } static final class Taking<T> extends OfRef<T> { Taking(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { super(s, noSplitting, p); } Taking(Spliterator<T> s, Taking<T> parent) { super(s, parent); } @Override public boolean tryAdvance(Consumer<? super T> action) { boolean test = true; if (takeOrDrop && // If can take checkCancelOnCount() && // and if not cancelled s.tryAdvance(this) && // and if advanced one element (test = p.test(t))) { // and test on element passes action.accept(t); // then accept element return true; } else { // Taking is finished takeOrDrop = false; // Cancel all further traversal and splitting operations // only if test of element failed (short-circuited) if (!test) cancel.set(true); return false; } } @Override public Spliterator<T> trySplit() { // Do not split if all operations are cancelled return cancel.get() ? null : super.trySplit(); } @Override Spliterator<T> makeSpliterator(Spliterator<T> s) { return new Taking<>(s, this); } } static final class Dropping<T> extends OfRef<T> { Dropping(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) { super(s, noSplitting, p); } Dropping(Spliterator<T> s, Dropping<T> parent) { super(s, parent); } @Override public boolean tryAdvance(Consumer<? 
super T> action) { if (takeOrDrop) { takeOrDrop = false; boolean adv; boolean dropped = false; while ((adv = s.tryAdvance(this)) && // If advanced one element checkCancelOnCount() && // and if not cancelled p.test(t)) { // and test on element passes dropped = true; // then drop element } // Report advanced element, if any if (adv) { // Cancel all further dropping if one or more elements // were previously dropped if (dropped) cancel.set(true); action.accept(t); } return adv; } else { return s.tryAdvance(action); } } @Override Spliterator<T> makeSpliterator(Spliterator<T> s) { return new Dropping<>(s, this); } } } abstract static class OfInt extends UnorderedWhileSpliterator<Integer, Spliterator.OfInt> implements IntConsumer, Spliterator.OfInt { final IntPredicate p; int t; OfInt(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { super(s, noSplitting); this.p = p; } OfInt(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { super(s, parent); this.p = parent.p; } @Override public void accept(int t) { count = (count + 1) & CANCEL_CHECK_COUNT; this.t = t; } static final class Taking extends UnorderedWhileSpliterator.OfInt { Taking(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) { super(s, noSplitting, p); } Taking(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) { super(s, parent); } @Override public boolean tryAdvance(IntConsumer action) { boolean test = true; if (takeOrDrop && // If can take checkCancelOnCount() && // and if not cancelled s.tryAdvance(this) && // and if advanced one element (test = p.test(t))) { // and test on element passes action.accept(t); // then accept element return true; } else { // Taking is finished takeOrDrop = false; // Cancel all further traversal and splitting operations // only if test of element failed (short-circuited) if (!test) cancel.set(true); return false; } } @Override public Spliterator.OfInt trySplit() { // Do not split if all operations are cancelled return cancel.get() ? 
null : super.trySplit();
        }

        // Creates the spliterator of the same (taking) kind for a split-off
        // portion, passing this instance as the parent.
        @Override
        Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
            return new Taking(s, this);
        }
    }

    /**
     * {@code int} spliterator that drops a prefix of elements matching
     * the predicate on the first call to {@code tryAdvance}, and afterwards
     * delegates directly to the underlying spliterator.
     */
    static final class Dropping extends UnorderedWhileSpliterator.OfInt {
        Dropping(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
            super(s, noSplitting, p);
        }

        Dropping(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
            super(s, parent);
        }

        /**
         * On the first call, advances the underlying spliterator until an
         * element is obtained that is not dropped (or the source is
         * exhausted) and reports that element to {@code action}.
         * On later calls simply forwards to the underlying spliterator.
         *
         * @param action receives the first retained element, then all
         *        following elements
         * @return {@code true} if an element was reported to {@code action}
         */
        @Override
        public boolean tryAdvance(IntConsumer action) {
            if (takeOrDrop) {
                // The dropping phase runs at most once per spliterator
                takeOrDrop = false;
                boolean adv;
                boolean dropped = false;
                // The evaluation order matters: s.tryAdvance(this) stores the
                // element in t (via accept) before p.test(t) reads it
                while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                       checkCancelOnCount() &&        // and if not cancelled
                       p.test(t)) {                   // and test on element passes
                    dropped = true;                   // then drop element
                }

                // Report advanced element, if any
                if (adv) {
                    // Cancel all further dropping if one or more elements
                    // were previously dropped
                    if (dropped)
                        cancel.set(true);
                    action.accept(t);
                }
                return adv;
            } else {
                return s.tryAdvance(action);
            }
        }

        // Creates the spliterator of the same (dropping) kind for a split-off
        // portion, passing this instance as the parent.
        @Override
        Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
            return new Dropping(s, this);
        }
    }
}

/**
 * {@code long} specialization. The instance is itself a
 * {@code LongConsumer}: it is handed to the underlying spliterator's
 * {@code tryAdvance} so that the advanced element is captured in {@code t}
 * and can then be tested against the predicate.
 */
abstract static class OfLong extends UnorderedWhileSpliterator<Long, Spliterator.OfLong> implements LongConsumer, Spliterator.OfLong {
    // Predicate deciding whether an element is taken/dropped
    final LongPredicate p;
    // Most recent element received through accept(long)
    long t;

    OfLong(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
        super(s, noSplitting);
        this.p = p;
    }

    // Used for split-off spliterators: shares the parent's predicate
    OfLong(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
        super(s, parent);
        this.p = parent.p;
    }

    /**
     * Records the advanced element and bumps the element counter, masked
     * with {@code CANCEL_CHECK_COUNT}.
     */
    @Override
    public void accept(long t) {
        // NOTE(review): count and CANCEL_CHECK_COUNT are declared outside this
        // excerpt; presumably checkCancelOnCount() uses count to poll the
        // cancel flag only periodically -- confirm against the base class
        count = (count + 1) & CANCEL_CHECK_COUNT;
        this.t = t;
    }

    /**
     * {@code long} spliterator that reports elements while the predicate
     * holds and stops at the first element that fails it.
     */
    static final class Taking extends UnorderedWhileSpliterator.OfLong {
        Taking(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
            super(s, noSplitting, p);
        }

        Taking(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
            super(s, parent);
        }

        /**
         * Reports the next element to {@code action} if taking is still
         * active, the cancel check passes, an element is available and it
         * satisfies the predicate. Otherwise taking is marked finished.
         *
         * @param action receives the element when it is taken
         * @return {@code true} if an element was reported to {@code action}
         */
        @Override
        public boolean tryAdvance(LongConsumer action) {
            // Stays true unless the predicate was actually evaluated and failed
            boolean test = true;
            if (takeOrDrop &&               // If can take
                checkCancelOnCount() &&     // and if not cancelled
                s.tryAdvance(this) &&       // and if advanced one element
                (test = p.test(t))) {       // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        /**
         * Splits via the superclass unless the cancel flag is set, in which
         * case {@code null} is returned.
         */
        @Override
        public Spliterator.OfLong trySplit() {
            // Do not split if all operations are cancelled
            return cancel.get() ? null : super.trySplit();
        }

        // Creates the spliterator of the same (taking) kind for a split-off
        // portion, passing this instance as the parent.
        @Override
        Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
            return new Taking(s, this);
        }
    }

    /**
     * {@code long} spliterator that drops a prefix of elements matching
     * the predicate on the first call to {@code tryAdvance}, and afterwards
     * delegates directly to the underlying spliterator.
     */
    static final class Dropping extends UnorderedWhileSpliterator.OfLong {
        Dropping(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
            super(s, noSplitting, p);
        }

        Dropping(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
            super(s, parent);
        }

        /**
         * On the first call, advances the underlying spliterator until an
         * element is obtained that is not dropped (or the source is
         * exhausted) and reports that element to {@code action}.
         * On later calls simply forwards to the underlying spliterator.
         *
         * @param action receives the first retained element, then all
         *        following elements
         * @return {@code true} if an element was reported to {@code action}
         */
        @Override
        public boolean tryAdvance(LongConsumer action) {
            if (takeOrDrop) {
                // The dropping phase runs at most once per spliterator
                takeOrDrop = false;
                boolean adv;
                boolean dropped = false;
                // The evaluation order matters: s.tryAdvance(this) stores the
                // element in t (via accept) before p.test(t) reads it
                while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                       checkCancelOnCount() &&        // and if not cancelled
                       p.test(t)) {                   // and test on element passes
                    dropped = true;                   // then drop element
                }

                // Report advanced element, if any
                if (adv) {
                    // Cancel all further dropping if one or more elements
                    // were previously dropped
                    if (dropped)
                        cancel.set(true);
                    action.accept(t);
                }
                return adv;
            } else {
                return s.tryAdvance(action);
            }
        }

        // Creates the spliterator of the same (dropping) kind for a split-off
        // portion, passing this instance as the parent.
        @Override
        Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
            return new Dropping(s, this);
        }
    }
}

/**
 * {@code double} specialization. The instance is itself a
 * {@code DoubleConsumer}: it is handed to the underlying spliterator's
 * {@code tryAdvance} so that the advanced element is captured in {@code t}
 * and can then be tested against the predicate.
 */
abstract static class OfDouble extends UnorderedWhileSpliterator<Double, Spliterator.OfDouble> implements DoubleConsumer, Spliterator.OfDouble {
    // Predicate deciding whether an element is taken/dropped
    final DoublePredicate p;
    // Most recent element received through accept(double)
    double t;

    OfDouble(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
        super(s, noSplitting);
        this.p = p;
    }

    // Used for split-off spliterators: shares the parent's predicate
    OfDouble(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
        super(s, parent);
        this.p = parent.p;
    }

    /**
     * Records the advanced element and bumps the element counter, masked
     * with {@code CANCEL_CHECK_COUNT}.
     */
    @Override
    public void accept(double t) {
        // NOTE(review): count and CANCEL_CHECK_COUNT are declared outside this
        // excerpt; presumably checkCancelOnCount() uses count to poll the
        // cancel flag only periodically -- confirm against the base class
        count = (count + 1) & CANCEL_CHECK_COUNT;
        this.t = t;
    }

    /**
     * {@code double} spliterator that reports elements while the predicate
     * holds and stops at the first element that fails it.
     */
    static final class Taking extends UnorderedWhileSpliterator.OfDouble {
        Taking(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
            super(s, noSplitting, p);
        }

        Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
            super(s, parent);
        }

        /**
         * Reports the next element to {@code action} if taking is still
         * active, the cancel check passes, an element is available and it
         * satisfies the predicate. Otherwise taking is marked finished.
         *
         * @param action receives the element when it is taken
         * @return {@code true} if an element was reported to {@code action}
         */
        @Override
        public boolean tryAdvance(DoubleConsumer action) {
            // Stays true unless the predicate was actually evaluated and failed
            boolean test = true;
            if (takeOrDrop &&               // If can take
                checkCancelOnCount() &&     // and if not cancelled
                s.tryAdvance(this) &&       // and if advanced one element
                (test = p.test(t))) {       // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        /**
         * Splits via the superclass unless the cancel flag is set, in which
         * case {@code null} is returned.
         */
        @Override
        public Spliterator.OfDouble trySplit() {
            // Do not split if all operations are cancelled
            return cancel.get() ? null : super.trySplit();
        }

        // Creates the spliterator of the same (taking) kind for a split-off
        // portion, passing this instance as the parent.
        @Override
        Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
            return new Taking(s, this);
        }
    }

    /**
     * {@code double} spliterator that drops a prefix of elements matching
     * the predicate on the first call to {@code tryAdvance}, and afterwards
     * delegates directly to the underlying spliterator.
     */
    static final class Dropping extends UnorderedWhileSpliterator.OfDouble {
        Dropping(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
            super(s, noSplitting, p);
        }

        Dropping(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
            super(s, parent);
        }

        /**
         * On the first call, advances the underlying spliterator until an
         * element is obtained that is not dropped (or the source is
         * exhausted) and reports that element to {@code action}.
         * On later calls simply forwards to the underlying spliterator.
         *
         * @param action receives the first retained element, then all
         *        following elements
         * @return {@code true} if an element was reported to {@code action}
         */
        @Override
        public boolean tryAdvance(DoubleConsumer action) {
            if (takeOrDrop) {
                // The dropping phase runs at most once per spliterator
                takeOrDrop = false;
                boolean adv;
                boolean dropped = false;
                // The evaluation order matters: s.tryAdvance(this) stores the
                // element in t (via accept) before p.test(t) reads it
                while ((adv = s.tryAdvance(this)) &&  // If advanced one element
                       checkCancelOnCount() &&        // and if not cancelled
                       p.test(t)) {                   // and test on element passes
                    dropped = true;                   // then drop element
                }

                // Report advanced element, if any
                if (adv) {
                    // Cancel all further dropping if one or more elements
                    // were previously dropped
                    if (dropped)
                        cancel.set(true);
                    action.accept(t);
                }
                return adv;
            } else {
                return s.tryAdvance(action);
            }
        }

        // Creates the spliterator of the same (dropping) kind for a split-off
        // portion, passing this instance as the parent.
        @Override
        Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
            return new Dropping(s, this);
        }
    }
}
}
//
ForkJoinTask implementing takeWhile computation.

If the pipeline has encounter order then all tasks to the right of a task where traversal was short-circuited are cancelled. The results of completed (and cancelled) tasks are discarded. The result of merging a short-circuited left task and right task (which may or may not be short-circuited) is that left task.

If the pipeline has no encounter order then all tasks to the right of a task where traversal was short-circuited are cancelled. The results of completed (and possibly cancelled) tasks are not discarded, as there is no need to throw away computed results. The result of merging does not change if a left task was short-circuited. No attempt is made, once a leaf task stopped taking, for it to cancel all other tasks, and furthermore, short-circuit the computation with its result.

Type parameters:
  • <P_IN> – Input element type to the stream pipeline
  • <P_OUT> – Output element type from the stream pipeline
/**
 * {@code ForkJoinTask} implementing takeWhile computation.
 * <p>
 * If the pipeline has encounter order then all tasks to the right of
 * a task where traversal was short-circuited are cancelled.
 * The results of completed (and cancelled) tasks are discarded.
 * The result of merging a short-circuited left task and right task (which
 * may or may not be short-circuited) is that left task.
 * <p>
 * If the pipeline has no encounter order then all tasks to the right of
 * a task where traversal was short-circuited are cancelled.
 * The results of completed (and possibly cancelled) tasks are not
 * discarded, as there is no need to throw away computed results.
 * The result of merging does not change if a left task was
 * short-circuited.
 * No attempt is made, once a leaf task stopped taking, for it to cancel
 * all other tasks, and furthermore, short-circuit the computation with its
 * result.
 *
 * @param <P_IN> Input element type to the stream pipeline
 * @param <P_OUT> Output element type from the stream pipeline
 */
@SuppressWarnings("serial") private static final class TakeWhileTask<P_IN, P_OUT> extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, TakeWhileTask<P_IN, P_OUT>> { private final AbstractPipeline<P_OUT, P_OUT, ?> op; private final IntFunction<P_OUT[]> generator; private final boolean isOrdered; private long thisNodeSize; // True if a short-circuited private boolean shortCircuited; // True if completed, must be set after the local result private volatile boolean completed; TakeWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op, PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator, IntFunction<P_OUT[]> generator) { super(helper, spliterator); this.op = op; this.generator = generator; this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); } TakeWhileTask(TakeWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { super(parent, spliterator); this.op = parent.op; this.generator = parent.generator; this.isOrdered = parent.isOrdered; } @Override protected TakeWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { return new TakeWhileTask<>(this, spliterator); } @Override protected final Node<P_OUT> getEmptyResult() { return Nodes.emptyNode(op.getOutputShape()); } @Override protected final Node<P_OUT> doLeaf() { Node.Builder<P_OUT> builder = helper.makeNodeBuilder(-1, generator); Sink<P_OUT> s = op.opWrapSink(helper.getStreamAndOpFlags(), builder); if (shortCircuited = helper.copyIntoWithCancel(helper.wrapSink(s), spliterator)) { // Cancel later nodes if the predicate returned false // during traversal cancelLaterNodes(); } Node<P_OUT> node = builder.build(); thisNodeSize = node.count(); return node; } @Override public final void onCompletion(CountedCompleter<?> caller) { if (!isLeaf()) { Node<P_OUT> result; shortCircuited = leftChild.shortCircuited | rightChild.shortCircuited; if (isOrdered && canceled) { thisNodeSize = 0; result = getEmptyResult(); } else if (isOrdered && leftChild.shortCircuited) { // If taking finished 
on the left node then // use the left node result thisNodeSize = leftChild.thisNodeSize; result = leftChild.getLocalResult(); } else { thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; result = merge(); } setLocalResult(result); } completed = true; super.onCompletion(caller); } Node<P_OUT> merge() { if (leftChild.thisNodeSize == 0) { // If the left node size is 0 then // use the right node result return rightChild.getLocalResult(); } else if (rightChild.thisNodeSize == 0) { // If the right node size is 0 then // use the left node result return leftChild.getLocalResult(); } else { // Combine the left and right nodes return Nodes.conc(op.getOutputShape(), leftChild.getLocalResult(), rightChild.getLocalResult()); } } @Override protected void cancel() { super.cancel(); if (isOrdered && completed) // If the task is completed then clear the result, if any // to aid GC setLocalResult(getEmptyResult()); } }
ForkJoinTask implementing dropWhile computation.

If the pipeline has encounter order then each leaf task will not drop elements but will obtain a count of the elements that would have been otherwise dropped. That count is used as an index to track elements to be dropped. Merging will update the index so it corresponds to the index that is the end of the global prefix of elements to be dropped. The root is truncated according to that index.

If the pipeline has no encounter order then each leaf task will drop elements. Leaf tasks are ordinarily merged. No truncation of the root node is required. No attempt is made, once a leaf task stopped dropping, for it to cancel all other tasks, and furthermore, short-circuit the computation with its result.

Type parameters:
  • <P_IN> – Input element type to the stream pipeline
  • <P_OUT> – Output element type from the stream pipeline
/**
 * {@code ForkJoinTask} implementing dropWhile computation.
 * <p>
 * If the pipeline has encounter order then each leaf task will not
 * drop elements but will obtain a count of the elements that would have
 * been otherwise dropped. That count is used as an index to track
 * elements to be dropped. Merging will update the index so it corresponds
 * to the index that is the end of the global prefix of elements to be
 * dropped. The root is truncated according to that index.
 * <p>
 * If the pipeline has no encounter order then each leaf task will drop
 * elements. Leaf tasks are ordinarily merged. No truncation of the root
 * node is required.
 * No attempt is made, once a leaf task stopped dropping, for it to cancel
 * all other tasks, and furthermore, short-circuit the computation with
 * its result.
 *
 * @param <P_IN> Input element type to the stream pipeline
 * @param <P_OUT> Output element type from the stream pipeline
 */
@SuppressWarnings("serial") private static final class DropWhileTask<P_IN, P_OUT> extends AbstractTask<P_IN, P_OUT, Node<P_OUT>, DropWhileTask<P_IN, P_OUT>> { private final AbstractPipeline<P_OUT, P_OUT, ?> op; private final IntFunction<P_OUT[]> generator; private final boolean isOrdered; private long thisNodeSize; // The index from which elements of the node should be taken // i.e. the node should be truncated from [takeIndex, thisNodeSize) // Equivalent to the count of dropped elements private long index; DropWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op, PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator, IntFunction<P_OUT[]> generator) { super(helper, spliterator); assert op instanceof DropWhileOp; this.op = op; this.generator = generator; this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags()); } DropWhileTask(DropWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) { super(parent, spliterator); this.op = parent.op; this.generator = parent.generator; this.isOrdered = parent.isOrdered; } @Override protected DropWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) { return new DropWhileTask<>(this, spliterator); } @Override protected final Node<P_OUT> doLeaf() { boolean isChild = !isRoot(); // If this not the root and pipeline is ordered and size is known // then pre-size the builder long sizeIfKnown = isChild && isOrdered && StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags) ? 
op.exactOutputSizeIfKnown(spliterator) : -1; Node.Builder<P_OUT> builder = helper.makeNodeBuilder(sizeIfKnown, generator); @SuppressWarnings("unchecked") DropWhileOp<P_OUT> dropOp = (DropWhileOp<P_OUT>) op; // If this leaf is the root then there is no merging on completion // and there is no need to retain dropped elements DropWhileSink<P_OUT> s = dropOp.opWrapSink(builder, isOrdered && isChild); helper.wrapAndCopyInto(s, spliterator); Node<P_OUT> node = builder.build(); thisNodeSize = node.count(); index = s.getDropCount(); return node; } @Override public final void onCompletion(CountedCompleter<?> caller) { if (!isLeaf()) { if (isOrdered) { index = leftChild.index; // If a contiguous sequence of dropped elements // include those of the right node, if any if (index == leftChild.thisNodeSize) index += rightChild.index; } thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize; Node<P_OUT> result = merge(); setLocalResult(isRoot() ? doTruncate(result) : result); } super.onCompletion(caller); } private Node<P_OUT> merge() { if (leftChild.thisNodeSize == 0) { // If the left node size is 0 then // use the right node result return rightChild.getLocalResult(); } else if (rightChild.thisNodeSize == 0) { // If the right node size is 0 then // use the left node result return leftChild.getLocalResult(); } else { // Combine the left and right nodes return Nodes.conc(op.getOutputShape(), leftChild.getLocalResult(), rightChild.getLocalResult()); } } private Node<P_OUT> doTruncate(Node<P_OUT> input) { return isOrdered ? input.truncate(index, input.count(), generator) : input; } } }