/*
 * Copyright (C) 2006 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.glassfish.jersey.internal.guava;

import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.glassfish.jersey.internal.guava.Preconditions.checkNotNull;

Static utility methods pertaining to the Future interface.

Many of these methods use the ListenableFuture API; consult the Guava User Guide article on ListenableFuture.

Author:Kevin Bourrillion, Nishant Thakkar, Sven Mawson
Since:1.0
/** * Static utility methods pertaining to the {@link Future} interface. * <p> * <p>Many of these methods use the {@link ListenableFuture} API; consult the * Guava User Guide article on <a href= * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> * {@code ListenableFuture}</a>. * * @author Kevin Bourrillion * @author Nishant Thakkar * @author Sven Mawson * @since 1.0 */
public final class Futures { private Futures() { }
Creates a ListenableFuture which has its value set immediately upon construction. The getters just return the value. This Future can't be canceled or timed out and its isDone() method always returns true.
/** * Creates a {@code ListenableFuture} which has its value set immediately upon * construction. The getters just return the value. This {@code Future} can't * be canceled or timed out and its {@code isDone()} method always returns * {@code true}. */
public static <V> ListenableFuture<V> immediateFuture(V value) { return new ImmediateSuccessfulFuture<V>(value); }
Returns a ListenableFuture which has an exception set immediately upon construction.

The returned Future can't be cancelled, and its isDone() method always returns true. Calling get() will immediately throw the provided Throwable wrapped in an ExecutionException.

/** * Returns a {@code ListenableFuture} which has an exception set immediately * upon construction. * <p> * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} * method always returns {@code true}. Calling {@code get()} will immediately * throw the provided {@code Throwable} wrapped in an {@code * ExecutionException}. */
public static <V> ListenableFuture<V> immediateFailedFuture( Throwable throwable) { checkNotNull(throwable); return new ImmediateFailedFuture<V>(throwable); }
Returns a new ListenableFuture whose result is the product of applying the given Function to the result of the given Future. Example:

 
  ListenableFuture<QueryResult> queryFuture = ...;
  Function<QueryResult, List<Row>> rowsFunction =
      new Function<QueryResult, List<Row>>() {
        public List<Row> apply(QueryResult queryResult) {
          return queryResult.getRows();
        }
      };
  ListenableFuture<List<Row>> rowsFuture =
      transform(queryFuture, rowsFunction);

Note: If the transformation is slow or heavyweight, consider supplying an executor. If you do not supply an executor, transform will use an inline executor, which carries some caveats for heavier operations. For example, the call to function.apply may run on an unpredictable or undesirable thread:

  • If the input Future is done at the time transform is called, transform will call function.apply inline.
  • If the input Future is not yet done, transform will schedule function.apply to be run by the thread that completes the input Future, which may be an internal system thread such as an RPC network thread.

Also note that, regardless of which thread executes the function.apply, all other registered but unexecuted listeners are prevented from running during its execution, even if those listeners are to run in other executors.

The returned Future attempts to keep its cancellation state in sync with that of the input future. That is, if the returned Future is cancelled, it will attempt to cancel the input, and if the input is cancelled, the returned Future will receive a callback in which it will attempt to cancel itself.

An example use of this method is to convert a serializable object returned from an RPC into a POJO.

Params:
  • input – The future to transform
  • function – A Function to transform the results of the provided future to the results of the returned future. This will be run in the thread that notifies input it is complete.
Returns:A future that holds result of the transformation.
Since:9.0 (in 1.0 as compose)
/** * Returns a new {@code ListenableFuture} whose result is the product of * applying the given {@code Function} to the result of the given {@code * Future}. Example: * <p> * <pre> {@code * ListenableFuture<QueryResult> queryFuture = ...; * Function<QueryResult, List<Row>> rowsFunction = * new Function<QueryResult, List<Row>>() { * public List<Row> apply(QueryResult queryResult) { * return queryResult.getRows(); * } * }; * ListenableFuture<List<Row>> rowsFuture = * transform(queryFuture, rowsFunction);}</pre> * <p> * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain * #transform(ListenableFuture, Function, Executor) supplying an executor}. * If you do not supply an executor, {@code transform} will use an inline * executor, which carries some caveats for heavier operations. For example, * the call to {@code function.apply} may run on an unpredictable or * undesirable thread: * <p> * <ul> * <li>If the input {@code Future} is done at the time {@code transform} is * called, {@code transform} will call {@code function.apply} inline. * <li>If the input {@code Future} is not yet done, {@code transform} will * schedule {@code function.apply} to be run by the thread that completes the * input {@code Future}, which may be an internal system thread such as an * RPC network thread. * </ul> * <p> * <p>Also note that, regardless of which thread executes the {@code * function.apply}, all other registered but unexecuted listeners are * prevented from running during its execution, even if those listeners are * to run in other executors. * <p> * <p>The returned {@code Future} attempts to keep its cancellation state in * sync with that of the input future. That is, if the returned {@code Future} * is cancelled, it will attempt to cancel the input, and if the input is * cancelled, the returned {@code Future} will receive a callback in which it * will attempt to cancel itself. * <p> * <p>An example use of this method is to convert a serializable object * returned from an RPC into a POJO. * * @param input The future to transform * @param function A Function to transform the results of the provided future * to the results of the returned future. This will be run in the thread * that notifies input it is complete. * @return A future that holds result of the transformation. * @since 9.0 (in 1.0 as {@code compose}) */
public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, final Function<? super I, ? extends O> function) { checkNotNull(function); ChainingListenableFuture<I, O> output = new ChainingListenableFuture<I, O>(asAsyncFunction(function), input); input.addListener(output, MoreExecutors.directExecutor()); return output; }
Wraps the given function as an AsyncFunction.
/** * Wraps the given function as an AsyncFunction. */
private static <I, O> AsyncFunction<I, O> asAsyncFunction( final Function<? super I, ? extends O> function) { return new AsyncFunction<I, O>() { @Override public ListenableFuture<O> apply(I input) { O output = function.apply(input); return immediateFuture(output); } }; } private abstract static class ImmediateFuture<V> implements ListenableFuture<V> { private static final Logger log = Logger.getLogger(ImmediateFuture.class.getName()); @Override public void addListener(Runnable listener, Executor executor) { checkNotNull(listener, "Runnable was null."); checkNotNull(executor, "Executor was null."); try { executor.execute(listener); } catch (RuntimeException e) { // ListenableFuture's contract is that it will not throw unchecked // exceptions, so log the bad runnable and/or executor and swallow it. log.log(Level.SEVERE, "RuntimeException while executing runnable " + listener + " with executor " + executor, e); } } @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; } @Override public abstract V get() throws ExecutionException; @Override public V get(long timeout, TimeUnit unit) throws ExecutionException { checkNotNull(unit); return get(); } @Override public boolean isCancelled() { return false; } @Override public boolean isDone() { return true; } } private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { private final V value; ImmediateSuccessfulFuture(V value) { this.value = value; } @Override public V get() { return value; } } private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { private final Throwable thrown; ImmediateFailedFuture(Throwable thrown) { this.thrown = thrown; } @Override public V get() throws ExecutionException { throw new ExecutionException(thrown); } }
An implementation of ListenableFuture that also implements Runnable so that it can be used to nest ListenableFutures. Once the passed-in ListenableFuture is complete, it calls the passed-in Function to generate the result.

For historical reasons, this class has a special case in its exception handling: If the given AsyncFunction throws an UndeclaredThrowableException, ChainingListenableFuture unwraps it and uses its cause as the output future's exception, rather than using the UndeclaredThrowableException itself as it would for other exception types. The reason for this is that Futures.transform used to require a Function, whose apply method is not allowed to throw checked exceptions. Nowadays, Futures.transform has an overload that accepts an AsyncFunction, whose apply method is allowed to throw checked exception. Users who wish to throw checked exceptions should use that overload instead, and we should remove the UndeclaredThrowableException special case.

/** * An implementation of {@code ListenableFuture} that also implements * {@code Runnable} so that it can be used to nest ListenableFutures. * Once the passed-in {@code ListenableFuture} is complete, it calls the * passed-in {@code Function} to generate the result. * <p> * <p>For historical reasons, this class has a special case in its exception * handling: If the given {@code AsyncFunction} throws an {@code * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it * and uses its <i>cause</i> as the output future's exception, rather than * using the {@code UndeclaredThrowableException} itself as it would for other * exception types. The reason for this is that {@code Futures.transform} used * to require a {@code Function}, whose {@code apply} method is not allowed to * throw checked exceptions. Nowadays, {@code Futures.transform} has an * overload that accepts an {@code AsyncFunction}, whose {@code apply} method * <i>is</i> allowed to throw checked exception. Users who wish to throw * checked exceptions should use that overload instead, and <a * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we * should remove the {@code UndeclaredThrowableException} special case</a>. */
private static class ChainingListenableFuture<I, O> extends AbstractFuture<O> implements Runnable { private AsyncFunction<? super I, ? extends O> function; private ListenableFuture<? extends I> inputFuture; private volatile ListenableFuture<? extends O> outputFuture; private ChainingListenableFuture( AsyncFunction<? super I, ? extends O> function, ListenableFuture<? extends I> inputFuture) { this.function = checkNotNull(function); this.inputFuture = checkNotNull(inputFuture); } @Override public boolean cancel(boolean mayInterruptIfRunning) { /* * Our additional cancellation work needs to occur even if * !mayInterruptIfRunning, so we can't move it into interruptTask(). */ if (super.cancel(mayInterruptIfRunning)) { // This should never block since only one thread is allowed to cancel // this Future. cancel(inputFuture, mayInterruptIfRunning); cancel(outputFuture, mayInterruptIfRunning); return true; } return false; } private void cancel(Future<?> future, boolean mayInterruptIfRunning) { if (future != null) { future.cancel(mayInterruptIfRunning); } } @Override public void run() { try { I sourceResult; try { sourceResult = Uninterruptibles.getUninterruptibly(inputFuture); } catch (CancellationException e) { // Cancel this future and return. // At this point, inputFuture is cancelled and outputFuture doesn't // exist, so the value of mayInterruptIfRunning is irrelevant. cancel(false); return; } catch (ExecutionException e) { // Set the cause of the exception as this future's exception setException(e.getCause()); return; } final ListenableFuture<? extends O> outputFuture = this.outputFuture = Preconditions.checkNotNull( function.apply(sourceResult), "AsyncFunction may not return null."); if (isCancelled()) { outputFuture.cancel(wasInterrupted()); this.outputFuture = null; return; } outputFuture.addListener(new Runnable() { @Override public void run() { try { set(Uninterruptibles.getUninterruptibly(outputFuture)); } catch (CancellationException e) { // Cancel this future and return. // At this point, inputFuture and outputFuture are done, so the // value of mayInterruptIfRunning is irrelevant. cancel(false); } catch (ExecutionException e) { // Set the cause of the exception as this future's exception setException(e.getCause()); } finally { // Don't pin inputs beyond completion Futures.ChainingListenableFuture.this.outputFuture = null; } } }, MoreExecutors.directExecutor()); } catch (UndeclaredThrowableException e) { // Set the cause of the exception as this future's exception setException(e.getCause()); } catch (Throwable t) { // This exception is irrelevant in this thread, but useful for the // client setException(t); } finally { // Don't pin inputs beyond completion function = null; inputFuture = null; } } } }