/*
 * Copyright (c) 2011-Present VMware Inc. or its affiliates, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.util.retry;

import java.time.Duration;
import java.util.function.Function;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import static reactor.util.retry.RetrySpec.*;

Base abstract class for a strategy to decide when to retry given a companion Flux of RetrySignal, for use with Flux.retryWhen(Retry) and Mono.retryWhen(Retry). Also provides access to configurable built-in strategies via static factory methods:

Users are encouraged to provide either concrete custom Retry strategies or builders that produce such concrete Retry. The RetrySpec returned by eg. max(long) is a good inspiration for a fluent approach that generates a Retry at each step and uses immutability/copy-on-write to enable sharing of intermediate steps (that can thus be considered templates).

Author:Simon Baslé
/** * Base abstract class for a strategy to decide when to retry given a companion {@link Flux} of {@link RetrySignal}, * for use with {@link Flux#retryWhen(Retry)} and {@link reactor.core.publisher.Mono#retryWhen(Retry)}. * Also provides access to configurable built-in strategies via static factory methods: * <ul> * <li>{@link #indefinitely()}</li> * <li>{@link #max(long)}</li> * <li>{@link #maxInARow(long)}</li> * <li>{@link #fixedDelay(long, Duration)}</li> * <li>{@link #backoff(long, Duration)}</li> * </ul> * <p> * Users are encouraged to provide either concrete custom {@link Retry} strategies or builders that produce * such concrete {@link Retry}. The {@link RetrySpec} returned by eg. {@link #max(long)} is a good inspiration * for a fluent approach that generates a {@link Retry} at each step and uses immutability/copy-on-write to enable * sharing of intermediate steps (that can thus be considered templates). * * @author Simon Baslé */
public abstract class Retry { public final ContextView retryContext; public Retry() { this(Context.empty()); } protected Retry(ContextView retryContext) { this.retryContext = retryContext; }
The intent of the functional Retry class is to let users configure how to react to RetrySignal by providing the operator with a companion publisher. Any onNext emitted by this publisher will trigger a retry, but if that emission is delayed compared to the original signal then the attempt is delayed as well. This method generates the companion, out of a Flux of RetrySignal, which itself can serve as the simplest form of retry companion (indefinitely and immediately retry on any error).
Params:
  • retrySignals – the original Flux of RetrySignal, notifying of each source error that might result in a retry attempt, with context around the error and current retry cycle.
Returns:the actual companion to use, which might delay or limit retry attempts
/** * The intent of the functional {@link Retry} class is to let users configure how to react to {@link RetrySignal} * by providing the operator with a companion publisher. Any {@link org.reactivestreams.Subscriber#onNext(Object) onNext} * emitted by this publisher will trigger a retry, but if that emission is delayed compared to the original signal then * the attempt is delayed as well. This method generates the companion, out of a {@link Flux} of {@link RetrySignal}, * which itself can serve as the simplest form of retry companion (indefinitely and immediately retry on any error). * * @param retrySignals the original {@link Flux} of {@link RetrySignal}, notifying of each source error that * <i>might</i> result in a retry attempt, with context around the error and current retry cycle. * @return the actual companion to use, which might delay or limit retry attempts */
public abstract Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals);
Return the user provided context that was set at construction time.
Returns:the user provided context that will be accessible via RetrySignal.retryContextView().
/** * Return the user provided context that was set at construction time. * * @return the user provided context that will be accessible via {@link RetrySignal#retryContextView()}. */
public ContextView retryContext() { return retryContext; }
State for a Flux.retryWhen(Retry) Flux retry} or Mono retry. A flux of states is passed to the user, which gives information about the failure() that potentially triggers a retry as well as two indexes: the number of errors that happened so far (and were retried) and the same number, but only taking into account subsequent errors (see totalRetriesInARow()).
/** * State for a {@link Flux#retryWhen(Retry)} Flux retry} or {@link reactor.core.publisher.Mono#retryWhen(Retry) Mono retry}. * A flux of states is passed to the user, which gives information about the {@link #failure()} that potentially triggers * a retry as well as two indexes: the number of errors that happened so far (and were retried) and the same number, * but only taking into account <strong>subsequent</strong> errors (see {@link #totalRetriesInARow()}). */
public interface RetrySignal {
The ZERO BASED index number of this error (can also be read as how many retries have occurred so far), since the source was first subscribed to.
Returns:a 0-index for the error, since original subscription
/** * The ZERO BASED index number of this error (can also be read as how many retries have occurred * so far), since the source was first subscribed to. * * @return a 0-index for the error, since original subscription */
long totalRetries();
The ZERO BASED index number of this error since the beginning of the current burst of errors. This is reset to zero whenever a retry is made that is followed by at least one onNext.
Returns:a 0-index for the error in the current burst of subsequent errors
/** * The ZERO BASED index number of this error since the beginning of the current burst of errors. * This is reset to zero whenever a retry is made that is followed by at least one * {@link org.reactivestreams.Subscriber#onNext(Object) onNext}. * * @return a 0-index for the error in the current burst of subsequent errors */
long totalRetriesInARow();
The current Throwable that needs to be evaluated for retry.
Returns:the current failure Throwable
/** * The current {@link Throwable} that needs to be evaluated for retry. * * @return the current failure {@link Throwable} */
Throwable failure();
Return a read-only view of the user provided context, which may be used to store objects to be reset/rollbacked or otherwise mutated before or after a retry.
Returns:a read-only view of a user provided context.
/** * Return a read-only view of the user provided context, which may be used to store * objects to be reset/rollbacked or otherwise mutated before or after a retry. * * @return a read-only view of a user provided context. */
default ContextView retryContextView() { return Context.empty(); }
Return an immutable copy of this RetrySignal which is guaranteed to give a consistent view of the state at the time at which this method is invoked. This is especially useful if this RetrySignal is a transient view of the state of the underlying retry subscriber,
Returns:an immutable copy of the current RetrySignal, always safe to use
/** * Return an immutable copy of this {@link RetrySignal} which is guaranteed to give a consistent view * of the state at the time at which this method is invoked. * This is especially useful if this {@link RetrySignal} is a transient view of the state of the underlying * retry subscriber, * * @return an immutable copy of the current {@link RetrySignal}, always safe to use */
default RetrySignal copy() { return new ImmutableRetrySignal(totalRetries(), totalRetriesInARow(), failure(), retryContextView()); } }
A RetryBackoffSpec preconfigured for exponential backoff strategy with jitter, given a maximum number of retry attempts and a minimum Duration for the backoff.

Params:
  • maxAttempts – the maximum number of retry attempts to allow
  • minBackoff – the minimum Duration for the first backoff
See Also:
Returns:the exponential backoff spec for further configuration
/** * A {@link RetryBackoffSpec} preconfigured for exponential backoff strategy with jitter, given a maximum number of retry attempts * and a minimum {@link Duration} for the backoff. * <p> * <img class="marble" src="doc-files/marbles/retrySpecBackoff.svg" alt=""> * * @param maxAttempts the maximum number of retry attempts to allow * @param minBackoff the minimum {@link Duration} for the first backoff * @return the exponential backoff spec for further configuration * @see RetryBackoffSpec#maxAttempts(long) * @see RetryBackoffSpec#minBackoff(Duration) */
public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) { return new RetryBackoffSpec(Context.empty(), maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers::parallel, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetryBackoffSpec.BACKOFF_EXCEPTION_GENERATOR); }
A RetryBackoffSpec preconfigured for fixed delays (min backoff equals max backoff, no jitter), given a maximum number of retry attempts and the fixed Duration for the backoff.

Note that calling RetryBackoffSpec.minBackoff(Duration) or RetryBackoffSpec.maxBackoff(Duration) would switch back to an exponential backoff strategy.

Params:
  • maxAttempts – the maximum number of retry attempts to allow
  • fixedDelay – the Duration of the fixed delays
See Also:
Returns:the fixed delays spec for further configuration
/** * A {@link RetryBackoffSpec} preconfigured for fixed delays (min backoff equals max backoff, no jitter), given a maximum number of retry attempts * and the fixed {@link Duration} for the backoff. * <p> * <img class="marble" src="doc-files/marbles/retrySpecFixed.svg" alt=""> * <p> * Note that calling {@link RetryBackoffSpec#minBackoff(Duration)} or {@link RetryBackoffSpec#maxBackoff(Duration)} would switch * back to an exponential backoff strategy. * * @param maxAttempts the maximum number of retry attempts to allow * @param fixedDelay the {@link Duration} of the fixed delays * @return the fixed delays spec for further configuration * @see RetryBackoffSpec#maxAttempts(long) * @see RetryBackoffSpec#minBackoff(Duration) * @see RetryBackoffSpec#maxBackoff(Duration) */
public static RetryBackoffSpec fixedDelay(long maxAttempts, Duration fixedDelay) { return new RetryBackoffSpec(Context.empty(), maxAttempts, t -> true, false, fixedDelay, fixedDelay, 0d, Schedulers::parallel, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetryBackoffSpec.BACKOFF_EXCEPTION_GENERATOR); }
A RetrySpec preconfigured for a simple strategy with maximum number of retry attempts.

Params:
  • max – the maximum number of retry attempts to allow
See Also:
Returns:the max attempt spec for further configuration
/** * A {@link RetrySpec} preconfigured for a simple strategy with maximum number of retry attempts. * <p> * <img class="marble" src="doc-files/marbles/retrySpecAttempts.svg" alt=""> * * @param max the maximum number of retry attempts to allow * @return the max attempt spec for further configuration * @see RetrySpec#maxAttempts(long) */
public static RetrySpec max(long max) { return new RetrySpec(Context.empty(), max, t -> true, false, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetrySpec.RETRY_EXCEPTION_GENERATOR); }
A RetrySpec preconfigured for a simple strategy with maximum number of retry attempts over subsequent transient errors. An Subscriber.onNext(Object) between errors resets the counter (see RetrySpec.transientErrors(boolean)).

Params:
  • maxInARow – the maximum number of retry attempts to allow in a row, reset by successful onNext
See Also:
Returns:the max in a row spec for further configuration
/** * A {@link RetrySpec} preconfigured for a simple strategy with maximum number of retry attempts over * subsequent transient errors. An {@link org.reactivestreams.Subscriber#onNext(Object)} between * errors resets the counter (see {@link RetrySpec#transientErrors(boolean)}). * <p> * <img class="marble" src="doc-files/marbles/retrySpecInARow.svg" alt=""> * * @param maxInARow the maximum number of retry attempts to allow in a row, reset by successful onNext * @return the max in a row spec for further configuration * @see RetrySpec#maxAttempts(long) * @see RetrySpec#transientErrors(boolean) */
public static RetrySpec maxInARow(long maxInARow) { return new RetrySpec(Context.empty(), maxInARow, t -> true, true, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RETRY_EXCEPTION_GENERATOR); }
A RetrySpec preconfigured for the most simplistic retry strategy: retry immediately and indefinitely (similar to Flux.retry()).
Returns:the retry indefinitely spec for further configuration
/** * A {@link RetrySpec} preconfigured for the most simplistic retry strategy: retry immediately and indefinitely * (similar to {@link Flux#retry()}). * * @return the retry indefinitely spec for further configuration */
public static RetrySpec indefinitely() { return new RetrySpec(Context.empty(), Long.MAX_VALUE, t -> true, false, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetrySpec.RETRY_EXCEPTION_GENERATOR); }
A wrapper around Function to provide Retry by using lambda expressions.
Params:
  • function – the Function representing the desired Retry strategy as a lambda
Returns:the Retry strategy adapted from the Function
/** * A wrapper around {@link Function} to provide {@link Retry} by using lambda expressions. * * @param function the {@link Function} representing the desired {@link Retry} strategy as a lambda * @return the {@link Retry} strategy adapted from the {@link Function} */
public static final Retry from(Function<Flux<RetrySignal>, ? extends Publisher<?>> function) { return new Retry(Context.empty()) { @Override public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignalCompanion) { return function.apply(retrySignalCompanion); } }; }
An adapter for Flux of Throwable-based Function to provide Retry from a legacy retryWhen Function.
Params:
  • function – the Function representing the desired Retry strategy as a lambda
Returns:the Retry strategy adapted from the Function
/** * An adapter for {@link Flux} of {@link Throwable}-based {@link Function} to provide {@link Retry} * from a legacy retryWhen {@link Function}. * * @param function the {@link Function} representing the desired {@link Retry} strategy as a lambda * @return the {@link Retry} strategy adapted from the {@link Function} */
public static final Retry withThrowable(Function<Flux<Throwable>, ? extends Publisher<?>> function) { return new Retry(Context.empty()) { @Override public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) { return function.apply(retrySignals.map(RetrySignal::failure)); } }; } }