/*
* Copyright (C) 2006 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.getDone;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import com.google.common.annotations.GwtCompatible;
import com.google.common.collect.ImmutableCollection;
import com.google.errorprone.annotations.ForOverride;
import com.google.errorprone.annotations.OverridingMethodsMustInvokeSuper;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.checkerframework.checker.nullness.qual.Nullable;
A future made up of a collection of sub-futures.
Type parameters: - <InputT> – the type of the individual inputs
- <OutputT> – the type of the output (i.e. this) future
/**
* A future made up of a collection of sub-futures.
*
* @param <InputT> the type of the individual inputs
* @param <OutputT> the type of the output (i.e. this) future
*/
@GwtCompatible
abstract class AggregateFuture<InputT, OutputT> extends AbstractFuture.TrustedFuture<OutputT> {
private static final Logger logger = Logger.getLogger(AggregateFuture.class.getName());
/*
* In certain circumstances, this field might theoretically not be visible to an afterDone() call
* triggered by cancel(). For details, see the comments on the fields of TimeoutFuture.
*/
private @Nullable RunningState runningState;
@Override
protected final void afterDone() {
super.afterDone();
RunningState localRunningState = runningState;
if (localRunningState != null) {
// Let go of the memory held by the running state
this.runningState = null;
ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures =
localRunningState.futures;
boolean wasInterrupted = wasInterrupted();
if (wasInterrupted) {
localRunningState.interruptTask();
}
if (isCancelled() & futures != null) {
for (ListenableFuture<?> future : futures) {
future.cancel(wasInterrupted);
}
}
}
}
@Override
protected String pendingToString() {
RunningState localRunningState = runningState;
if (localRunningState == null) {
return null;
}
ImmutableCollection<? extends ListenableFuture<? extends InputT>> localFutures =
localRunningState.futures;
if (localFutures != null) {
return "futures=[" + localFutures + "]";
}
return null;
}
Must be called at the end of each sub-class's constructor. /** Must be called at the end of each sub-class's constructor. */
final void init(RunningState runningState) {
this.runningState = runningState;
runningState.init();
}
abstract class RunningState extends AggregateFutureState implements Runnable {
private ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures;
private final boolean allMustSucceed;
private final boolean collectsValues;
RunningState(
ImmutableCollection<? extends ListenableFuture<? extends InputT>> futures,
boolean allMustSucceed,
boolean collectsValues) {
super(futures.size());
this.futures = checkNotNull(futures);
this.allMustSucceed = allMustSucceed;
this.collectsValues = collectsValues;
}
/* Used in the !allMustSucceed case so we don't have to instantiate a listener. */
@Override
public final void run() {
decrementCountAndMaybeComplete();
}
The "real" initialization; we can't put this in the constructor because, in the case where futures are already complete, we would not initialize the subclass before calling handleOneInputDone
. As this is called after the subclass is constructed, we're guaranteed to have properly initialized the subclass. /**
* The "real" initialization; we can't put this in the constructor because, in the case where
* futures are already complete, we would not initialize the subclass before calling {@link
* #handleOneInputDone}. As this is called after the subclass is constructed, we're guaranteed
* to have properly initialized the subclass.
*/
private void init() {
// Corner case: List is empty.
if (futures.isEmpty()) {
handleAllCompleted();
return;
}
// NOTE: If we ever want to use a custom executor here, have a look at CombinedFuture as we'll
// need to handle RejectedExecutionException
if (allMustSucceed) {
// We need fail fast, so we have to keep track of which future failed so we can propagate
// the exception immediately
// Register a listener on each Future in the list to update the state of this future.
// Note that if all the futures on the list are done prior to completing this loop, the last
// call to addListener() will callback to setOneValue(), transitively call our cleanup
// listener, and set this.futures to null.
// This is not actually a problem, since the foreach only needs this.futures to be non-null
// at the beginning of the loop.
int i = 0;
for (final ListenableFuture<? extends InputT> listenable : futures) {
final int index = i++;
listenable.addListener(
new Runnable() {
@Override
public void run() {
try {
handleOneInputDone(index, listenable);
} finally {
decrementCountAndMaybeComplete();
}
}
},
directExecutor());
}
} else {
// We'll only call the callback when all futures complete, regardless of whether some failed
// Hold off on calling setOneValue until all complete, so we can share the same listener
for (ListenableFuture<? extends InputT> listenable : futures) {
listenable.addListener(this, directExecutor());
}
}
}
Fails this future with the given Throwable if RunningState.allMustSucceed
is true. Also, logs the throwable if it is an Error
or if RunningState.allMustSucceed
is true
, the throwable did not cause this future to fail, and it is the first time we've seen that particular Throwable. /**
* Fails this future with the given Throwable if {@link #allMustSucceed} is true. Also, logs the
* throwable if it is an {@link Error} or if {@link #allMustSucceed} is {@code true}, the
* throwable did not cause this future to fail, and it is the first time we've seen that
* particular Throwable.
*/
private void handleException(Throwable throwable) {
checkNotNull(throwable);
boolean completedWithFailure = false;
boolean firstTimeSeeingThisException = true;
if (allMustSucceed) {
// As soon as the first one fails, throw the exception up.
// The result of all other inputs is then ignored.
completedWithFailure = setException(throwable);
if (completedWithFailure) {
releaseResourcesAfterFailure();
} else {
// Go up the causal chain to see if we've already seen this cause; if we have, even if
// it's wrapped by a different exception, don't log it.
firstTimeSeeingThisException = addCausalChain(getOrInitSeenExceptions(), throwable);
}
}
// | and & used because it's faster than the branch required for || and &&
if (throwable instanceof Error
| (allMustSucceed & !completedWithFailure & firstTimeSeeingThisException)) {
String message =
(throwable instanceof Error)
? "Input Future failed with Error"
: "Got more than one input Future failure. Logging failures after the first";
logger.log(Level.SEVERE, message, throwable);
}
}
@Override
final void addInitialException(Set<Throwable> seen) {
if (!isCancelled()) {
// TODO(cpovirk): Think about whether we could/should use Verify to check this.
boolean unused = addCausalChain(seen, trustedGetException());
}
}
Handles the input at the given index completing. /** Handles the input at the given index completing. */
private void handleOneInputDone(int index, Future<? extends InputT> future) {
// The only cases in which this Future should already be done are (a) if it was cancelled or
// (b) if an input failed and we propagated that immediately because of allMustSucceed.
checkState(
allMustSucceed || !isDone() || isCancelled(),
"Future was done before all dependencies completed");
try {
checkState(future.isDone(), "Tried to set value from future which is not done");
if (allMustSucceed) {
if (future.isCancelled()) {
// clear running state prior to cancelling children, this sets our own state but lets
// the input futures keep running as some of them may be used elsewhere.
runningState = null;
cancel(false);
} else {
// We always get the result so that we can have fail-fast, even if we don't collect
InputT result = getDone(future);
if (collectsValues) {
collectOneValue(allMustSucceed, index, result);
}
}
} else if (collectsValues && !future.isCancelled()) {
collectOneValue(allMustSucceed, index, getDone(future));
}
} catch (ExecutionException e) {
handleException(e.getCause());
} catch (Throwable t) {
handleException(t);
}
}
private void decrementCountAndMaybeComplete() {
int newRemaining = decrementRemainingAndGet();
checkState(newRemaining >= 0, "Less than 0 remaining futures");
if (newRemaining == 0) {
processCompleted();
}
}
private void processCompleted() {
// Collect the values if (a) our output requires collecting them and (b) we haven't been
// collecting them as we go. (We've collected them as we go only if we needed to fail fast)
if (collectsValues & !allMustSucceed) {
int i = 0;
for (ListenableFuture<? extends InputT> listenable : futures) {
handleOneInputDone(i++, listenable);
}
}
handleAllCompleted();
}
Listeners implicitly keep a reference to RunningState
as they're inner classes, so we free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we immediately release resources we no longer need); additionally, the future will release its reference to RunningState
, which should free all associated memory when all the futures complete and the listeners are released. TODO(user): Write tests for memory retention
/**
* Listeners implicitly keep a reference to {@link RunningState} as they're inner classes, so we
* free resources here as well for the allMustSucceed=true case (i.e. when a future fails, we
* immediately release resources we no longer need); additionally, the future will release its
* reference to {@link RunningState}, which should free all associated memory when all the
* futures complete and the listeners are released.
*
* <p>TODO(user): Write tests for memory retention
*/
@ForOverride
@OverridingMethodsMustInvokeSuper
void releaseResourcesAfterFailure() {
this.futures = null;
}
Called only if collectsValues
is true. If allMustSucceed
is true, called as each future completes; otherwise, called for each future when all futures complete.
/**
* Called only if {@code collectsValues} is true.
*
* <p>If {@code allMustSucceed} is true, called as each future completes; otherwise, called for
* each future when all futures complete.
*/
abstract void collectOneValue(boolean allMustSucceed, int index, @Nullable InputT returnValue);
abstract void handleAllCompleted();
void interruptTask() {}
}
Adds the chain to the seen set, and returns whether all the chain was new to us. /** Adds the chain to the seen set, and returns whether all the chain was new to us. */
private static boolean addCausalChain(Set<Throwable> seen, Throwable t) {
for (; t != null; t = t.getCause()) {
boolean firstTimeSeen = seen.add(t);
if (!firstTimeSeen) {
/*
* We've seen this, so we've seen its causes, too. No need to re-add them. (There's one case
* where this isn't true, but we ignore it: If we record an exception, then someone calls
* initCause() on it, and then we examine it again, we'll conclude that we've seen the whole
* chain before when it fact we haven't. But this should be rare.)
*/
return false;
}
}
return true;
}
}