package reactor.core.publisher;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
final class FluxOnErrorResume<T> extends FluxOperator<T, T> {
final Function<? super Throwable, ? extends Publisher<? extends T>> nextFactory;
FluxOnErrorResume(Flux<? extends T> source,
Function<? super Throwable, ? extends Publisher<? extends T>> nextFactory) {
super(source);
this.nextFactory = Objects.requireNonNull(nextFactory, "nextFactory");
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new ResumeSubscriber<>(actual, nextFactory));
}
static final class ResumeSubscriber<T>
extends Operators.MultiSubscriptionSubscriber<T, T> {
final Function<? super Throwable, ? extends Publisher<? extends T>> nextFactory;
boolean second;
ResumeSubscriber(CoreSubscriber<? super T> actual,
Function<? super Throwable, ? extends Publisher<? extends T>> nextFactory) {
super(actual);
this.nextFactory = nextFactory;
}
@Override
public void onSubscribe(Subscription s) {
if (!second) {
actual.onSubscribe(this);
}
set(s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
if (!second) {
producedOne();
}
}
@Override
public void onError(Throwable t) {
if (!second) {
second = true;
Publisher<? extends T> p;
try {
p = Objects.requireNonNull(nextFactory.apply(t),
"The nextFactory returned a null Publisher");
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
_e = Exceptions.addSuppressed(_e, t);
actual.onError(_e);
return;
}
p.subscribe(this);
}
else {
actual.onError(t);
}
}
}
}