/*
 * Copyright (C) 2006 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.annotations.GwtCompatible;
import com.google.common.collect.ImmutableCollection;
import com.google.errorprone.annotations.ForOverride;
import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;

A future made up of a collection of sub-futures.
Type parameters:
  • <InputT> – the type of the individual inputs
  • <OutputT> – the type of the output (i.e. this) future
/** * A future made up of a collection of sub-futures. * * @param <InputT> the type of the individual inputs * @param <OutputT> the type of the output (i.e. this) future */
@GwtCompatible abstract class AggregateFuture<InputT, OutputT> extends AbstractFuture.TrustedFuture<OutputT> { private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName()); /* * In certain circumstances, this field might theoretically not be visible to an afterDone() call * triggered by cancel(). For details, see the comments on the fields of TimeoutFuture. */ private @Nullable RunningState runningState; @Override protected final void afterDone() { super.afterDone(); RunningState localRunningState = runningState; if (localRunningState != null) { // Let go of the memory held by the running state this.runningState = null; ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures = localRunningState.futures; boolean wasInterrupted = wasInterrupted(); if (wasInterrupted) { localRunningState.interruptTask(); } if (isCancelled() & futures != null) { for (ListenableFuture<?> future : futures) { future.cancel(wasInterrupted); } } } } @Override protected String pendingToString() { RunningState localRunningState = runningState; if (localRunningState == null) { return null; } ImmutableCollection<? extends ListenableFuture<? extends InputT>> localFutures = localRunningState.futures; if (localFutures != null) { return "futures=[" + localFutures + "]"; } return null; }
Must be called at the end of each sub-class's constructor.
/** Must be called at the end of each sub-class's constructor. */
final void init(RunningState runningState) { this.runningState = runningState; runningState.init(); } abstract class RunningState extends AggregateFutureState implements Runnable { private ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures; private final boolean allMustSucceed; private final boolean collectsValues; RunningState( ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures, boolean allMustSucceed, boolean collectsValues) { super(futures.size()); this.futures = checkNotNull(futures); this.allMustSucceed = allMustSucceed; this.collectsValues = collectsValues; } /* Used in the !allMustSucceed case so we don't have to instantiate a listener. */ @Override public final void run() { decrementCountAndMaybeComplete(); }
The "real" initialization; we can't put this in the constructor because, in the case where futures are already complete, we would not initialize the subclass before calling handleOneInputDone. As this is called after the subclass is constructed, we're guaranteed to have properly initialized the subclass.
/** * The "real" initialization; we can't put this in the constructor because, in the case where * futures are already complete, we would not initialize the subclass before calling {@link * #handleOneInputDone}. As this is called after the subclass is constructed, we're guaranteed * to have properly initialized the subclass. */
private void init() { // Corner case: List is empty. if (futures.isEmpty()) { handleAllCompleted(); return; } // NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll // need to handle RejectedExecutionException if (allMustSucceed) { // We need fail fast, so we have to keep track of which future failed so we can propagate // the exception immediately // Register a listener on each Future in the list to update the state of this future. // Note that if all the futures on the list are done prior to completing this loop, the last // call to addListener() will callback to setOneValue(), transitively call our cleanup // listener, and set this.futures to null. // This is not actually a problem, since the foreach only needs this.futures to be non-null // at the beginning of the loop. int i = 0; for (final ListenableFuture<? extends InputT> listenable : futures) { final int index = i++; listenable.addListener( new Runnable() { @Override public void run() { try { handleOneInputDone(index, listenable); } finally { decrementCountAndMaybeComplete(); } } }, directExecutor()); } } else { // We'll only call the callback when all futures complete, regardless of whether some failed // Hold off on calling setOneValue until all complete, so we can share the same listener for (ListenableFuture<? extends InputT> listenable : futures) { listenable.addListener(this, directExecutor()); } } }
Fails this future with the given Throwable if RunningState.allMustSucceed is true. Also, logs the throwable if it is an Error or if RunningState.allMustSucceed is true, the throwable did not cause this future to fail, and it is the first time we've seen that particular Throwable.
/** * Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the * throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the * throwable did not cause this future to fail, and it is the first time we've seen that * particular Throwable. */
private void handleException(Throwable throwable) { checkNotNull(throwable); boolean completedWithFailure = false; boolean firstTimeSeeingThisException = true; if (allMustSucceed) { // As soon as the first one fails, throw the exception up. // The result of all other inputs is then ignored. completedWithFailure = setException(throwable); if (completedWithFailure) { releaseResourcesAfterFailure(); } else { // Go up the causal chain to see if we've already seen this cause; if we have, even if // it's wrapped by a different exception, don't log it. firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable); } } // | and & used because it's faster than the branch required for || and && if (throwable instanceof Error | (allMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) { String message = (throwable instanceof Error) ? "Input Future failed with Error" : "Got more than one input Future failure. Logging failures after the first"; logger.log(Level.SEVERE, message, throwable); } } @Override final void addInitialException(Set<Throwable> seen) { if (!isCancelled()) { // TODO(cpovirk): Think about whether we could/should use Verify to check this. boolean unused = addCausalChain(seen, trustedGetException()); } }
Handles the input at the given index completing.
/** Handles the input at the given index completing. */
private void handleOneInputDone(int index, Future<? extends InputT> future) { // The only cases in which this Future should already be done are (a) if it was cancelled or // (b) if an input failed and we propagated that immediately because of allMustSucceed. checkState( allMustSucceed || !isDone() || isCancelled(), "Future was done before all dependencies completed"); try { checkState(future.isDone(), "Tried to set value from future which is not done"); if (allMustSucceed) { if (future.isCancelled()) { // clear running state prior to cancelling children, this sets our own state but lets // the input futures keep running as some of them may be used elsewhere. runningState = null; cancel(false); } else { // We always get the result so that we can have fail-fast, even if we don't collect InputT result = getDone(future); if (collectsValues) { collectOneValue(allMustSucceed, index, result); } } } else if (collectsValues && !future.isCancelled()) { collectOneValue(allMustSucceed, index, getDone(future)); } } catch (ExecutionException e) { handleException(e.getCause()); } catch (Throwable t) { handleException(t); } } private void decrementCountAndMaybeComplete() { int newRemaining = decrementRemainingAndGet(); checkState(newRemaining >= 0, "Less than 0 remaining futures"); if (newRemaining == 0) { processCompleted(); } } private void processCompleted() { // Collect the values if (a) our output requires collecting them and (b) we haven't been // collecting them as we go. (We've collected them as we go only if we needed to fail fast) if (collectsValues & !allMustSucceed) { int i = 0; for (ListenableFuture<? extends InputT> listenable : futures) { handleOneInputDone(i++, listenable); } } handleAllCompleted(); }
Listeners implicitly keep a reference to RunningState as they're inner classes, so we free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we immediately release resources we no longer need); additionally, the future will release its reference to RunningState, which should free all associated memory when all the futures complete and the listeners are released.

TODO(user): Write tests for memory retention

/** * Listeners implicitly keep a reference to {@link RunningState} as they're inner classes, so we * free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we * immediately release resources we no longer need); additionally, the future will release its * reference to {@link RunningState}, which should free all associated memory when all the * futures complete and the listeners are released. * * <p>TODO(user): Write tests for memory retention */
@ForOverride @OverridingMethodsMustInvokeSuper void releaseResourcesAfterFailure() { this.futures = null; }
Called only if collectsValues is true.

If allMustSucceed is true, called as each future completes; otherwise, called for each future when all futures complete.

/** * Called only if {@code collectsValues} is true. * * <p>If {@code allMustSucceed} is true, called as each future completes; otherwise, called for * each future when all futures complete. */
abstract void collectOneValue(boolean allMustSucceed, int index, @Nullable InputT returnValue); abstract void handleAllCompleted(); void interruptTask() {} }
Adds the chain to the seen set, and returns whether all the chain was new to us.
/** Adds the chain to the seen set, and returns whether all the chain was new to us. */
private static boolean addCausalChain(Set<Throwable> seen, Throwable t) { for (; t != null; t = t.getCause()) { boolean firstTimeSeen = seen.add(t); if (!firstTimeSeen) { /* * We've seen this, so we've seen its causes, too. No need to re-add them. (There's one case * where this isn't true, but we ignore it: If we record an exception, then someone calls * initCause() on it, and then we examine it again, we'll conclude that we've seen the whole * chain before when it fact we haven't. But this should be rare.) */ return false; } } return true; } }