package reactor.core.publisher;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;
final class MonoWhen extends Mono<Void> implements SourceProducer<Void> {
final boolean delayError;
final Publisher<?>[] sources;
final Iterable<? extends Publisher<?>> sourcesIterable;
MonoWhen(boolean delayError, Publisher<?>... sources) {
this.delayError = delayError;
this.sources = Objects.requireNonNull(sources, "sources");
this.sourcesIterable = null;
}
MonoWhen(boolean delayError, Iterable<? extends Publisher<?>> sourcesIterable) {
this.delayError = delayError;
this.sources = null;
this.sourcesIterable = Objects.requireNonNull(sourcesIterable, "sourcesIterable");
}
@SuppressWarnings("unchecked")
@Nullable
Mono<Void> whenAdditionalSource(Publisher<?> source) {
Publisher[] oldSources = sources;
if (oldSources != null) {
int oldLen = oldSources.length;
Publisher<?>[] newSources = new Publisher[oldLen + 1];
System.arraycopy(oldSources, 0, newSources, 0, oldLen);
newSources[oldLen] = source;
return new MonoWhen(delayError, newSources);
}
return null;
}
@SuppressWarnings("unchecked")
@Override
public void subscribe(CoreSubscriber<? super Void> actual) {
Publisher<?>[] a;
int n = 0;
if (sources != null) {
a = sources;
n = a.length;
}
else {
a = new Publisher[8];
for (Publisher<?> m : sourcesIterable) {
if (n == a.length) {
Publisher<?>[] b = new Publisher[n + (n >> 2)];
System.arraycopy(a, 0, b, 0, n);
a = b;
}
a[n++] = m;
}
}
if (n == 0) {
Operators.complete(actual);
return;
}
WhenCoordinator parent = new WhenCoordinator(actual, n, delayError);
actual.onSubscribe(parent);
parent.subscribe(a);
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.DELAY_ERROR) return delayError;
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
}
static final class WhenCoordinator extends Operators.MonoSubscriber<Object, Void> {
final WhenInner[] subscribers;
final boolean delayError;
volatile int done;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<WhenCoordinator> DONE =
AtomicIntegerFieldUpdater.newUpdater(WhenCoordinator.class, "done");
@SuppressWarnings("unchecked")
WhenCoordinator(CoreSubscriber<? super Void> subscriber,
int n,
boolean delayError) {
super(subscriber);
this.delayError = delayError;
subscribers = new WhenInner[n];
for (int i = 0; i < n; i++) {
subscribers[i] = new WhenInner(this);
}
}
@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) {
return done == subscribers.length;
}
if (key == Attr.BUFFERED) {
return subscribers.length;
}
if (key == Attr.DELAY_ERROR) {
return delayError;
}
if (key == Attr.RUN_STYLE) {
return Attr.RunStyle.SYNC;
}
return super.scanUnsafe(key);
}
@Override
public Stream<? extends Scannable> inners() {
return Stream.of(subscribers);
}
void subscribe(Publisher<?>[] sources) {
WhenInner[] a = subscribers;
for (int i = 0; i < a.length; i++) {
sources[i].subscribe(a[i]);
}
}
void signalError(Throwable t) {
if (delayError) {
signal();
}
else {
int n = subscribers.length;
if (DONE.getAndSet(this, n) != n) {
cancel();
actual.onError(t);
}
}
}
@SuppressWarnings("unchecked")
void signal() {
WhenInner[] a = subscribers;
int n = a.length;
if (DONE.incrementAndGet(this) != n) {
return;
}
Throwable error = null;
Throwable compositeError = null;
for (int i = 0; i < a.length; i++) {
WhenInner m = a[i];
Throwable e = m.error;
if (e != null) {
if (compositeError != null) {
compositeError.addSuppressed(e);
}
else if (error != null) {
compositeError = Exceptions.multiple(error, e);
}
else {
error = e;
}
}
}
if (compositeError != null) {
actual.onError(compositeError);
}
else if (error != null) {
actual.onError(error);
}
else {
actual.onComplete();
}
}
@Override
public void cancel() {
if (!isCancelled()) {
super.cancel();
for (WhenInner ms : subscribers) {
ms.cancel();
}
}
}
}
static final class WhenInner implements InnerConsumer<Object> {
final WhenCoordinator parent;
volatile Subscription s;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<WhenInner, Subscription> S =
AtomicReferenceFieldUpdater.newUpdater(WhenInner.class,
Subscription.class,
"s");
Throwable error;
WhenInner(WhenCoordinator parent) {
this.parent = parent;
}
@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.CANCELLED) {
return s == Operators.cancelledSubscription();
}
if (key == Attr.PARENT) {
return s;
}
if (key == Attr.ACTUAL) {
return parent;
}
if (key == Attr.ERROR) {
return error;
}
if (key == Attr.RUN_STYLE) {
return Attr.RunStyle.SYNC;
}
return null;
}
@Override
public Context currentContext() {
return parent.currentContext();
}
@Override
public void onSubscribe(Subscription s) {
if (Operators.setOnce(S, this, s)) {
s.request(Long.MAX_VALUE);
}
else {
s.cancel();
}
}
@Override
public void onNext(Object t) {
}
@Override
public void onError(Throwable t) {
error = t;
parent.signalError(t);
}
@Override
public void onComplete() {
parent.signal();
}
void cancel() {
Operators.terminate(S, this);
}
}
}