/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
package java.util.stream;

import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;

Abstract class for fork-join tasks used to implement short-circuiting stream ops, which can produce a result without processing all elements of the stream.
Type parameters:
  • <P_IN> – type of input elements to the pipeline
  • <P_OUT> – type of output elements from the pipeline
  • <R> – type of intermediate result, may be different from operation result type
  • <K> – type of child and sibling tasks
Since: 1.8
/** * Abstract class for fork-join tasks used to implement short-circuiting * stream ops, which can produce a result without processing all elements of the * stream. * * @param <P_IN> type of input elements to the pipeline * @param <P_OUT> type of output elements from the pipeline * @param <R> type of intermediate result, may be different from operation * result type * @param <K> type of child and sibling tasks * @since 1.8 */
@SuppressWarnings("serial") abstract class AbstractShortCircuitTask<P_IN, P_OUT, R, K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>> extends AbstractTask<P_IN, P_OUT, R, K> {
The result for this computation; this is shared among all tasks and set exactly once
/** * The result for this computation; this is shared among all tasks and set * exactly once */
protected final AtomicReference<R> sharedResult;
Indicates whether this task has been canceled. Tasks may cancel other tasks in the computation under various conditions, such as in a find-first operation, a task that finds a value will cancel all tasks that are later in the encounter order.
/** * Indicates whether this task has been canceled. Tasks may cancel other * tasks in the computation under various conditions, such as in a * find-first operation, a task that finds a value will cancel all tasks * that are later in the encounter order. */
protected volatile boolean canceled;
Constructor for root tasks.
Params:
  • helper – the PipelineHelper describing the stream pipeline up to this operation
  • spliterator – the Spliterator describing the source for this pipeline
/** * Constructor for root tasks. * * @param helper the {@code PipelineHelper} describing the stream pipeline * up to this operation * @param spliterator the {@code Spliterator} describing the source for this * pipeline */
protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator) { super(helper, spliterator); sharedResult = new AtomicReference<>(null); }
Constructor for non-root nodes.
Params:
  • parent – parent task in the computation tree
  • spliterator – the Spliterator for the portion of the computation tree described by this task
/** * Constructor for non-root nodes. * * @param parent parent task in the computation tree * @param spliterator the {@code Spliterator} for the portion of the * computation tree described by this task */
protected AbstractShortCircuitTask(K parent, Spliterator<P_IN> spliterator) { super(parent, spliterator); sharedResult = parent.sharedResult; }
Returns the value indicating the computation completed with no task finding a short-circuitable result. For example, for a "find" operation, this might be null or an empty Optional.
Returns:the result to return when no task finds a result
/** * Returns the value indicating the computation completed with no task * finding a short-circuitable result. For example, for a "find" operation, * this might be null or an empty {@code Optional}. * * @return the result to return when no task finds a result */
protected abstract R getEmptyResult();
Overrides AbstractTask version to include checks for early exits while splitting or computing.
/** * Overrides AbstractTask version to include checks for early * exits while splitting or computing. */
@Override public void compute() { Spliterator<P_IN> rs = spliterator, ls; long sizeEstimate = rs.estimateSize(); long sizeThreshold = getTargetSize(sizeEstimate); boolean forkRight = false; @SuppressWarnings("unchecked") K task = (K) this; AtomicReference<R> sr = sharedResult; R result; while ((result = sr.get()) == null) { if (task.taskCanceled()) { result = task.getEmptyResult(); break; } if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) { result = task.doLeaf(); break; } K leftChild, rightChild, taskToFork; task.leftChild = leftChild = task.makeChild(ls); task.rightChild = rightChild = task.makeChild(rs); task.setPendingCount(1); if (forkRight) { forkRight = false; rs = ls; task = leftChild; taskToFork = rightChild; } else { forkRight = true; task = rightChild; taskToFork = leftChild; } taskToFork.fork(); sizeEstimate = rs.estimateSize(); } task.setLocalResult(result); task.tryComplete(); }
Declares that a globally valid result has been found. If another task has not already found the answer, the result is installed in sharedResult. The compute() method will check sharedResult before proceeding with computation, so this causes the computation to terminate early.
Params:
  • result – the result found
/** * Declares that a globally valid result has been found. If another task has * not already found the answer, the result is installed in * {@code sharedResult}. The {@code compute()} method will check * {@code sharedResult} before proceeding with computation, so this causes * the computation to terminate early. * * @param result the result found */
protected void shortCircuit(R result) { if (result != null) sharedResult.compareAndSet(null, result); }
Sets a local result for this task. If this task is the root, set the shared result instead (if not already set).
Params:
  • localResult – The result to set for this task
/** * Sets a local result for this task. If this task is the root, set the * shared result instead (if not already set). * * @param localResult The result to set for this task */
@Override protected void setLocalResult(R localResult) { if (isRoot()) { if (localResult != null) sharedResult.compareAndSet(null, localResult); } else super.setLocalResult(localResult); }
Retrieves the local result for this task
/** * Retrieves the local result for this task */
@Override public R getRawResult() { return getLocalResult(); }
Retrieves the local result for this task. If this task is the root, retrieves the shared result instead.
/** * Retrieves the local result for this task. If this task is the root, * retrieves the shared result instead. */
@Override public R getLocalResult() { if (isRoot()) { R answer = sharedResult.get(); return (answer == null) ? getEmptyResult() : answer; } else return super.getLocalResult(); }
Mark this task as canceled
/** * Mark this task as canceled */
protected void cancel() { canceled = true; }
Queries whether this task is canceled. A task is considered canceled if it or any of its parents have been canceled.
Returns:true if this task or any parent is canceled.
/** * Queries whether this task is canceled. A task is considered canceled if * it or any of its parents have been canceled. * * @return {@code true} if this task or any parent is canceled. */
protected boolean taskCanceled() { boolean cancel = canceled; if (!cancel) { for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent()) cancel = parent.canceled; } return cancel; }
Cancels all tasks which succeed this one in the encounter order. This includes canceling all the current task's right sibling, as well as the later right siblings of all its parents.
/** * Cancels all tasks which succeed this one in the encounter order. This * includes canceling all the current task's right sibling, as well as the * later right siblings of all its parents. */
protected void cancelLaterNodes() { // Go up the tree, cancel right siblings of this node and all parents for (@SuppressWarnings("unchecked") K parent = getParent(), node = (K) this; parent != null; node = parent, parent = parent.getParent()) { // If node is a left child of parent, then has a right sibling if (parent.leftChild == node) { K rightSibling = parent.rightChild; if (!rightSibling.canceled) rightSibling.cancel(); } } } }