/*
* Copyright (C) 2006 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.glassfish.jersey.internal.guava;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.glassfish.jersey.internal.guava.Preconditions.checkNotNull;
Static utility methods pertaining to the Future
interface.
Many of these methods use the ListenableFuture
API; consult the Guava User Guide article on ListenableFuture
.
Author: Kevin Bourrillion, Nishant Thakkar, Sven Mawson Since: 1.0
/**
* Static utility methods pertaining to the {@link Future} interface.
* <p>
* <p>Many of these methods use the {@link ListenableFuture} API; consult the
* Guava User Guide article on <a href=
* "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
* {@code ListenableFuture}</a>.
*
* @author Kevin Bourrillion
* @author Nishant Thakkar
* @author Sven Mawson
* @since 1.0
*/
public final class Futures {
private Futures() {
}
Creates a ListenableFuture
which has its value set immediately upon construction. The getters just return the value. This Future
can't be canceled or timed out and its isDone()
method always returns true
. /**
* Creates a {@code ListenableFuture} which has its value set immediately upon
* construction. The getters just return the value. This {@code Future} can't
* be canceled or timed out and its {@code isDone()} method always returns
* {@code true}.
*/
public static <V> ListenableFuture<V> immediateFuture(V value) {
return new ImmediateSuccessfulFuture<V>(value);
}
Returns a ListenableFuture
which has an exception set immediately upon construction.
The returned Future
can't be cancelled, and its isDone()
method always returns true
. Calling get()
will immediately throw the provided Throwable
wrapped in an
ExecutionException
.
/**
* Returns a {@code ListenableFuture} which has an exception set immediately
* upon construction.
* <p>
* <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
* method always returns {@code true}. Calling {@code get()} will immediately
* throw the provided {@code Throwable} wrapped in an {@code
* ExecutionException}.
*/
public static <V> ListenableFuture<V> immediateFailedFuture(
Throwable throwable) {
checkNotNull(throwable);
return new ImmediateFailedFuture<V>(throwable);
}
Returns a new ListenableFuture
whose result is the product of applying the given Function
to the result of the given
Future
. Example:
ListenableFuture<QueryResult> queryFuture = ...;
Function<QueryResult, List<Row>> rowsFunction =
new Function<QueryResult, List<Row>>() {
public List<Row> apply(QueryResult queryResult) {
return queryResult.getRows();
}
};
ListenableFuture<List<Row>> rowsFuture =
transform(queryFuture, rowsFunction);
Note: If the transformation is slow or heavyweight, consider supplying an executor. If you do not supply an executor, transform
will use an inline executor, which carries some caveats for heavier operations. For example, the call to function.apply
may run on an unpredictable or undesirable thread:
- If the input
Future
is done at the time transform
is called, transform
will call function.apply
inline. - If the input
Future
is not yet done, transform
will schedule function.apply
to be run by the thread that completes the input Future
, which may be an internal system thread such as an RPC network thread.
Also note that, regardless of which thread executes the
function.apply
, all other registered but unexecuted listeners are prevented from running during its execution, even if those listeners are to run in other executors.
The returned Future
attempts to keep its cancellation state in sync with that of the input future. That is, if the returned Future
is cancelled, it will attempt to cancel the input, and if the input is cancelled, the returned Future
will receive a callback in which it will attempt to cancel itself.
An example use of this method is to convert a serializable object
returned from an RPC into a POJO.
Params: - input – The future to transform
- function – A Function to transform the results of the provided future
to the results of the returned future. This will be run in the thread
that notifies input it is complete.
Returns: A future that holds result of the transformation. Since: 9.0 (in 1.0 as compose
)
/**
* Returns a new {@code ListenableFuture} whose result is the product of
* applying the given {@code Function} to the result of the given {@code
* Future}. Example:
* <p>
* <pre> {@code
* ListenableFuture<QueryResult> queryFuture = ...;
* Function<QueryResult, List<Row>> rowsFunction =
* new Function<QueryResult, List<Row>>() {
* public List<Row> apply(QueryResult queryResult) {
* return queryResult.getRows();
* }
* };
* ListenableFuture<List<Row>> rowsFuture =
* transform(queryFuture, rowsFunction);}</pre>
* <p>
* <p>Note: If the transformation is slow or heavyweight, consider {@linkplain
* #transform(ListenableFuture, Function, Executor) supplying an executor}.
* If you do not supply an executor, {@code transform} will use an inline
* executor, which carries some caveats for heavier operations. For example,
* the call to {@code function.apply} may run on an unpredictable or
* undesirable thread:
* <p>
* <ul>
* <li>If the input {@code Future} is done at the time {@code transform} is
* called, {@code transform} will call {@code function.apply} inline.
* <li>If the input {@code Future} is not yet done, {@code transform} will
* schedule {@code function.apply} to be run by the thread that completes the
* input {@code Future}, which may be an internal system thread such as an
* RPC network thread.
* </ul>
* <p>
* <p>Also note that, regardless of which thread executes the {@code
* function.apply}, all other registered but unexecuted listeners are
* prevented from running during its execution, even if those listeners are
* to run in other executors.
* <p>
* <p>The returned {@code Future} attempts to keep its cancellation state in
* sync with that of the input future. That is, if the returned {@code Future}
* is cancelled, it will attempt to cancel the input, and if the input is
* cancelled, the returned {@code Future} will receive a callback in which it
* will attempt to cancel itself.
* <p>
* <p>An example use of this method is to convert a serializable object
* returned from an RPC into a POJO.
*
* @param input The future to transform
* @param function A Function to transform the results of the provided future
* to the results of the returned future. This will be run in the thread
* that notifies input it is complete.
* @return A future that holds result of the transformation.
* @since 9.0 (in 1.0 as {@code compose})
*/
public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
final Function<? super I, ? extends O> function) {
checkNotNull(function);
ChainingListenableFuture<I, O> output =
new ChainingListenableFuture<I, O>(asAsyncFunction(function), input);
input.addListener(output, MoreExecutors.directExecutor());
return output;
}
Wraps the given function as an AsyncFunction.
/**
* Wraps the given function as an AsyncFunction.
*/
private static <I, O> AsyncFunction<I, O> asAsyncFunction(
final Function<? super I, ? extends O> function) {
return new AsyncFunction<I, O>() {
@Override
public ListenableFuture<O> apply(I input) {
O output = function.apply(input);
return immediateFuture(output);
}
};
}
private abstract static class ImmediateFuture<V>
implements ListenableFuture<V> {
private static final Logger log =
Logger.getLogger(ImmediateFuture.class.getName());
@Override
public void addListener(Runnable listener, Executor executor) {
checkNotNull(listener, "Runnable was null.");
checkNotNull(executor, "Executor was null.");
try {
executor.execute(listener);
} catch (RuntimeException e) {
// ListenableFuture's contract is that it will not throw unchecked
// exceptions, so log the bad runnable and/or executor and swallow it.
log.log(Level.SEVERE, "RuntimeException while executing runnable "
+ listener + " with executor " + executor, e);
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public abstract V get() throws ExecutionException;
@Override
public V get(long timeout, TimeUnit unit) throws ExecutionException {
checkNotNull(unit);
return get();
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return true;
}
}
private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
private final V value;
ImmediateSuccessfulFuture(V value) {
this.value = value;
}
@Override
public V get() {
return value;
}
}
private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
private final Throwable thrown;
ImmediateFailedFuture(Throwable thrown) {
this.thrown = thrown;
}
@Override
public V get() throws ExecutionException {
throw new ExecutionException(thrown);
}
}
An implementation of ListenableFuture
that also implements Runnable
so that it can be used to nest ListenableFutures. Once the passed-in ListenableFuture
is complete, it calls the passed-in Function
to generate the result.
For historical reasons, this class has a special case in its exception handling: If the given AsyncFunction
throws an
UndeclaredThrowableException
, ChainingListenableFuture
unwraps it and uses its cause as the output future's exception, rather than using the UndeclaredThrowableException
itself as it would for other exception types. The reason for this is that Futures.transform
used to require a Function
, whose apply
method is not allowed to throw checked exceptions. Nowadays, Futures.transform
has an overload that accepts an AsyncFunction
, whose apply
method is allowed to throw checked exception. Users who wish to throw
checked exceptions should use that overload instead, and we should remove the UndeclaredThrowableException
special case.
/**
* An implementation of {@code ListenableFuture} that also implements
* {@code Runnable} so that it can be used to nest ListenableFutures.
* Once the passed-in {@code ListenableFuture} is complete, it calls the
* passed-in {@code Function} to generate the result.
* <p>
* <p>For historical reasons, this class has a special case in its exception
* handling: If the given {@code AsyncFunction} throws an {@code
* UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it
* and uses its <i>cause</i> as the output future's exception, rather than
* using the {@code UndeclaredThrowableException} itself as it would for other
* exception types. The reason for this is that {@code Futures.transform} used
* to require a {@code Function}, whose {@code apply} method is not allowed to
* throw checked exceptions. Nowadays, {@code Futures.transform} has an
* overload that accepts an {@code AsyncFunction}, whose {@code apply} method
* <i>is</i> allowed to throw checked exception. Users who wish to throw
* checked exceptions should use that overload instead, and <a
* href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
* should remove the {@code UndeclaredThrowableException} special case</a>.
*/
private static class ChainingListenableFuture<I, O>
extends AbstractFuture<O> implements Runnable {
private AsyncFunction<? super I, ? extends O> function;
private ListenableFuture<? extends I> inputFuture;
private volatile ListenableFuture<? extends O> outputFuture;
private ChainingListenableFuture(
AsyncFunction<? super I, ? extends O> function,
ListenableFuture<? extends I> inputFuture) {
this.function = checkNotNull(function);
this.inputFuture = checkNotNull(inputFuture);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
/*
* Our additional cancellation work needs to occur even if
* !mayInterruptIfRunning, so we can't move it into interruptTask().
*/
if (super.cancel(mayInterruptIfRunning)) {
// This should never block since only one thread is allowed to cancel
// this Future.
cancel(inputFuture, mayInterruptIfRunning);
cancel(outputFuture, mayInterruptIfRunning);
return true;
}
return false;
}
private void cancel(Future<?> future,
boolean mayInterruptIfRunning) {
if (future != null) {
future.cancel(mayInterruptIfRunning);
}
}
@Override
public void run() {
try {
I sourceResult;
try {
sourceResult = Uninterruptibles.getUninterruptibly(inputFuture);
} catch (CancellationException e) {
// Cancel this future and return.
// At this point, inputFuture is cancelled and outputFuture doesn't
// exist, so the value of mayInterruptIfRunning is irrelevant.
cancel(false);
return;
} catch (ExecutionException e) {
// Set the cause of the exception as this future's exception
setException(e.getCause());
return;
}
final ListenableFuture<? extends O> outputFuture = this.outputFuture =
Preconditions.checkNotNull(
function.apply(sourceResult),
"AsyncFunction may not return null.");
if (isCancelled()) {
outputFuture.cancel(wasInterrupted());
this.outputFuture = null;
return;
}
outputFuture.addListener(new Runnable() {
@Override
public void run() {
try {
set(Uninterruptibles.getUninterruptibly(outputFuture));
} catch (CancellationException e) {
// Cancel this future and return.
// At this point, inputFuture and outputFuture are done, so the
// value of mayInterruptIfRunning is irrelevant.
cancel(false);
} catch (ExecutionException e) {
// Set the cause of the exception as this future's exception
setException(e.getCause());
} finally {
// Don't pin inputs beyond completion
Futures.ChainingListenableFuture.this.outputFuture = null;
}
}
}, MoreExecutors.directExecutor());
} catch (UndeclaredThrowableException e) {
// Set the cause of the exception as this future's exception
setException(e.getCause());
} catch (Throwable t) {
// This exception is irrelevant in this thread, but useful for the
// client
setException(t);
} finally {
// Don't pin inputs beyond completion
function = null;
inputFuture = null;
}
}
}
}