/*
 * Copyright (c) 2012, 2016, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Objects;
import java.util.Spliterator;
import java.util.function.IntFunction;
import java.util.function.Supplier;

Abstract base class for "pipeline" classes, which are the core implementations of the Stream interface and its primitive specializations. Manages construction and evaluation of stream pipelines.

An AbstractPipeline represents an initial portion of a stream pipeline, encapsulating a stream source and zero or more intermediate operations. The individual AbstractPipeline objects are often referred to as stages, where each stage describes either the stream source or an intermediate operation.

A concrete intermediate stage is generally built from an AbstractPipeline, a shape-specific pipeline class which extends it (e.g., IntPipeline) which is also abstract, and an operation-specific concrete class which extends that. AbstractPipeline contains most of the mechanics of evaluating the pipeline, and implements methods that will be used by the operation; the shape-specific classes add helper methods for dealing with collection of results into the appropriate shape-specific containers.

After chaining a new intermediate operation, or executing a terminal operation, the stream is considered to be consumed, and no more intermediate or terminal operations are permitted on this stream instance.

Type parameters:
  • <E_IN> – type of input elements
  • <E_OUT> – type of output elements
  • <S> – type of the subclass implementing BaseStream
Implementation Note:

For sequential streams, and parallel streams without stateful intermediate operations, parallel streams, pipeline evaluation is done in a single pass that "jams" all the operations together. For parallel streams with stateful operations, execution is divided into segments, where each stateful operations marks the end of a segment, and each segment is evaluated separately and the result used as the input to the next segment. In all cases, the source data is not consumed until a terminal operation begins.

Since:1.8
/** * Abstract base class for "pipeline" classes, which are the core * implementations of the Stream interface and its primitive specializations. * Manages construction and evaluation of stream pipelines. * * <p>An {@code AbstractPipeline} represents an initial portion of a stream * pipeline, encapsulating a stream source and zero or more intermediate * operations. The individual {@code AbstractPipeline} objects are often * referred to as <em>stages</em>, where each stage describes either the stream * source or an intermediate operation. * * <p>A concrete intermediate stage is generally built from an * {@code AbstractPipeline}, a shape-specific pipeline class which extends it * (e.g., {@code IntPipeline}) which is also abstract, and an operation-specific * concrete class which extends that. {@code AbstractPipeline} contains most of * the mechanics of evaluating the pipeline, and implements methods that will be * used by the operation; the shape-specific classes add helper methods for * dealing with collection of results into the appropriate shape-specific * containers. * * <p>After chaining a new intermediate operation, or executing a terminal * operation, the stream is considered to be consumed, and no more intermediate * or terminal operations are permitted on this stream instance. * * @implNote * <p>For sequential streams, and parallel streams without * <a href="package-summary.html#StreamOps">stateful intermediate * operations</a>, parallel streams, pipeline evaluation is done in a single * pass that "jams" all the operations together. For parallel streams with * stateful operations, execution is divided into segments, where each * stateful operations marks the end of a segment, and each segment is * evaluated separately and the result used as the input to the next * segment. In all cases, the source data is not consumed until a terminal * operation begins. * * @param <E_IN> type of input elements * @param <E_OUT> type of output elements * @param <S> type of the subclass implementing {@code BaseStream} * @since 1.8 */
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>> extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> { private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed"; private static final String MSG_CONSUMED = "source already consumed or closed";
Backlink to the head of the pipeline chain (self if this is the source stage).
/** * Backlink to the head of the pipeline chain (self if this is the source * stage). */
@SuppressWarnings("rawtypes") private final AbstractPipeline sourceStage;
The "upstream" pipeline, or null if this is the source stage.
/** * The "upstream" pipeline, or null if this is the source stage. */
@SuppressWarnings("rawtypes") private final AbstractPipeline previousStage;
The operation flags for the intermediate operation represented by this pipeline object.
/** * The operation flags for the intermediate operation represented by this * pipeline object. */
protected final int sourceOrOpFlags;
The next stage in the pipeline, or null if this is the last stage. Effectively final at the point of linking to the next pipeline.
/** * The next stage in the pipeline, or null if this is the last stage. * Effectively final at the point of linking to the next pipeline. */
@SuppressWarnings("rawtypes") private AbstractPipeline nextStage;
The number of intermediate operations between this pipeline object and the stream source if sequential, or the previous stateful if parallel. Valid at the point of pipeline preparation for evaluation.
/** * The number of intermediate operations between this pipeline object * and the stream source if sequential, or the previous stateful if parallel. * Valid at the point of pipeline preparation for evaluation. */
private int depth;
The combined source and operation flags for the source and all operations up to and including the operation represented by this pipeline object. Valid at the point of pipeline preparation for evaluation.
/** * The combined source and operation flags for the source and all operations * up to and including the operation represented by this pipeline object. * Valid at the point of pipeline preparation for evaluation. */
private int combinedFlags;
The source spliterator. Only valid for the head pipeline. Before the pipeline is consumed if non-null then sourceSupplier must be null. After the pipeline is consumed if non-null then is set to null.
/** * The source spliterator. Only valid for the head pipeline. * Before the pipeline is consumed if non-null then {@code sourceSupplier} * must be null. After the pipeline is consumed if non-null then is set to * null. */
private Spliterator<?> sourceSpliterator;
The source supplier. Only valid for the head pipeline. Before the pipeline is consumed if non-null then sourceSpliterator must be null. After the pipeline is consumed if non-null then is set to null.
/** * The source supplier. Only valid for the head pipeline. Before the * pipeline is consumed if non-null then {@code sourceSpliterator} must be * null. After the pipeline is consumed if non-null then is set to null. */
private Supplier<? extends Spliterator<?>> sourceSupplier;
True if this pipeline has been linked or consumed
/** * True if this pipeline has been linked or consumed */
private boolean linkedOrConsumed;
True if there are any stateful ops in the pipeline; only valid for the source stage.
/** * True if there are any stateful ops in the pipeline; only valid for the * source stage. */
private boolean sourceAnyStateful; private Runnable sourceCloseAction;
True if pipeline is parallel, otherwise the pipeline is sequential; only valid for the source stage.
/** * True if pipeline is parallel, otherwise the pipeline is sequential; only * valid for the source stage. */
private boolean parallel;
Constructor for the head of a stream pipeline.
Params:
  • source – Supplier<Spliterator> describing the stream source
  • sourceFlags – The source flags for the stream source, described in StreamOpFlag
  • parallel – True if the pipeline is parallel
/** * Constructor for the head of a stream pipeline. * * @param source {@code Supplier<Spliterator>} describing the stream source * @param sourceFlags The source flags for the stream source, described in * {@link StreamOpFlag} * @param parallel True if the pipeline is parallel */
AbstractPipeline(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) { this.previousStage = null; this.sourceSupplier = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; this.depth = 0; this.parallel = parallel; }
Constructor for the head of a stream pipeline.
Params:
  • source – Spliterator describing the stream source
  • sourceFlags – the source flags for the stream source, described in StreamOpFlag
  • parallel – true if the pipeline is parallel
/** * Constructor for the head of a stream pipeline. * * @param source {@code Spliterator} describing the stream source * @param sourceFlags the source flags for the stream source, described in * {@link StreamOpFlag} * @param parallel {@code true} if the pipeline is parallel */
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { this.previousStage = null; this.sourceSpliterator = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; this.depth = 0; this.parallel = parallel; }
Constructor for appending an intermediate operation stage onto an existing pipeline.
Params:
  • previousStage – the upstream pipeline stage
  • opFlags – the operation flags for the new stage, described in StreamOpFlag
/** * Constructor for appending an intermediate operation stage onto an * existing pipeline. * * @param previousStage the upstream pipeline stage * @param opFlags the operation flags for the new stage, described in * {@link StreamOpFlag} */
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; } // Terminal evaluation methods
Evaluate the pipeline with a terminal operation to produce a result.
Params:
  • terminalOp – the terminal operation to be applied to the pipeline.
Type parameters:
  • <R> – the type of result
Returns:the result
/** * Evaluate the pipeline with a terminal operation to produce a result. * * @param <R> the type of result * @param terminalOp the terminal operation to be applied to the pipeline. * @return the result */
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
Collect the elements output from the pipeline stage.
Params:
  • generator – the array generator to be used to create array instances
Returns:a flat array-backed Node that holds the collected output elements
/** * Collect the elements output from the pipeline stage. * * @param generator the array generator to be used to create array instances * @return a flat array-backed Node that holds the collected output elements */
@SuppressWarnings("unchecked") final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) { if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; // If the last intermediate operation is stateful then // evaluate directly to avoid an extra collection step if (isParallel() && previousStage != null && opIsStateful()) { // Set the depth of this, last, pipeline stage to zero to slice the // pipeline such that this operation will not be included in the // upstream slice and upstream operations will not be included // in this slice depth = 0; return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator); } else { return evaluate(sourceSpliterator(0), true, generator); } }
Gets the source stage spliterator if this pipeline stage is the source stage. The pipeline is consumed after this method is called and returns successfully.
Throws:
Returns:the source stage spliterator
/** * Gets the source stage spliterator if this pipeline stage is the source * stage. The pipeline is consumed after this method is called and * returns successfully. * * @return the source stage spliterator * @throws IllegalStateException if this pipeline stage is not the source * stage. */
@SuppressWarnings("unchecked") final Spliterator<E_OUT> sourceStageSpliterator() { if (this != sourceStage) throw new IllegalStateException(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (sourceStage.sourceSpliterator != null) { @SuppressWarnings("unchecked") Spliterator<E_OUT> s = sourceStage.sourceSpliterator; sourceStage.sourceSpliterator = null; return s; } else if (sourceStage.sourceSupplier != null) { @SuppressWarnings("unchecked") Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; return s; } else { throw new IllegalStateException(MSG_CONSUMED); } } // BaseStream @Override @SuppressWarnings("unchecked") public final S sequential() { sourceStage.parallel = false; return (S) this; } @Override @SuppressWarnings("unchecked") public final S parallel() { sourceStage.parallel = true; return (S) this; } @Override public void close() { linkedOrConsumed = true; sourceSupplier = null; sourceSpliterator = null; if (sourceStage.sourceCloseAction != null) { Runnable closeAction = sourceStage.sourceCloseAction; sourceStage.sourceCloseAction = null; closeAction.run(); } } @Override @SuppressWarnings("unchecked") public S onClose(Runnable closeHandler) { if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); Objects.requireNonNull(closeHandler); Runnable existingHandler = sourceStage.sourceCloseAction; sourceStage.sourceCloseAction = (existingHandler == null) ? closeHandler : Streams.composeWithExceptions(existingHandler, closeHandler); return (S) this; } // Primitive specialization use co-variant overrides, hence is not final @Override @SuppressWarnings("unchecked") public Spliterator<E_OUT> spliterator() { if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (this == sourceStage) { if (sourceStage.sourceSpliterator != null) { @SuppressWarnings("unchecked") Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator; sourceStage.sourceSpliterator = null; return s; } else if (sourceStage.sourceSupplier != null) { @SuppressWarnings("unchecked") Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier; sourceStage.sourceSupplier = null; return lazySpliterator(s); } else { throw new IllegalStateException(MSG_CONSUMED); } } else { return wrap(this, () -> sourceSpliterator(0), isParallel()); } } @Override public final boolean isParallel() { return sourceStage.parallel; }
Returns the composition of stream flags of the stream source and all intermediate operations.
See Also:
Returns:the composition of stream flags of the stream source and all intermediate operations
/** * Returns the composition of stream flags of the stream source and all * intermediate operations. * * @return the composition of stream flags of the stream source and all * intermediate operations * @see StreamOpFlag */
final int getStreamFlags() { return StreamOpFlag.toStreamFlags(combinedFlags); }
Get the source spliterator for this pipeline stage. For a sequential or stateless parallel pipeline, this is the source spliterator. For a stateful parallel pipeline, this is a spliterator describing the results of all computations up to and including the most recent stateful operation.
/** * Get the source spliterator for this pipeline stage. For a sequential or * stateless parallel pipeline, this is the source spliterator. For a * stateful parallel pipeline, this is a spliterator describing the results * of all computations up to and including the most recent stateful * operation. */
@SuppressWarnings("unchecked") private Spliterator<?> sourceSpliterator(int terminalFlags) { // Get the source spliterator of the pipeline Spliterator<?> spliterator = null; if (sourceStage.sourceSpliterator != null) { spliterator = sourceStage.sourceSpliterator; sourceStage.sourceSpliterator = null; } else if (sourceStage.sourceSupplier != null) { spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get(); sourceStage.sourceSupplier = null; } else { throw new IllegalStateException(MSG_CONSUMED); } if (isParallel() && sourceStage.sourceAnyStateful) { // Adapt the source spliterator, evaluating each stateful op // in the pipeline up to and including this pipeline stage. // The depth and flags of each pipeline stage are adjusted accordingly. int depth = 1; for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this; u != e; u = p, p = p.nextStage) { int thisOpFlags = p.sourceOrOpFlags; if (p.opIsStateful()) { depth = 0; if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) { // Clear the short circuit flag for next pipeline stage // This stage encapsulates short-circuiting, the next // stage may not have any short-circuit operations, and // if so spliterator.forEachRemaining should be used // for traversal thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT; } spliterator = p.opEvaluateParallelLazy(u, spliterator); // Inject or clear SIZED on the source pipeline stage // based on the stage's spliterator thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED) ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED; } p.depth = depth++; p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags); } } if (terminalFlags != 0) { // Apply flags from the terminal operation to last pipeline stage combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags); } return spliterator; } // PipelineHelper @Override final StreamShape getSourceShape() { @SuppressWarnings("rawtypes") AbstractPipeline p = AbstractPipeline.this; while (p.depth > 0) { p = p.previousStage; } return p.getOutputShape(); } @Override final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) { return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1; } @Override final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) { copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator); return sink; } @Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } } @Override @SuppressWarnings("unchecked") final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { @SuppressWarnings({"rawtypes","unchecked"}) AbstractPipeline p = AbstractPipeline.this; while (p.depth > 0) { p = p.previousStage; } wrappedSink.begin(spliterator.getExactSizeIfKnown()); boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink); wrappedSink.end(); return cancelled; } @Override final int getStreamAndOpFlags() { return combinedFlags; } final boolean isOrdered() { return StreamOpFlag.ORDERED.isKnown(combinedFlags); } @Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; } @Override @SuppressWarnings("unchecked") final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) { if (depth == 0) { return (Spliterator<E_OUT>) sourceSpliterator; } else { return wrap(this, () -> sourceSpliterator, isParallel()); } } @Override @SuppressWarnings("unchecked") final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<E_OUT[]> generator) { if (isParallel()) { // @@@ Optimize if op of this pipeline stage is a stateful op return evaluateToNode(this, spliterator, flatten, generator); } else { Node.Builder<E_OUT> nb = makeNodeBuilder( exactOutputSizeIfKnown(spliterator), generator); return wrapAndCopyInto(nb, spliterator).build(); } } // Shape-specific abstract methods, implemented by XxxPipeline classes
Get the output shape of the pipeline. If the pipeline is the head, then it's output shape corresponds to the shape of the source. Otherwise, it's output shape corresponds to the output shape of the associated operation.
Returns:the output shape
/** * Get the output shape of the pipeline. If the pipeline is the head, * then it's output shape corresponds to the shape of the source. * Otherwise, it's output shape corresponds to the output shape of the * associated operation. * * @return the output shape */
abstract StreamShape getOutputShape();
Collect elements output from a pipeline into a Node that holds elements of this shape.
Params:
  • helper – the pipeline helper describing the pipeline stages
  • spliterator – the source spliterator
  • flattenTree – true if the returned node should be flattened
  • generator – the array generator
Returns:a Node holding the output of the pipeline
/** * Collect elements output from a pipeline into a Node that holds elements * of this shape. * * @param helper the pipeline helper describing the pipeline stages * @param spliterator the source spliterator * @param flattenTree true if the returned node should be flattened * @param generator the array generator * @return a Node holding the output of the pipeline */
abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<E_OUT[]> generator);
Create a spliterator that wraps a source spliterator, compatible with this stream shape, and operations associated with a PipelineHelper.
Params:
  • ph – the pipeline helper describing the pipeline stages
  • supplier – the supplier of a spliterator
Returns:a wrapping spliterator compatible with this shape
/** * Create a spliterator that wraps a source spliterator, compatible with * this stream shape, and operations associated with a {@link * PipelineHelper}. * * @param ph the pipeline helper describing the pipeline stages * @param supplier the supplier of a spliterator * @return a wrapping spliterator compatible with this shape */
abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel);
Create a lazy spliterator that wraps and obtains the supplied the spliterator when a method is invoked on the lazy spliterator.
Params:
  • supplier – the supplier of a spliterator
/** * Create a lazy spliterator that wraps and obtains the supplied the * spliterator when a method is invoked on the lazy spliterator. * @param supplier the supplier of a spliterator */
abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);
Traverse the elements of a spliterator compatible with this stream shape, pushing those elements into a sink. If the sink requests cancellation, no further elements will be pulled or pushed.
Params:
  • spliterator – the spliterator to pull elements from
  • sink – the sink to push elements to
Returns:true if the cancellation was requested
/** * Traverse the elements of a spliterator compatible with this stream shape, * pushing those elements into a sink. If the sink requests cancellation, * no further elements will be pulled or pushed. * * @param spliterator the spliterator to pull elements from * @param sink the sink to push elements to * @return true if the cancellation was requested */
abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);
Make a node builder compatible with this stream shape.
Params:
  • exactSizeIfKnown – if >=0, then a node builder will be created that has a fixed capacity of at most sizeIfKnown elements. If < 0, then the node builder has an unfixed capacity. A fixed capacity node builder will throw exceptions if an element is added after builder has reached capacity, or is built before the builder has reached capacity.
  • generator – the array generator to be used to create instances of a T[] array. For implementations supporting primitive nodes, this parameter may be ignored.
Returns:a node builder
/** * Make a node builder compatible with this stream shape. * * @param exactSizeIfKnown if {@literal >=0}, then a node builder will be * created that has a fixed capacity of at most sizeIfKnown elements. If * {@literal < 0}, then the node builder has an unfixed capacity. A fixed * capacity node builder will throw exceptions if an element is added after * builder has reached capacity, or is built before the builder has reached * capacity. * * @param generator the array generator to be used to create instances of a * T[] array. For implementations supporting primitive nodes, this parameter * may be ignored. * @return a node builder */
@Override abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<E_OUT[]> generator); // Op-specific abstract methods, implemented by the operation class
Returns whether this operation is stateful or not. If it is stateful, then the method opEvaluateParallel(PipelineHelper, Spliterator, IntFunction) must be overridden.
Returns:true if this operation is stateful
/** * Returns whether this operation is stateful or not. If it is stateful, * then the method * {@link #opEvaluateParallel(PipelineHelper, java.util.Spliterator, java.util.function.IntFunction)} * must be overridden. * * @return {@code true} if this operation is stateful */
abstract boolean opIsStateful();
Accepts a Sink which will receive the results of this operation, and return a Sink which accepts elements of the input type of this operation and which performs the operation, passing the results to the provided Sink.
Params:
  • flags – The combined stream and operation flags up to, but not including, this operation
  • sink – sink to which elements should be sent after processing
API Note: The implementation may use the flags parameter to optimize the sink wrapping. For example, if the input is already DISTINCT, the implementation for the Stream#distinct() method could just return the sink it was passed.
Returns:a sink which accepts elements, perform the operation upon each element, and passes the results (if any) to the provided Sink.
/** * Accepts a {@code Sink} which will receive the results of this operation, * and return a {@code Sink} which accepts elements of the input type of * this operation and which performs the operation, passing the results to * the provided {@code Sink}. * * @apiNote * The implementation may use the {@code flags} parameter to optimize the * sink wrapping. For example, if the input is already {@code DISTINCT}, * the implementation for the {@code Stream#distinct()} method could just * return the sink it was passed. * * @param flags The combined stream and operation flags up to, but not * including, this operation * @param sink sink to which elements should be sent after processing * @return a sink which accepts elements, perform the operation upon * each element, and passes the results (if any) to the provided * {@code Sink}. */
abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);
Performs a parallel evaluation of the operation using the specified PipelineHelper which describes the upstream intermediate operations. Only called on stateful operations. If opIsStateful() returns true then implementations must override the default implementation.
Params:
  • helper – the pipeline helper describing the pipeline stages
  • spliterator – the source Spliterator
  • generator – the array generator
Implementation Requirements:The default implementation always throw UnsupportedOperationException.
Returns:a Node describing the result of the evaluation
/** * Performs a parallel evaluation of the operation using the specified * {@code PipelineHelper} which describes the upstream intermediate * operations. Only called on stateful operations. If {@link * #opIsStateful()} returns true then implementations must override the * default implementation. * * @implSpec The default implementation always throw * {@code UnsupportedOperationException}. * * @param helper the pipeline helper describing the pipeline stages * @param spliterator the source {@code Spliterator} * @param generator the array generator * @return a {@code Node} describing the result of the evaluation */
<P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator, IntFunction<E_OUT[]> generator) { throw new UnsupportedOperationException("Parallel evaluation is not supported"); }
Returns a Spliterator describing a parallel evaluation of the operation, using the specified PipelineHelper which describes the upstream intermediate operations. Only called on stateful operations. It is not necessary (though acceptable) to do a full computation of the result here; it is preferable, if possible, to describe the result via a lazily evaluated spliterator.
Params:
  • helper – the pipeline helper
  • spliterator – the source Spliterator
Implementation Requirements:The default implementation behaves as if:

    return evaluateParallel(helper, i -> (E_OUT[]) new
Object[i]).spliterator();
and is suitable for implementations that cannot do better than a full synchronous evaluation.
Returns:a Spliterator describing the result of the evaluation
/** * Returns a {@code Spliterator} describing a parallel evaluation of the * operation, using the specified {@code PipelineHelper} which describes the * upstream intermediate operations. Only called on stateful operations. * It is not necessary (though acceptable) to do a full computation of the * result here; it is preferable, if possible, to describe the result via a * lazily evaluated spliterator. * * @implSpec The default implementation behaves as if: * <pre>{@code * return evaluateParallel(helper, i -> (E_OUT[]) new * Object[i]).spliterator(); * }</pre> * and is suitable for implementations that cannot do better than a full * synchronous evaluation. * * @param helper the pipeline helper * @param spliterator the source {@code Spliterator} * @return a {@code Spliterator} describing the result of the evaluation */
@SuppressWarnings("unchecked") <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper, Spliterator<P_IN> spliterator) { return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator(); } }