/*
* Copyright (c) 2015, 2017, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package java.util.stream;
import java.util.Comparator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.DoubleConsumer;
import java.util.function.DoublePredicate;
import java.util.function.IntConsumer;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;
import java.util.function.LongConsumer;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
Factory for instances of a takeWhile and dropWhile operations
that produce subsequences of their input stream.
Since: 9
/**
* Factory for instances of a takeWhile and dropWhile operations
* that produce subsequences of their input stream.
*
* @since 9
*/
final class WhileOps {
static final int TAKE_FLAGS = StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT;
static final int DROP_FLAGS = StreamOpFlag.NOT_SIZED;
Appends a "takeWhile" operation to the provided Stream.
Params: - upstream – a reference stream with element type T
- predicate – the predicate that returns false to halt taking.
Type parameters: - <T> – the type of both input and output elements
/**
* Appends a "takeWhile" operation to the provided Stream.
*
* @param <T> the type of both input and output elements
* @param upstream a reference stream with element type T
* @param predicate the predicate that returns false to halt taking.
*/
static <T> Stream<T> makeTakeWhileRef(AbstractPipeline<?, T, ?> upstream,
Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, TAKE_FLAGS) {
@Override
<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
return opEvaluateParallel(helper, spliterator, Nodes.castingArray())
.spliterator();
}
else {
return new UnorderedWhileSpliterator.OfRef.Taking<>(
helper.wrapSpliterator(spliterator), false, predicate);
}
}
@Override
<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
IntFunction<T[]> generator) {
return new TakeWhileTask<>(this, helper, spliterator, generator)
.invoke();
}
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<T, T>(sink) {
boolean take = true;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(T t) {
if (take && (take = predicate.test(t))) {
downstream.accept(t);
}
}
@Override
public boolean cancellationRequested() {
return !take || downstream.cancellationRequested();
}
};
}
};
}
Appends a "takeWhile" operation to the provided IntStream.
Params: - upstream – a reference stream with element type T
- predicate – the predicate that returns false to halt taking.
/**
* Appends a "takeWhile" operation to the provided IntStream.
*
* @param upstream a reference stream with element type T
* @param predicate the predicate that returns false to halt taking.
*/
static IntStream makeTakeWhileInt(AbstractPipeline<?, Integer, ?> upstream,
IntPredicate predicate) {
Objects.requireNonNull(predicate);
return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE, TAKE_FLAGS) {
@Override
<P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
return opEvaluateParallel(helper, spliterator, Integer[]::new)
.spliterator();
}
else {
return new UnorderedWhileSpliterator.OfInt.Taking(
(Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate);
}
}
@Override
<P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator,
IntFunction<Integer[]> generator) {
return new TakeWhileTask<>(this, helper, spliterator, generator)
.invoke();
}
@Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedInt<Integer>(sink) {
boolean take = true;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(int t) {
if (take && (take = predicate.test(t))) {
downstream.accept(t);
}
}
@Override
public boolean cancellationRequested() {
return !take || downstream.cancellationRequested();
}
};
}
};
}
Appends a "takeWhile" operation to the provided LongStream.
Params: - upstream – a reference stream with element type T
- predicate – the predicate that returns false to halt taking.
/**
* Appends a "takeWhile" operation to the provided LongStream.
*
* @param upstream a reference stream with element type T
* @param predicate the predicate that returns false to halt taking.
*/
static LongStream makeTakeWhileLong(AbstractPipeline<?, Long, ?> upstream,
LongPredicate predicate) {
Objects.requireNonNull(predicate);
return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE, TAKE_FLAGS) {
@Override
<P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
return opEvaluateParallel(helper, spliterator, Long[]::new)
.spliterator();
}
else {
return new UnorderedWhileSpliterator.OfLong.Taking(
(Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate);
}
}
@Override
<P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
Spliterator<P_IN> spliterator,
IntFunction<Long[]> generator) {
return new TakeWhileTask<>(this, helper, spliterator, generator)
.invoke();
}
@Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedLong<Long>(sink) {
boolean take = true;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(long t) {
if (take && (take = predicate.test(t))) {
downstream.accept(t);
}
}
@Override
public boolean cancellationRequested() {
return !take || downstream.cancellationRequested();
}
};
}
};
}
Appends a "takeWhile" operation to the provided DoubleStream.
Params: - upstream – a reference stream with element type T
- predicate – the predicate that returns false to halt taking.
/**
* Appends a "takeWhile" operation to the provided DoubleStream.
*
* @param upstream a reference stream with element type T
* @param predicate the predicate that returns false to halt taking.
*/
static DoubleStream makeTakeWhileDouble(AbstractPipeline<?, Double, ?> upstream,
DoublePredicate predicate) {
Objects.requireNonNull(predicate);
return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE, TAKE_FLAGS) {
@Override
<P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
return opEvaluateParallel(helper, spliterator, Double[]::new)
.spliterator();
}
else {
return new UnorderedWhileSpliterator.OfDouble.Taking(
(Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate);
}
}
@Override
<P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
Spliterator<P_IN> spliterator,
IntFunction<Double[]> generator) {
return new TakeWhileTask<>(this, helper, spliterator, generator)
.invoke();
}
@Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return new Sink.ChainedDouble<Double>(sink) {
boolean take = true;
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(double t) {
if (take && (take = predicate.test(t))) {
downstream.accept(t);
}
}
@Override
public boolean cancellationRequested() {
return !take || downstream.cancellationRequested();
}
};
}
};
}
A specialization for the dropWhile operation that controls if
elements to be dropped are counted and passed downstream.
This specialization is utilized by the TakeWhileTask
for pipelines that are ordered. In such cases elements cannot be dropped until all elements have been collected.
Type parameters: - <T> – the type of both input and output elements
/**
* A specialization for the dropWhile operation that controls if
* elements to be dropped are counted and passed downstream.
* <p>
* This specialization is utilized by the {@link TakeWhileTask} for
* pipelines that are ordered. In such cases elements cannot be dropped
* until all elements have been collected.
*
* @param <T> the type of both input and output elements
*/
interface DropWhileOp<T> {
Accepts a Sink
which will receive the results of this dropWhile operation, and return a DropWhileSink
which accepts elements and which performs the dropWhile operation passing the results to the provided Sink
. Params: - sink – sink to which elements should be sent after processing
- retainAndCountDroppedElements – true if elements to be dropped
are counted and passed to the sink, otherwise such elements
are actually dropped and not passed to the sink.
Returns: a dropWhile sink
/**
* Accepts a {@code Sink} which will receive the results of this
* dropWhile operation, and return a {@code DropWhileSink} which
* accepts
* elements and which performs the dropWhile operation passing the
* results to the provided {@code Sink}.
*
* @param sink sink to which elements should be sent after processing
* @param retainAndCountDroppedElements true if elements to be dropped
* are counted and passed to the sink, otherwise such elements
* are actually dropped and not passed to the sink.
* @return a dropWhile sink
*/
DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements);
}
A specialization for a dropWhile sink.
Type parameters: - <T> – the type of both input and output elements
/**
* A specialization for a dropWhile sink.
*
* @param <T> the type of both input and output elements
*/
interface DropWhileSink<T> extends Sink<T> {
Returns: the could of elements that would have been dropped and
instead were passed downstream.
/**
* @return the could of elements that would have been dropped and
* instead were passed downstream.
*/
long getDropCount();
}
Appends a "dropWhile" operation to the provided Stream.
Params: - upstream – a reference stream with element type T
- predicate – the predicate that returns false to halt dropping.
Type parameters: - <T> – the type of both input and output elements
/**
* Appends a "dropWhile" operation to the provided Stream.
*
* @param <T> the type of both input and output elements
* @param upstream a reference stream with element type T
* @param predicate the predicate that returns false to halt dropping.
*/
static <T> Stream<T> makeDropWhileRef(AbstractPipeline<?, T, ?> upstream,
Predicate<? super T> predicate) {
Objects.requireNonNull(predicate);
class Op extends ReferencePipeline.StatefulOp<T, T> implements DropWhileOp<T> {
public Op(AbstractPipeline<?, T, ?> upstream, StreamShape inputShape, int opFlags) {
super(upstream, inputShape, opFlags);
}
@Override
<P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
return opEvaluateParallel(helper, spliterator, Nodes.castingArray())
.spliterator();
}
else {
return new UnorderedWhileSpliterator.OfRef.Dropping<>(
helper.wrapSpliterator(spliterator), false, predicate);
}
}
@Override
<P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator,
IntFunction<T[]> generator) {
return new DropWhileTask<>(this, helper, spliterator, generator)
.invoke();
}
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
return opWrapSink(sink, false);
}
public DropWhileSink<T> opWrapSink(Sink<T> sink, boolean retainAndCountDroppedElements) {
class OpSink extends Sink.ChainedReference<T, T> implements DropWhileSink<T> {
long dropCount;
boolean take;
OpSink() {
super(sink);
}
@Override
public void accept(T t) {
boolean takeElement = take || (take = !predicate.test(t));
// If ordered and element is dropped increment index
// for possible future truncation
if (retainAndCountDroppedElements && !takeElement)
dropCount++;
// If ordered need to process element, otherwise
// skip if element is dropped
if (retainAndCountDroppedElements || takeElement)
downstream.accept(t);
}
@Override
public long getDropCount() {
return dropCount;
}
}
return new OpSink();
}
}
return new Op(upstream, StreamShape.REFERENCE, DROP_FLAGS);
}
Appends a "dropWhile" operation to the provided IntStream.
Params: - upstream – a reference stream with element type T
- predicate – the predicate that returns false to halt dropping.
/**
* Appends a "dropWhile" operation to the provided IntStream.
*
* @param upstream a reference stream with element type T
* @param predicate the predicate that returns false to halt dropping.
*/
static IntStream makeDropWhileInt(AbstractPipeline<?, Integer, ?> upstream,
IntPredicate predicate) {
Objects.requireNonNull(predicate);
class Op extends IntPipeline.StatefulOp<Integer> implements DropWhileOp<Integer> {
public Op(AbstractPipeline<?, Integer, ?> upstream, StreamShape inputShape, int opFlags) {
super(upstream, inputShape, opFlags);
}
@Override
<P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
return opEvaluateParallel(helper, spliterator, Integer[]::new)
.spliterator();
}
else {
return new UnorderedWhileSpliterator.OfInt.Dropping(
(Spliterator.OfInt) helper.wrapSpliterator(spliterator), false, predicate);
}
}
@Override
<P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
Spliterator<P_IN> spliterator,
IntFunction<Integer[]> generator) {
return new DropWhileTask<>(this, helper, spliterator, generator)
.invoke();
}
@Override
Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
return opWrapSink(sink, false);
}
public DropWhileSink<Integer> opWrapSink(Sink<Integer> sink, boolean retainAndCountDroppedElements) {
class OpSink extends Sink.ChainedInt<Integer> implements DropWhileSink<Integer> {
long dropCount;
boolean take;
OpSink() {
super(sink);
}
@Override
public void accept(int t) {
boolean takeElement = take || (take = !predicate.test(t));
// If ordered and element is dropped increment index
// for possible future truncation
if (retainAndCountDroppedElements && !takeElement)
dropCount++;
// If ordered need to process element, otherwise
// skip if element is dropped
if (retainAndCountDroppedElements || takeElement)
downstream.accept(t);
}
@Override
public long getDropCount() {
return dropCount;
}
}
return new OpSink();
}
}
return new Op(upstream, StreamShape.INT_VALUE, DROP_FLAGS);
}
Appends a "dropWhile" operation to the provided LongStream.
Params: - upstream – a reference stream with element type T
- predicate – the predicate that returns false to halt dropping.
/**
* Appends a "dropWhile" operation to the provided LongStream.
*
* @param upstream a reference stream with element type T
* @param predicate the predicate that returns false to halt dropping.
*/
static LongStream makeDropWhileLong(AbstractPipeline<?, Long, ?> upstream,
LongPredicate predicate) {
Objects.requireNonNull(predicate);
class Op extends LongPipeline.StatefulOp<Long> implements DropWhileOp<Long> {
public Op(AbstractPipeline<?, Long, ?> upstream, StreamShape inputShape, int opFlags) {
super(upstream, inputShape, opFlags);
}
@Override
<P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
return opEvaluateParallel(helper, spliterator, Long[]::new)
.spliterator();
}
else {
return new UnorderedWhileSpliterator.OfLong.Dropping(
(Spliterator.OfLong) helper.wrapSpliterator(spliterator), false, predicate);
}
}
@Override
<P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
Spliterator<P_IN> spliterator,
IntFunction<Long[]> generator) {
return new DropWhileTask<>(this, helper, spliterator, generator)
.invoke();
}
@Override
Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
return opWrapSink(sink, false);
}
public DropWhileSink<Long> opWrapSink(Sink<Long> sink, boolean retainAndCountDroppedElements) {
class OpSink extends Sink.ChainedLong<Long> implements DropWhileSink<Long> {
long dropCount;
boolean take;
OpSink() {
super(sink);
}
@Override
public void accept(long t) {
boolean takeElement = take || (take = !predicate.test(t));
// If ordered and element is dropped increment index
// for possible future truncation
if (retainAndCountDroppedElements && !takeElement)
dropCount++;
// If ordered need to process element, otherwise
// skip if element is dropped
if (retainAndCountDroppedElements || takeElement)
downstream.accept(t);
}
@Override
public long getDropCount() {
return dropCount;
}
}
return new OpSink();
}
}
return new Op(upstream, StreamShape.LONG_VALUE, DROP_FLAGS);
}
Appends a "dropWhile" operation to the provided DoubleStream.
Params: - upstream – a reference stream with element type T
- predicate – the predicate that returns false to halt dropping.
/**
* Appends a "dropWhile" operation to the provided DoubleStream.
*
* @param upstream a reference stream with element type T
* @param predicate the predicate that returns false to halt dropping.
*/
static DoubleStream makeDropWhileDouble(AbstractPipeline<?, Double, ?> upstream,
DoublePredicate predicate) {
Objects.requireNonNull(predicate);
class Op extends DoublePipeline.StatefulOp<Double> implements DropWhileOp<Double> {
public Op(AbstractPipeline<?, Double, ?> upstream, StreamShape inputShape, int opFlags) {
super(upstream, inputShape, opFlags);
}
@Override
<P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
Spliterator<P_IN> spliterator) {
if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
return opEvaluateParallel(helper, spliterator, Double[]::new)
.spliterator();
}
else {
return new UnorderedWhileSpliterator.OfDouble.Dropping(
(Spliterator.OfDouble) helper.wrapSpliterator(spliterator), false, predicate);
}
}
@Override
<P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
Spliterator<P_IN> spliterator,
IntFunction<Double[]> generator) {
return new DropWhileTask<>(this, helper, spliterator, generator)
.invoke();
}
@Override
Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
return opWrapSink(sink, false);
}
public DropWhileSink<Double> opWrapSink(Sink<Double> sink, boolean retainAndCountDroppedElements) {
class OpSink extends Sink.ChainedDouble<Double> implements DropWhileSink<Double> {
long dropCount;
boolean take;
OpSink() {
super(sink);
}
@Override
public void accept(double t) {
boolean takeElement = take || (take = !predicate.test(t));
// If ordered and element is dropped increment index
// for possible future truncation
if (retainAndCountDroppedElements && !takeElement)
dropCount++;
// If ordered need to process element, otherwise
// skip if element is dropped
if (retainAndCountDroppedElements || takeElement)
downstream.accept(t);
}
@Override
public long getDropCount() {
return dropCount;
}
}
return new OpSink();
}
}
return new Op(upstream, StreamShape.DOUBLE_VALUE, DROP_FLAGS);
}
//
A spliterator supporting takeWhile and dropWhile operations over an
underlying spliterator whose covered elements have no encounter order.
Concrete subclasses of this spliterator support reference and primitive
types for takeWhile and dropWhile.
For the takeWhile operation if during traversal taking completes then taking is cancelled globally for the splitting and traversal of all related spliterators. Cancellation is governed by a shared AtomicBoolean
instance. A spliterator in the process of taking when cancellation occurs will also be cancelled but not necessarily immediately. To reduce contention on the AtomicBoolean
instance, cancellation make be acted on after a small number of additional elements have been traversed.
For the dropWhile operation if during traversal dropping completes for
some, but not all elements, then it is cancelled globally for the
traversal of all related spliterators (splitting is not cancelled).
Cancellation is governed in the same manner as for the takeWhile
operation.
Type parameters:
/**
* A spliterator supporting takeWhile and dropWhile operations over an
* underlying spliterator whose covered elements have no encounter order.
* <p>
* Concrete subclasses of this spliterator support reference and primitive
* types for takeWhile and dropWhile.
* <p>
* For the takeWhile operation if during traversal taking completes then
* taking is cancelled globally for the splitting and traversal of all
* related spliterators.
* Cancellation is governed by a shared {@link AtomicBoolean} instance. A
* spliterator in the process of taking when cancellation occurs will also
* be cancelled but not necessarily immediately. To reduce contention on
* the {@link AtomicBoolean} instance, cancellation make be acted on after
* a small number of additional elements have been traversed.
* <p>
* For the dropWhile operation if during traversal dropping completes for
* some, but not all elements, then it is cancelled globally for the
* traversal of all related spliterators (splitting is not cancelled).
* Cancellation is governed in the same manner as for the takeWhile
* operation.
*
* @param <T> the type of elements returned by this spliterator
* @param <T_SPLITR> the type of the spliterator
*/
abstract static class UnorderedWhileSpliterator<T, T_SPLITR extends Spliterator<T>> implements Spliterator<T> {
// Power of two constant minus one used for modulus of count
static final int CANCEL_CHECK_COUNT = (1 << 6) - 1;
// The underlying spliterator
final T_SPLITR s;
// True if no splitting should be performed, if true then
// this spliterator may be used for an underlying spliterator whose
// covered elements have an encounter order
// See use in stream take/dropWhile default default methods
final boolean noSplitting;
// True when operations are cancelled for all related spliterators
// For taking, spliterators cannot split or traversed
// For dropping, spliterators cannot be traversed
final AtomicBoolean cancel;
// True while taking or dropping should be performed when traversing
boolean takeOrDrop = true;
// The count of elements traversed
int count;
UnorderedWhileSpliterator(T_SPLITR s, boolean noSplitting) {
this.s = s;
this.noSplitting = noSplitting;
this.cancel = new AtomicBoolean();
}
UnorderedWhileSpliterator(T_SPLITR s, UnorderedWhileSpliterator<T, T_SPLITR> parent) {
this.s = s;
this.noSplitting = parent.noSplitting;
this.cancel = parent.cancel;
}
@Override
public long estimateSize() {
return s.estimateSize();
}
@Override
public int characteristics() {
// Size is not known
return s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED);
}
@Override
public long getExactSizeIfKnown() {
return -1L;
}
@Override
public Comparator<? super T> getComparator() {
return s.getComparator();
}
@Override
public T_SPLITR trySplit() {
@SuppressWarnings("unchecked")
T_SPLITR ls = noSplitting ? null : (T_SPLITR) s.trySplit();
return ls != null ? makeSpliterator(ls) : null;
}
boolean checkCancelOnCount() {
return count != 0 || !cancel.get();
}
abstract T_SPLITR makeSpliterator(T_SPLITR s);
abstract static class OfRef<T> extends UnorderedWhileSpliterator<T, Spliterator<T>> implements Consumer<T> {
final Predicate<? super T> p;
T t;
OfRef(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
super(s, noSplitting);
this.p = p;
}
OfRef(Spliterator<T> s, OfRef<T> parent) {
super(s, parent);
this.p = parent.p;
}
@Override
public void accept(T t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}
static final class Taking<T> extends OfRef<T> {
Taking(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
super(s, noSplitting, p);
}
Taking(Spliterator<T> s, Taking<T> parent) {
super(s, parent);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
boolean test = true;
if (takeOrDrop && // If can take
checkCancelOnCount() && // and if not cancelled
s.tryAdvance(this) && // and if advanced one element
(test = p.test(t))) { // and test on element passes
action.accept(t); // then accept element
return true;
}
else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}
@Override
public Spliterator<T> trySplit() {
// Do not split if all operations are cancelled
return cancel.get() ? null : super.trySplit();
}
@Override
Spliterator<T> makeSpliterator(Spliterator<T> s) {
return new Taking<>(s, this);
}
}
static final class Dropping<T> extends OfRef<T> {
Dropping(Spliterator<T> s, boolean noSplitting, Predicate<? super T> p) {
super(s, noSplitting, p);
}
Dropping(Spliterator<T> s, Dropping<T> parent) {
super(s, parent);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if (takeOrDrop) {
takeOrDrop = false;
boolean adv;
boolean dropped = false;
while ((adv = s.tryAdvance(this)) && // If advanced one element
checkCancelOnCount() && // and if not cancelled
p.test(t)) { // and test on element passes
dropped = true; // then drop element
}
// Report advanced element, if any
if (adv) {
// Cancel all further dropping if one or more elements
// were previously dropped
if (dropped)
cancel.set(true);
action.accept(t);
}
return adv;
}
else {
return s.tryAdvance(action);
}
}
@Override
Spliterator<T> makeSpliterator(Spliterator<T> s) {
return new Dropping<>(s, this);
}
}
}
abstract static class OfInt extends UnorderedWhileSpliterator<Integer, Spliterator.OfInt> implements IntConsumer, Spliterator.OfInt {
final IntPredicate p;
int t;
OfInt(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
super(s, noSplitting);
this.p = p;
}
OfInt(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
super(s, parent);
this.p = parent.p;
}
@Override
public void accept(int t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}
static final class Taking extends UnorderedWhileSpliterator.OfInt {
Taking(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
super(s, noSplitting, p);
}
Taking(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
super(s, parent);
}
@Override
public boolean tryAdvance(IntConsumer action) {
boolean test = true;
if (takeOrDrop && // If can take
checkCancelOnCount() && // and if not cancelled
s.tryAdvance(this) && // and if advanced one element
(test = p.test(t))) { // and test on element passes
action.accept(t); // then accept element
return true;
}
else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}
@Override
public Spliterator.OfInt trySplit() {
// Do not split if all operations are cancelled
return cancel.get() ? null : super.trySplit();
}
@Override
Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
return new Taking(s, this);
}
}
static final class Dropping extends UnorderedWhileSpliterator.OfInt {
Dropping(Spliterator.OfInt s, boolean noSplitting, IntPredicate p) {
super(s, noSplitting, p);
}
Dropping(Spliterator.OfInt s, UnorderedWhileSpliterator.OfInt parent) {
super(s, parent);
}
@Override
public boolean tryAdvance(IntConsumer action) {
if (takeOrDrop) {
takeOrDrop = false;
boolean adv;
boolean dropped = false;
while ((adv = s.tryAdvance(this)) && // If advanced one element
checkCancelOnCount() && // and if not cancelled
p.test(t)) { // and test on element passes
dropped = true; // then drop element
}
// Report advanced element, if any
if (adv) {
// Cancel all further dropping if one or more elements
// were previously dropped
if (dropped)
cancel.set(true);
action.accept(t);
}
return adv;
}
else {
return s.tryAdvance(action);
}
}
@Override
Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
return new Dropping(s, this);
}
}
}
abstract static class OfLong extends UnorderedWhileSpliterator<Long, Spliterator.OfLong> implements LongConsumer, Spliterator.OfLong {
final LongPredicate p;
long t;
OfLong(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
super(s, noSplitting);
this.p = p;
}
OfLong(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
super(s, parent);
this.p = parent.p;
}
@Override
public void accept(long t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}
static final class Taking extends UnorderedWhileSpliterator.OfLong {
Taking(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
super(s, noSplitting, p);
}
Taking(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
super(s, parent);
}
@Override
public boolean tryAdvance(LongConsumer action) {
boolean test = true;
if (takeOrDrop && // If can take
checkCancelOnCount() && // and if not cancelled
s.tryAdvance(this) && // and if advanced one element
(test = p.test(t))) { // and test on element passes
action.accept(t); // then accept element
return true;
}
else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}
@Override
public Spliterator.OfLong trySplit() {
// Do not split if all operations are cancelled
return cancel.get() ? null : super.trySplit();
}
@Override
Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
return new Taking(s, this);
}
}
static final class Dropping extends UnorderedWhileSpliterator.OfLong {
Dropping(Spliterator.OfLong s, boolean noSplitting, LongPredicate p) {
super(s, noSplitting, p);
}
Dropping(Spliterator.OfLong s, UnorderedWhileSpliterator.OfLong parent) {
super(s, parent);
}
@Override
public boolean tryAdvance(LongConsumer action) {
if (takeOrDrop) {
takeOrDrop = false;
boolean adv;
boolean dropped = false;
while ((adv = s.tryAdvance(this)) && // If advanced one element
checkCancelOnCount() && // and if not cancelled
p.test(t)) { // and test on element passes
dropped = true; // then drop element
}
// Report advanced element, if any
if (adv) {
// Cancel all further dropping if one or more elements
// were previously dropped
if (dropped)
cancel.set(true);
action.accept(t);
}
return adv;
}
else {
return s.tryAdvance(action);
}
}
@Override
Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
return new Dropping(s, this);
}
}
}
abstract static class OfDouble extends UnorderedWhileSpliterator<Double, Spliterator.OfDouble> implements DoubleConsumer, Spliterator.OfDouble {
final DoublePredicate p;
double t;
OfDouble(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
super(s, noSplitting);
this.p = p;
}
OfDouble(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
super(s, parent);
this.p = parent.p;
}
@Override
public void accept(double t) {
count = (count + 1) & CANCEL_CHECK_COUNT;
this.t = t;
}
static final class Taking extends UnorderedWhileSpliterator.OfDouble {
Taking(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
super(s, noSplitting, p);
}
Taking(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
super(s, parent);
}
@Override
public boolean tryAdvance(DoubleConsumer action) {
boolean test = true;
if (takeOrDrop && // If can take
checkCancelOnCount() && // and if not cancelled
s.tryAdvance(this) && // and if advanced one element
(test = p.test(t))) { // and test on element passes
action.accept(t); // then accept element
return true;
}
else {
// Taking is finished
takeOrDrop = false;
// Cancel all further traversal and splitting operations
// only if test of element failed (short-circuited)
if (!test)
cancel.set(true);
return false;
}
}
@Override
public Spliterator.OfDouble trySplit() {
// Do not split if all operations are cancelled
return cancel.get() ? null : super.trySplit();
}
@Override
Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
return new Taking(s, this);
}
}
static final class Dropping extends UnorderedWhileSpliterator.OfDouble {
Dropping(Spliterator.OfDouble s, boolean noSplitting, DoublePredicate p) {
super(s, noSplitting, p);
}
Dropping(Spliterator.OfDouble s, UnorderedWhileSpliterator.OfDouble parent) {
super(s, parent);
}
@Override
public boolean tryAdvance(DoubleConsumer action) {
if (takeOrDrop) {
takeOrDrop = false;
boolean adv;
boolean dropped = false;
while ((adv = s.tryAdvance(this)) && // If advanced one element
checkCancelOnCount() && // and if not cancelled
p.test(t)) { // and test on element passes
dropped = true; // then drop element
}
// Report advanced element, if any
if (adv) {
// Cancel all further dropping if one or more elements
// were previously dropped
if (dropped)
cancel.set(true);
action.accept(t);
}
return adv;
}
else {
return s.tryAdvance(action);
}
}
@Override
Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
return new Dropping(s, this);
}
}
}
}
//
ForkJoinTask
implementing takeWhile computation.
If the pipeline has encounter order then all tasks to the right of
a task where traversal was short-circuited are cancelled.
The results of completed (and cancelled) tasks are discarded.
The result of merging a short-circuited left task and right task (which
may or may not be short-circuited) is that left task.
If the pipeline has no encounter order then all tasks to the right of
a task where traversal was short-circuited are cancelled.
The results of completed (and possibly cancelled) tasks are not
discarded, as there is no need to throw away computed results.
The result of merging does not change if a left task was
short-circuited.
No attempt is made, once a leaf task stopped taking, for it to cancel
all other tasks, and further more, short-circuit the computation with its
result.
Type parameters:
/**
* {@code ForkJoinTask} implementing takeWhile computation.
* <p>
* If the pipeline has encounter order then all tasks to the right of
* a task where traversal was short-circuited are cancelled.
* The results of completed (and cancelled) tasks are discarded.
* The result of merging a short-circuited left task and right task (which
* may or may not be short-circuited) is that left task.
* <p>
* If the pipeline has no encounter order then all tasks to the right of
* a task where traversal was short-circuited are cancelled.
* The results of completed (and possibly cancelled) tasks are not
* discarded, as there is no need to throw away computed results.
* The result of merging does not change if a left task was
* short-circuited.
* No attempt is made, once a leaf task stopped taking, for it to cancel
* all other tasks, and further more, short-circuit the computation with its
* result.
*
* @param <P_IN> Input element type to the stream pipeline
* @param <P_OUT> Output element type from the stream pipeline
*/
@SuppressWarnings("serial")
private static final class TakeWhileTask<P_IN, P_OUT>
extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, TakeWhileTask<P_IN, P_OUT>> {
private final AbstractPipeline<P_OUT, P_OUT, ?> op;
private final IntFunction<P_OUT[]> generator;
private final boolean isOrdered;
private long thisNodeSize;
// True if a short-circuited
private boolean shortCircuited;
// True if completed, must be set after the local result
private volatile boolean completed;
TakeWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<P_OUT[]> generator) {
super(helper, spliterator);
this.op = op;
this.generator = generator;
this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
}
TakeWhileTask(TakeWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
super(parent, spliterator);
this.op = parent.op;
this.generator = parent.generator;
this.isOrdered = parent.isOrdered;
}
@Override
protected TakeWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
return new TakeWhileTask<>(this, spliterator);
}
@Override
protected final Node<P_OUT> getEmptyResult() {
return Nodes.emptyNode(op.getOutputShape());
}
@Override
protected final Node<P_OUT> doLeaf() {
Node.Builder<P_OUT> builder = helper.makeNodeBuilder(-1, generator);
Sink<P_OUT> s = op.opWrapSink(helper.getStreamAndOpFlags(), builder);
if (shortCircuited = helper.copyIntoWithCancel(helper.wrapSink(s), spliterator)) {
// Cancel later nodes if the predicate returned false
// during traversal
cancelLaterNodes();
}
Node<P_OUT> node = builder.build();
thisNodeSize = node.count();
return node;
}
@Override
public final void onCompletion(CountedCompleter<?> caller) {
if (!isLeaf()) {
Node<P_OUT> result;
shortCircuited = leftChild.shortCircuited | rightChild.shortCircuited;
if (isOrdered && canceled) {
thisNodeSize = 0;
result = getEmptyResult();
}
else if (isOrdered && leftChild.shortCircuited) {
// If taking finished on the left node then
// use the left node result
thisNodeSize = leftChild.thisNodeSize;
result = leftChild.getLocalResult();
}
else {
thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
result = merge();
}
setLocalResult(result);
}
completed = true;
super.onCompletion(caller);
}
Node<P_OUT> merge() {
if (leftChild.thisNodeSize == 0) {
// If the left node size is 0 then
// use the right node result
return rightChild.getLocalResult();
}
else if (rightChild.thisNodeSize == 0) {
// If the right node size is 0 then
// use the left node result
return leftChild.getLocalResult();
}
else {
// Combine the left and right nodes
return Nodes.conc(op.getOutputShape(),
leftChild.getLocalResult(), rightChild.getLocalResult());
}
}
@Override
protected void cancel() {
super.cancel();
if (isOrdered && completed)
// If the task is completed then clear the result, if any
// to aid GC
setLocalResult(getEmptyResult());
}
}
ForkJoinTask
implementing dropWhile computation.
If the pipeline has encounter order then each leaf task will not
drop elements but will obtain a count of the elements that would have
been otherwise dropped. That count is used as an index to track
elements to be dropped. Merging will update the index so it corresponds
to the index that is the end of the global prefix of elements to be
dropped. The root is truncated according to that index.
If the pipeline has no encounter order then each leaf task will drop
elements. Leaf tasks are ordinarily merged. No truncation of the root
node is required.
No attempt is made, once a leaf task stopped dropping, for it to cancel
all other tasks, and further more, short-circuit the computation with
its result.
Type parameters:
/**
* {@code ForkJoinTask} implementing dropWhile computation.
* <p>
* If the pipeline has encounter order then each leaf task will not
* drop elements but will obtain a count of the elements that would have
* been otherwise dropped. That count is used as an index to track
* elements to be dropped. Merging will update the index so it corresponds
* to the index that is the end of the global prefix of elements to be
* dropped. The root is truncated according to that index.
* <p>
* If the pipeline has no encounter order then each leaf task will drop
* elements. Leaf tasks are ordinarily merged. No truncation of the root
* node is required.
* No attempt is made, once a leaf task stopped dropping, for it to cancel
* all other tasks, and further more, short-circuit the computation with
* its result.
*
* @param <P_IN> Input element type to the stream pipeline
* @param <P_OUT> Output element type from the stream pipeline
*/
@SuppressWarnings("serial")
private static final class DropWhileTask<P_IN, P_OUT>
extends AbstractTask<P_IN, P_OUT, Node<P_OUT>, DropWhileTask<P_IN, P_OUT>> {
private final AbstractPipeline<P_OUT, P_OUT, ?> op;
private final IntFunction<P_OUT[]> generator;
private final boolean isOrdered;
private long thisNodeSize;
// The index from which elements of the node should be taken
// i.e. the node should be truncated from [takeIndex, thisNodeSize)
// Equivalent to the count of dropped elements
private long index;
DropWhileTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
PipelineHelper<P_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<P_OUT[]> generator) {
super(helper, spliterator);
assert op instanceof DropWhileOp;
this.op = op;
this.generator = generator;
this.isOrdered = StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags());
}
DropWhileTask(DropWhileTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
super(parent, spliterator);
this.op = parent.op;
this.generator = parent.generator;
this.isOrdered = parent.isOrdered;
}
@Override
protected DropWhileTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
return new DropWhileTask<>(this, spliterator);
}
@Override
protected final Node<P_OUT> doLeaf() {
boolean isChild = !isRoot();
// If this not the root and pipeline is ordered and size is known
// then pre-size the builder
long sizeIfKnown = isChild && isOrdered && StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
? op.exactOutputSizeIfKnown(spliterator)
: -1;
Node.Builder<P_OUT> builder = helper.makeNodeBuilder(sizeIfKnown, generator);
@SuppressWarnings("unchecked")
DropWhileOp<P_OUT> dropOp = (DropWhileOp<P_OUT>) op;
// If this leaf is the root then there is no merging on completion
// and there is no need to retain dropped elements
DropWhileSink<P_OUT> s = dropOp.opWrapSink(builder, isOrdered && isChild);
helper.wrapAndCopyInto(s, spliterator);
Node<P_OUT> node = builder.build();
thisNodeSize = node.count();
index = s.getDropCount();
return node;
}
@Override
public final void onCompletion(CountedCompleter<?> caller) {
if (!isLeaf()) {
if (isOrdered) {
index = leftChild.index;
// If a contiguous sequence of dropped elements
// include those of the right node, if any
if (index == leftChild.thisNodeSize)
index += rightChild.index;
}
thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
Node<P_OUT> result = merge();
setLocalResult(isRoot() ? doTruncate(result) : result);
}
super.onCompletion(caller);
}
private Node<P_OUT> merge() {
if (leftChild.thisNodeSize == 0) {
// If the left node size is 0 then
// use the right node result
return rightChild.getLocalResult();
}
else if (rightChild.thisNodeSize == 0) {
// If the right node size is 0 then
// use the left node result
return leftChild.getLocalResult();
}
else {
// Combine the left and right nodes
return Nodes.conc(op.getOutputShape(),
leftChild.getLocalResult(), rightChild.getLocalResult());
}
}
private Node<P_OUT> doTruncate(Node<P_OUT> input) {
return isOrdered
? input.truncate(index, input.count(), generator)
: input;
}
}
}