package reactor.core.publisher;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
final class MonoIgnoreThen<T> extends Mono<T> implements Fuseable, Scannable {
final Publisher<?>[] ignore;
final Mono<T> last;
MonoIgnoreThen(Publisher<?>[] ignore, Mono<T> last) {
this.ignore = Objects.requireNonNull(ignore, "ignore");
this.last = Objects.requireNonNull(last, "last");
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
ThenIgnoreMain<T> manager = new ThenIgnoreMain<>(actual, ignore, last);
actual.onSubscribe(manager);
manager.drain();
}
<U> MonoIgnoreThen<U> shift(Mono<U> newLast) {
Objects.requireNonNull(newLast, "newLast");
Publisher<?>[] a = ignore;
int n = a.length;
Publisher<?>[] b = new Publisher[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = last;
return new MonoIgnoreThen<>(b, newLast);
}
@Override
public Object scanUnsafe(Attr key) {
return null;
}
static final class ThenIgnoreMain<T>
extends Operators.MonoSubscriber<T, T> {
final ThenIgnoreInner ignore;
final ThenAcceptInner<T> accept;
final Publisher<?>[] ignoreMonos;
final Mono<T> lastMono;
int index;
volatile boolean active;
volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<ThenIgnoreMain> WIP =
AtomicIntegerFieldUpdater.newUpdater(ThenIgnoreMain.class, "wip");
ThenIgnoreMain(CoreSubscriber<? super T> subscriber,
Publisher<?>[] ignoreMonos, Mono<T> lastMono) {
super(subscriber);
this.ignoreMonos = ignoreMonos;
this.lastMono = lastMono;
this.ignore = new ThenIgnoreInner(this);
this.accept = new ThenAcceptInner<>(this);
}
@Override
public Stream<? extends Scannable> inners() {
return Stream.of(ignore, accept);
}
@SuppressWarnings("unchecked")
void drain() {
if (WIP.getAndIncrement(this) != 0) {
return;
}
for (;;) {
if (isCancelled()) {
return;
}
if (!active) {
Publisher<?>[] a = ignoreMonos;
int i = index;
if (i == a.length) {
ignore.clear();
Mono<T> m = lastMono;
if (m instanceof Callable) {
T v;
try {
v = ((Callable<T>)m).call();
}
catch (Throwable ex) {
actual.onError(Operators.onOperatorError(ex,
actual.currentContext()));
return;
}
if (v == null) {
actual.onComplete();
}
else {
complete(v);
}
return;
}
active = true;
m.subscribe(accept);
} else {
Publisher<?> m = a[i];
index = i + 1;
if (m instanceof Callable) {
try {
((Callable<?>)m).call();
}
catch (Throwable ex) {
actual.onError(Operators.onOperatorError(ex,
actual.currentContext()));
return;
}
continue;
}
active = true;
m.subscribe(ignore);
}
}
if (WIP.decrementAndGet(this) == 0) {
break;
}
}
}
@Override
public void cancel() {
super.cancel();
ignore.cancel();
accept.cancel();
}
void ignoreDone() {
active = false;
drain();
}
}
static final class ThenIgnoreInner implements InnerConsumer<Object> {
final ThenIgnoreMain<?> parent;
volatile Subscription s;
static final AtomicReferenceFieldUpdater<ThenIgnoreInner, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(ThenIgnoreInner.class, Subscription.class, "s");
ThenIgnoreInner(ThenIgnoreMain<?> parent) {
this.parent = parent;
}
@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return s;
if (key == Attr.ACTUAL) return parent;
if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription();
return null;
}
@Override
public void onSubscribe(Subscription s) {
if (Operators.replace(S, this, s)) {
s.request(Long.MAX_VALUE);
}
}
@Override
public Context currentContext() {
return parent.currentContext();
}
@Override
public void onNext(Object t) {
Operators.onDiscard(t, currentContext());
}
@Override
public void onError(Throwable t) {
this.parent.onError(t);
}
@Override
public void onComplete() {
this.parent.ignoreDone();
}
void cancel() {
Operators.terminate(S, this);
}
void clear() {
S.lazySet(this, null);
}
}
static final class ThenAcceptInner<T> implements InnerConsumer<T> {
final ThenIgnoreMain<T> parent;
volatile Subscription s;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<ThenAcceptInner, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(ThenAcceptInner.class, Subscription.class, "s");
boolean done;
ThenAcceptInner(ThenIgnoreMain<T> parent) {
this.parent = parent;
}
@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return s;
if (key == Attr.ACTUAL) return parent;
if (key == Attr.TERMINATED) return done;
if (key == Attr.CANCELLED) return s == Operators.cancelledSubscription();
return null;
}
@Override
public Context currentContext() {
return parent.currentContext();
}
@Override
public void onSubscribe(Subscription s) {
if (Operators.setOnce(S, this, s)) {
s.request(Long.MAX_VALUE);
}
}
@Override
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, parent.currentContext());
return;
}
done = true;
this.parent.complete(t);
}
@Override
public void onError(Throwable t) {
if (done) {
Operators.onErrorDropped(t, parent.currentContext());
return;
}
done = true;
this.parent.onError(t);
}
@Override
public void onComplete() {
if (done) {
return;
}
this.parent.onComplete();
}
void cancel() {
Operators.terminate(S, this);
}
}
}