package com.google.common.util.concurrent;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.AggregateFuture.ReleaseResourcesReason.OUTPUT_FUTURE_DONE;
import com.google.common.annotations.GwtCompatible;
import com.google.common.collect.ImmutableCollection;
import com.google.j2objc.annotations.WeakOuter;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import org.checkerframework.checker.nullness.qual.Nullable;
@GwtCompatible
final class CombinedFuture<V> extends AggregateFuture<Object, V> {
private CombinedFutureInterruptibleTask<?> task;
CombinedFuture(
ImmutableCollection<? extends ListenableFuture<?>> futures,
boolean allMustSucceed,
Executor listenerExecutor,
AsyncCallable<V> callable) {
super(futures, allMustSucceed, false);
this.task = new AsyncCallableInterruptibleTask(callable, listenerExecutor);
init();
}
CombinedFuture(
ImmutableCollection<? extends ListenableFuture<?>> futures,
boolean allMustSucceed,
Executor listenerExecutor,
Callable<V> callable) {
super(futures, allMustSucceed, false);
this.task = new CallableInterruptibleTask(callable, listenerExecutor);
init();
}
@Override
void collectOneValue(int index, @Nullable Object returnValue) {}
@Override
void handleAllCompleted() {
CombinedFutureInterruptibleTask<?> localTask = task;
if (localTask != null) {
localTask.execute();
}
}
@Override
void releaseResources(ReleaseResourcesReason reason) {
super.releaseResources(reason);
if (reason == OUTPUT_FUTURE_DONE) {
this.task = null;
}
}
@Override
protected void interruptTask() {
CombinedFutureInterruptibleTask<?> localTask = task;
if (localTask != null) {
localTask.interruptTask();
}
}
@WeakOuter
private abstract class CombinedFutureInterruptibleTask<T> extends InterruptibleTask<T> {
private final Executor listenerExecutor;
boolean thrownByExecute = true;
CombinedFutureInterruptibleTask(Executor listenerExecutor) {
this.listenerExecutor = checkNotNull(listenerExecutor);
}
@Override
final boolean isDone() {
return CombinedFuture.this.isDone();
}
final void execute() {
try {
listenerExecutor.execute(this);
} catch (RejectedExecutionException e) {
if (thrownByExecute) {
CombinedFuture.this.setException(e);
}
}
}
@Override
final void afterRanInterruptibly(T result, Throwable error) {
CombinedFuture.this.task = null;
if (error != null) {
if (error instanceof ExecutionException) {
CombinedFuture.this.setException(error.getCause());
} else if (error instanceof CancellationException) {
cancel(false);
} else {
CombinedFuture.this.setException(error);
}
} else {
setValue(result);
}
}
abstract void setValue(T value);
}
@WeakOuter
private final class AsyncCallableInterruptibleTask
extends CombinedFutureInterruptibleTask<ListenableFuture<V>> {
private final AsyncCallable<V> callable;
AsyncCallableInterruptibleTask(AsyncCallable<V> callable, Executor listenerExecutor) {
super(listenerExecutor);
this.callable = checkNotNull(callable);
}
@Override
ListenableFuture<V> runInterruptibly() throws Exception {
thrownByExecute = false;
ListenableFuture<V> result = callable.call();
return checkNotNull(
result,
"AsyncCallable.call returned null instead of a Future. "
+ "Did you mean to return immediateFuture(null)? %s",
callable);
}
@Override
void setValue(ListenableFuture<V> value) {
CombinedFuture.this.setFuture(value);
}
@Override
String toPendingString() {
return callable.toString();
}
}
@WeakOuter
private final class CallableInterruptibleTask extends CombinedFutureInterruptibleTask<V> {
private final Callable<V> callable;
CallableInterruptibleTask(Callable<V> callable, Executor listenerExecutor) {
super(listenerExecutor);
this.callable = checkNotNull(callable);
}
@Override
V runInterruptibly() throws Exception {
thrownByExecute = false;
return callable.call();
}
@Override
void setValue(V value) {
CombinedFuture.this.set(value);
}
@Override
String toPendingString() {
return callable.toString();
}
}
}