/*
 * Copyright (C) 2018 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN;
import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED;
import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.annotations.Beta;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

Serializes execution of a set of operations. This class guarantees that a submitted callable will not be called before previously submitted callables (and any Futures returned from them) have completed.

This class implements a superset of the behavior of MoreExecutors.newSequentialExecutor. If your tasks all run on the same underlying executor and don't need to wait for Futures returned from AsyncCallables, use it instead.

Since:26.0
/** * Serializes execution of a set of operations. This class guarantees that a submitted callable will * not be called before previously submitted callables (and any {@code Future}s returned from them) * have completed. * * <p>This class implements a superset of the behavior of {@link * MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and * don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead. * * @since 26.0 */
@Beta public final class ExecutionSequencer { private ExecutionSequencer() {}
Creates a new instance.
/** Creates a new instance. */
public static ExecutionSequencer create() { return new ExecutionSequencer(); } enum RunningState { NOT_RUN, CANCELLED, STARTED, }
This reference acts as a pointer tracking the head of a linked list of ListenableFutures.
/** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
private final AtomicReference<ListenableFuture<Object>> ref = new AtomicReference<>(immediateFuture(null));
Enqueues a task to run when the previous task (if any) completes.

Cancellation does not propagate from the output future to a callable that has begun to execute, but if the output future is cancelled before Callable.call() is invoked, Callable.call() will not be invoked.

/** * Enqueues a task to run when the previous task (if any) completes. * * <p>Cancellation does not propagate from the output future to a callable that has begun to * execute, but if the output future is cancelled before {@link Callable#call()} is invoked, * {@link Callable#call()} will not be invoked. */
public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) { checkNotNull(callable); return submitAsync( new AsyncCallable<T>() { @Override public ListenableFuture<T> call() throws Exception { return immediateFuture(callable.call()); } @Override public String toString() { return callable.toString(); } }, executor); }
Enqueues a task to run when the previous task (if any) completes.

Cancellation does not propagate from the output future to the future returned from callable or a callable that has begun to execute, but if the output future is cancelled before AsyncCallable.call() is invoked, AsyncCallable.call() will not be invoked.

/** * Enqueues a task to run when the previous task (if any) completes. * * <p>Cancellation does not propagate from the output future to the future returned from {@code * callable} or a callable that has begun to execute, but if the output future is cancelled before * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked. */
public <T> ListenableFuture<T> submitAsync( final AsyncCallable<T> callable, final Executor executor) { checkNotNull(callable); final AtomicReference<RunningState> runningState = new AtomicReference<>(NOT_RUN); final AsyncCallable<T> task = new AsyncCallable<T>() { @Override public ListenableFuture<T> call() throws Exception { if (!runningState.compareAndSet(NOT_RUN, STARTED)) { return immediateCancelledFuture(); } return callable.call(); } @Override public String toString() { return callable.toString(); } }; /* * Four futures are at play here: * taskFuture is the future tracking the result of the callable. * newFuture is a future that completes after this and all prior tasks are done. * oldFuture is the previous task's newFuture. * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture. * * newFuture is guaranteed to only complete once all tasks previously submitted to this instance * have completed - namely after oldFuture is done, and taskFuture has either completed or been * cancelled before the callable started execution. */ final SettableFuture<Object> newFuture = SettableFuture.create(); final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture); // Invoke our task once the previous future completes. final ListenableFuture<T> taskFuture = Futures.submitAsync( task, new Executor() { @Override public void execute(Runnable runnable) { oldFuture.addListener(runnable, executor); } }); final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture); // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that // if the future we return is cancelled, we don't begin execution of the next task until after // oldFuture completes. Runnable listener = new Runnable() { @Override public void run() { if (taskFuture.isDone() // If this CAS succeeds, we know that the provided callable will never be invoked, // so when oldFuture completes it is safe to allow the next submitted task to // proceed. || (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) { // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of // a future that eventually came from immediateFuture(null), this doesn't leak // throwables or completion values. newFuture.setFuture(oldFuture); } } }; // Adding the listener to both futures guarantees that newFuture will aways be set. Adding to // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture // propagates cancellation if the callable has not yet been invoked. outputFuture.addListener(listener, directExecutor()); taskFuture.addListener(listener, directExecutor()); return outputFuture; } }