package reactor.core.publisher;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.util.annotation.Nullable;
final class MonoPeekTerminal<T> extends InternalMonoOperator<T, T> implements Fuseable {
final BiConsumer<? super T, Throwable> onAfterTerminateCall;
final Consumer<? super T> onSuccessCall;
final Consumer<? super Throwable> onErrorCall;
MonoPeekTerminal(Mono<? extends T> source,
@Nullable Consumer<? super T> onSuccessCall,
@Nullable Consumer<? super Throwable> onErrorCall,
@Nullable BiConsumer<? super T, Throwable> onAfterTerminateCall) {
super(source);
this.onAfterTerminateCall = onAfterTerminateCall;
this.onSuccessCall = onSuccessCall;
this.onErrorCall = onErrorCall;
}
@Override
@SuppressWarnings("unchecked")
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
if (actual instanceof ConditionalSubscriber) {
return new MonoTerminalPeekSubscriber<>((ConditionalSubscriber<? super T>) actual,
this);
}
return new MonoTerminalPeekSubscriber<>(actual, this);
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return super.scanUnsafe(key);
}
static final class MonoTerminalPeekSubscriber<T>
implements ConditionalSubscriber<T>, InnerOperator<T, T>,
Fuseable.QueueSubscription<T> {
final CoreSubscriber<? super T> actual;
final ConditionalSubscriber<? super T> actualConditional;
final MonoPeekTerminal<T> parent;
Subscription s;
@Nullable
Fuseable.QueueSubscription<T> queueSubscription;
int sourceMode;
volatile boolean done;
boolean valued;
MonoTerminalPeekSubscriber(ConditionalSubscriber<? super T> actual,
MonoPeekTerminal<T> parent) {
this.actualConditional = actual;
this.actual = actual;
this.parent = parent;
}
MonoTerminalPeekSubscriber(CoreSubscriber<? super T> actual,
MonoPeekTerminal<T> parent) {
this.actual = actual;
this.actualConditional = null;
this.parent = parent;
}
@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return done;
if (key == Attr.PARENT) return s;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return InnerOperator.super.scanUnsafe(key);
}
@Override
public void request(long n) {
s.request(n);
}
@Override
public void cancel() {
s.cancel();
}
@Override
public void onSubscribe(Subscription s) {
this.s = s;
this.queueSubscription = Operators.as(s);
actual.onSubscribe(this);
}
@Override
public void onNext(T t) {
if (sourceMode == ASYNC) {
actual.onNext(null);
}
else {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
return;
}
valued = true;
if (parent.onSuccessCall != null) {
try {
parent.onSuccessCall.accept(t);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t,
actual.currentContext()));
return;
}
}
actual.onNext(t);
if (parent.onAfterTerminateCall != null) {
try {
parent.onAfterTerminateCall.accept(t, null);
}
catch (Throwable e) {
Operators.onErrorDropped(Operators.onOperatorError(s, e, t,
actual.currentContext()),
actual.currentContext());
}
}
}
}
@Override
public boolean tryOnNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
return false;
}
if (actualConditional == null) {
onNext(t);
return false;
}
valued = true;
if (parent.onSuccessCall != null) {
try {
parent.onSuccessCall.accept(t);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
return false;
}
}
boolean r = actualConditional.tryOnNext(t);
if (parent.onAfterTerminateCall != null) {
try {
parent.onAfterTerminateCall.accept(t, null);
}
catch (Throwable e) {
Operators.onErrorDropped(Operators.onOperatorError(s, e, t,
actual.currentContext()),
actual.currentContext());
}
}
return r;
}
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, actual.currentContext());
return;
}
done = true;
Consumer<? super Throwable> onError = parent.onErrorCall;
if (!valued && onError != null) {
try {
onError.accept(t);
}
catch (Throwable e) {
t = Operators.onOperatorError(null, e, t, actual.currentContext());
}
}
try {
actual.onError(t);
}
catch (UnsupportedOperationException use) {
if (onError == null ||
!Exceptions.isErrorCallbackNotImplemented(use) && use.getCause() != t) {
throw use;
}
}
if (!valued && parent.onAfterTerminateCall != null) {
try {
parent.onAfterTerminateCall.accept(null, t);
}
catch (Throwable e) {
Operators.onErrorDropped(Operators.onOperatorError(e,
actual.currentContext()),
actual.currentContext());
}
}
}
@Override
public void onComplete() {
if (done) {
return;
}
if (sourceMode == NONE && !valued) {
if (parent.onSuccessCall != null) {
try {
parent.onSuccessCall.accept(null);
}
catch (Throwable e) {
onError(Operators.onOperatorError(s, e, actual.currentContext()));
return;
}
}
}
done = true;
actual.onComplete();
if (sourceMode == NONE && !valued && parent.onAfterTerminateCall != null) {
try {
parent.onAfterTerminateCall.accept(null, null);
}
catch (Throwable e) {
Operators.onErrorDropped(Operators.onOperatorError(e,
actual.currentContext()),
actual.currentContext());
}
}
}
@Override
public CoreSubscriber<? super T> actual() {
return actual;
}
@Override
@Nullable
public T poll() {
assert queueSubscription != null;
boolean d = done;
T v = queueSubscription.poll();
if (!valued && (v != null || d || sourceMode == SYNC)) {
valued = true;
if (parent.onSuccessCall != null) {
try {
parent.onSuccessCall.accept(v);
}
catch (Throwable e) {
throw Exceptions.propagate(Operators.onOperatorError(s, e, v,
actual.currentContext()));
}
}
if (parent.onAfterTerminateCall != null) {
try {
parent.onAfterTerminateCall.accept(v, null);
}
catch (Throwable t) {
Operators.onErrorDropped(Operators.onOperatorError(t,
actual.currentContext()),
actual.currentContext());
}
}
}
return v;
}
@Override
public boolean isEmpty() {
return queueSubscription == null || queueSubscription.isEmpty();
}
@Override
public void clear() {
assert queueSubscription != null;
queueSubscription.clear();
}
@Override
public int requestFusion(int requestedMode) {
int m;
if (queueSubscription == null) {
m = NONE;
}
else if ((requestedMode & THREAD_BARRIER) != 0) {
m = NONE;
}
else {
m = queueSubscription.requestFusion(requestedMode);
}
sourceMode = m;
return m;
}
@Override
public int size() {
return queueSubscription == null ? 0 : queueSubscription.size();
}
}
}