package reactor.core.publisher;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
final class FluxOnErrorResume<T> extends InternalFluxOperator<T, T> {
final Function<? super Throwable, ? extends Publisher<? extends T>> nextFactory;
FluxOnErrorResume(Flux<? extends T> source,
Function<? super Throwable, ? extends Publisher<? extends T>> nextFactory) {
super(source);
this.nextFactory = Objects.requireNonNull(nextFactory, "nextFactory");
}
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
return new ResumeSubscriber<>(actual, nextFactory);
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return super.scanUnsafe(key);
}
static final class ResumeSubscriber<T>
extends Operators.MultiSubscriptionSubscriber<T, T> {
final Function<? super Throwable, ? extends Publisher<? extends T>> nextFactory;
boolean second;
ResumeSubscriber(CoreSubscriber<? super T> actual,
Function<? super Throwable, ? extends Publisher<? extends T>> nextFactory) {
super(actual);
this.nextFactory = nextFactory;
}
@Override
public void onSubscribe(Subscription s) {
if (!second) {
actual.onSubscribe(this);
}
set(s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
if (!second) {
producedOne();
}
}
@Override
public void onError(Throwable t) {
if (!second) {
second = true;
Publisher<? extends T> p;
try {
p = Objects.requireNonNull(nextFactory.apply(t),
"The nextFactory returned a null Publisher");
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
_e = Exceptions.addSuppressed(_e, t);
actual.onError(_e);
return;
}
p.subscribe(this);
}
else {
actual.onError(t);
}
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return super.scanUnsafe(key);
}
}
}