/*
* Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.reactivestreams.Subscription;
import reactor.core.Exceptions;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
// A strategy to evaluate if errors that happen during an operator's onNext call can be
// recovered from, allowing the sequence to continue. This opt-in strategy is applied by
// compatible operators through the Operators.onNextError(Object, Throwable, Context, Subscription)
// and Operators.onNextPollError(Object, Throwable, Context) methods.
// See stop(), resumeDrop(), resumeDropIf(Predicate<Throwable>),
// resume(BiConsumer<Throwable,Object>) and resumeIf(Predicate<Throwable>, BiConsumer<Throwable,Object>)
// for the possible strategies. Author: Simon Baslé
/**
* A strategy to evaluate if errors that happens during an operator's onNext call can
* be recovered from, allowing the sequence to continue. This opt-in strategy is
* applied by compatible operators through the {@link Operators#onNextError(Object, Throwable, Context, Subscription)}
* and {@link Operators#onNextPollError(Object, Throwable, Context)} methods.
* See {@link #stop()}, {@link #resumeDrop()}, {@link #resumeDropIf(Predicate)},
* {@link #resume(BiConsumer)} and {@link #resumeIf(Predicate, BiConsumer)}
* for the possible strategies.
*
* @author Simon Baslé
*/
interface OnNextFailureStrategy extends BiFunction<Throwable, Object, Throwable>,
BiPredicate<Throwable, Object> {
The key that can be used to store an OnNextFailureStrategy
in a Context
. /**
* The key that can be used to store an {@link OnNextFailureStrategy} in a {@link Context}.
*/
String KEY_ON_NEXT_ERROR_STRATEGY = "reactor.onNextError.localStrategy";
@Override
@Nullable
default Throwable apply(Throwable throwable, @Nullable Object o) {
return process(throwable, o, Context.empty());
}
@Override
boolean test(Throwable throwable, @Nullable Object o);
Process an error and the optional value that caused it (when applicable) in
preparation for sequence resume, so that the error is not completely swallowed.
If the strategy cannot resume this kind of error (ie. test(Throwable, Object)
returns false), return the original error. Any exception in the processing will be caught and returned. If the strategy was able to process the error correctly, returns null.
Params: - error – the error being recovered from.
- value – the value causing the error, null if not applicable.
- context – the
Context
associated with the recovering sequence.
See Also: Returns: null if the error was processed for resume, a Throwable
to propagate otherwise.
/**
* Process an error and the optional value that caused it (when applicable) in
* preparation for sequence resume, so that the error is not completely swallowed.
* <p>
* If the strategy cannot resume this kind of error (ie. {@link #test(Throwable, Object)}
* returns false), return the original error. Any exception in the processing will be
* caught and returned. If the strategy was able to process the error correctly,
* returns null.
*
* @param error the error being recovered from.
* @param value the value causing the error, null if not applicable.
* @param context the {@link Context} associated with the recovering sequence.
* @return null if the error was processed for resume, a {@link Throwable} to propagate
* otherwise.
* @see #test(Throwable, Object)
*/
@Nullable
Throwable process(Throwable error, @Nullable Object value, Context context);
A strategy that never let any error resume.
/**
* A strategy that never let any error resume.
*/
static OnNextFailureStrategy stop() {
return STOP;
}
A strategy that let all error resume. When processing, the error is passed to the Operators.onErrorDropped(Throwable, Context)
hook and the incriminating source value (if available) is passed to the Operators.onNextDropped(Object, Context)
hook, allowing the sequence to continue with further values. /**
* A strategy that let all error resume. When processing, the error is passed to the
* {@link Operators#onErrorDropped(Throwable, Context)} hook and the incriminating
* source value (if available) is passed to the {@link Operators#onNextDropped(Object, Context)}
* hook, allowing the sequence to continue with further values.
*/
static OnNextFailureStrategy resumeDrop() {
return RESUME_DROP;
}
A strategy that let some error resume. When processing, the error is passed to the Operators.onErrorDropped(Throwable, Context)
hook and the incriminating source value (if available) is passed to the Operators.onNextDropped(Object, Context)
hook, allowing the sequence to continue with further values.
Any exception thrown by the predicate is thrown as is.
Params: - causePredicate – the predicate to match in order to resume from an error.
/**
* A strategy that let some error resume. When processing, the error is passed to the
* {@link Operators#onErrorDropped(Throwable, Context)} hook and the incriminating
* source value (if available) is passed to the {@link Operators#onNextDropped(Object, Context)}
* hook, allowing the sequence to continue with further values.
* <p>
* Any exception thrown by the predicate is thrown as is.
*
* @param causePredicate the predicate to match in order to resume from an error.
*/
static OnNextFailureStrategy resumeDropIf(Predicate<Throwable> causePredicate) {
return new ResumeDropStrategy(causePredicate);
}
A strategy that let all errors resume. When processing, the error and the incriminating source value are passed to custom Consumers
. Any exception thrown by the consumers will suppress the original error and be returned for propagation. If the original error is fatal then it is thrown upon processing it (see Exceptions.throwIfFatal(Throwable)
).
Params: - errorConsumer – the
BiConsumer<Throwable, Object>
to process the recovered errors with. It must deal with potential null
s.
Returns: a new OnNextFailureStrategy
that allows resuming the sequence.
/**
* A strategy that let all errors resume. When processing, the error and the
* incriminating source value are passed to custom {@link Consumer Consumers}.
* <p>
* Any exception thrown by the consumers will suppress the original error and be
* returned for propagation. If the original error is fatal then it is thrown
* upon processing it (see {@link Exceptions#throwIfFatal(Throwable)}).
*
* @param errorConsumer the {@link BiConsumer<Throwable, Object>} to process the recovered errors with.
* It must deal with potential {@code null}s.
* @return a new {@link OnNextFailureStrategy} that allows resuming the sequence.
*/
static OnNextFailureStrategy resume(BiConsumer<Throwable, Object> errorConsumer) {
return new ResumeStrategy(null, errorConsumer);
}
A strategy that let some errors resume. When processing, the error and the incriminating source value are passed to custom Consumers
. Any exception thrown by the predicate is thrown as is. Any exception thrown by the consumers will suppress the original error and be returned for propagation. Even if the original error is fatal, if it passes the predicate then it can be processed and recovered from (see Exceptions.throwIfFatal(Throwable)
).
Params: - causePredicate – the
Predicate
to use to determine if a failure should be recovered from. - errorConsumer – the
BiConsumer<Throwable, Object>
to process the recovered errors with. It must deal with potential null
s.
Returns: a new OnNextFailureStrategy
that allows resuming the sequence.
/**
* A strategy that let some errors resume. When processing, the error and the
* incriminating source value are passed to custom {@link Consumer Consumers}.
* <p>
* Any exception thrown by the predicate is thrown as is. Any exception thrown by
* the consumers will suppress the original error and be returned for propagation.
* Even if the original error is fatal, if it passes the predicate then it can
* be processed and recovered from (see {@link Exceptions#throwIfFatal(Throwable)}).
*
* @param causePredicate the {@link Predicate} to use to determine if a failure
* should be recovered from.
* @param errorConsumer the {@link BiConsumer<Throwable, Object>} to process the recovered errors with.
* It must deal with potential {@code null}s.
* @return a new {@link OnNextFailureStrategy} that allows resuming the sequence.
*/
static OnNextFailureStrategy resumeIf(
Predicate<Throwable> causePredicate,
BiConsumer<Throwable, Object> errorConsumer) {
return new ResumeStrategy(causePredicate, errorConsumer);
}
//==== IMPLEMENTATIONS ====
OnNextFailureStrategy STOP = new OnNextFailureStrategy() {
@Override
public boolean test(Throwable error, @Nullable Object value) {
return false;
}
@Override
public Throwable process(Throwable error, @Nullable Object value, Context context) {
Exceptions.throwIfFatal(error);
Throwable iee = new IllegalStateException("STOP strategy cannot process errors");
iee.addSuppressed(error);
return iee;
}
};
OnNextFailureStrategy RESUME_DROP = new ResumeDropStrategy(null);
final class ResumeStrategy implements OnNextFailureStrategy {
final Predicate<Throwable> errorPredicate;
final BiConsumer<Throwable, Object> errorConsumer;
ResumeStrategy(@Nullable Predicate<Throwable> errorPredicate,
BiConsumer<Throwable, Object> errorConsumer) {
this.errorPredicate = errorPredicate;
this.errorConsumer = errorConsumer;
}
@Override
public boolean test(Throwable error, @Nullable Object value) {
return errorPredicate == null || errorPredicate.test(error);
}
@Override
@Nullable
public Throwable process(Throwable error, @Nullable Object value, Context context) {
if (errorPredicate == null) {
Exceptions.throwIfFatal(error);
}
else if (!errorPredicate.test(error)) {
Exceptions.throwIfFatal(error);
return error;
}
try {
errorConsumer.accept(error, value);
return null;
}
catch (Throwable e) {
e.addSuppressed(error);
return e;
}
}
}
final class ResumeDropStrategy implements OnNextFailureStrategy {
final Predicate<Throwable> errorPredicate;
ResumeDropStrategy(@Nullable Predicate<Throwable> errorPredicate) {
this.errorPredicate = errorPredicate;
}
@Override
public boolean test(Throwable error, @Nullable Object value) {
return errorPredicate == null || errorPredicate.test(error);
}
@Override
@Nullable
public Throwable process(Throwable error, @Nullable Object value, Context context) {
if (errorPredicate == null) {
Exceptions.throwIfFatal(error);
}
else if (!errorPredicate.test(error)) {
Exceptions.throwIfFatal(error);
return error;
}
try {
if (value != null) {
Operators.onNextDropped(value, context);
}
Operators.onErrorDropped(error, context);
return null;
}
catch (Throwable e) {
e.addSuppressed(error);
return e;
}
}
}
final class LambdaOnNextErrorStrategy implements OnNextFailureStrategy {
private final BiFunction<? super Throwable, Object, ? extends Throwable> delegateProcessor;
private final BiPredicate<? super Throwable, Object> delegatePredicate;
@SuppressWarnings("unchecked")
public LambdaOnNextErrorStrategy(
BiFunction<? super Throwable, Object, ? extends Throwable> delegateProcessor) {
this.delegateProcessor = delegateProcessor;
if (delegateProcessor instanceof BiPredicate) {
this.delegatePredicate = (BiPredicate<? super Throwable, Object>) delegateProcessor;
}
else {
this.delegatePredicate = (e, v) -> true;
}
}
@Override
public boolean test(Throwable error, @Nullable Object value) {
return delegatePredicate.test(error, value);
}
@Override
@Nullable
public Throwable process(Throwable error, @Nullable Object value, Context ignored) {
return delegateProcessor.apply(error, value);
}
}
}