package reactor.core.publisher;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.LongSupplier;
import java.util.stream.Stream;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.WaitStrategy;
import reactor.util.context.Context;
public final class MonoProcessor<O> extends Mono<O>
implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription,
Scannable,
LongSupplier {
/**
 * Create a {@link MonoProcessor} that has no source publisher attached.
 *
 * @param <T> type of the value the processor may hold
 * @return a new, unterminated {@link MonoProcessor}
 */
public static <T> MonoProcessor<T> create() {
    final Publisher<? extends T> noSource = null;
    return new MonoProcessor<>(noSource);
}
/**
 * Create a {@link MonoProcessor} that has no source publisher attached and
 * uses the given {@link WaitStrategy} for its blocking reads.
 *
 * @param waitStrategy the strategy used by {@code block(...)}; must not be {@code null}
 * @param <T> type of the value the processor may hold
 * @return a new, unterminated {@link MonoProcessor}
 * @throws NullPointerException if {@code waitStrategy} is {@code null}
 */
public static <T> MonoProcessor<T> create(WaitStrategy waitStrategy) {
    final Publisher<? extends T> noSource = null;
    return new MonoProcessor<>(noSource, waitStrategy);
}
// Strategy used by block(...) to wait; signalled after each terminal signal is dispatched.
final WaitStrategy waitStrategy;
// Current downstream subscribers, or one of the sentinel arrays declared below.
// Only ever swapped through the SUBSCRIBERS updater.
volatile NextInner<O>[] subscribers;
// Atomic accessor for the "subscribers" field.
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<MonoProcessor, NextInner[]> SUBSCRIBERS =
AtomicReferenceFieldUpdater.newUpdater(MonoProcessor.class,
NextInner[].class,
"subscribers");
// The three sentinels below are distinct zero-length arrays that the class
// compares by identity (==), never by content.
// Sentinel: no subscribers and no source left to connect.
@SuppressWarnings("rawtypes")
static final NextInner[] EMPTY = new NextInner[0];
// Sentinel: the processor has reached a terminal state.
@SuppressWarnings("rawtypes")
static final NextInner[] TERMINATED = new NextInner[0];
// Sentinel: no subscribers yet, and a source was supplied at construction
// that has not been subscribed to.
@SuppressWarnings("rawtypes")
static final NextInner[] EMPTY_WITH_SOURCE = new NextInner[0];
// Optional upstream publisher; cleared (set to null) on cancel, dispose and terminal signals.
Publisher<? extends O> source;
// Terminal error, if any; getError() exposes it only once the processor is terminated.
Throwable error;
// Terminal value, if any; null when the processor completed empty or failed.
O value;
// Upstream subscription; replaced by Operators.cancelledSubscription() on
// cancel, dispose and terminal signals.
volatile Subscription subscription;
// Atomic accessor for the "subscription" field.
static final AtomicReferenceFieldUpdater<MonoProcessor, Subscription> UPSTREAM =
AtomicReferenceFieldUpdater.newUpdater(MonoProcessor.class, Subscription
.class, "subscription");
/**
 * Build a processor over an optional source, delegating to the two-argument
 * constructor with {@code WaitStrategy.sleeping()} as the wait strategy.
 *
 * @param source the publisher to subscribe to lazily, or {@code null} for none
 */
MonoProcessor(@Nullable Publisher<? extends O> source) {
this(source, WaitStrategy.sleeping());
}
/**
 * Build a processor over an optional source with an explicit wait strategy.
 * The initial subscriber state records whether a source is still waiting to
 * be subscribed to.
 *
 * @param source the publisher to subscribe to lazily, or {@code null} for none
 * @param waitStrategy the strategy used for blocking reads; must not be {@code null}
 * @throws NullPointerException if {@code waitStrategy} is {@code null}
 */
MonoProcessor(@Nullable Publisher<? extends O> source, WaitStrategy waitStrategy) {
    this.waitStrategy = Objects.requireNonNull(waitStrategy, "waitStrategy");
    this.source = source;
    if (source == null) {
        SUBSCRIBERS.lazySet(this, EMPTY);
    }
    else {
        SUBSCRIBERS.lazySet(this, EMPTY_WITH_SOURCE);
    }
}
/**
 * Cancel the upstream subscription, if any, without signalling downstream
 * subscribers. Does nothing when the processor is already terminated or the
 * upstream slot already holds the cancelled marker.
 */
@Override
public final void cancel() {
    if (!isTerminated()) {
        Subscription previous = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
        if (previous != Operators.cancelledSubscription()) {
            // Drop the source so it can no longer be connected.
            source = null;
            if (previous != null) {
                previous.cancel();
            }
        }
    }
}
/**
 * Cancel the upstream subscription (if any), terminate this processor with a
 * {@link CancellationException} carrying the message {@code "Disposed"}, and
 * signal that error to every current subscriber. Does nothing when the
 * upstream slot already holds the cancelled marker.
 */
@Override
@SuppressWarnings("unchecked")
public void dispose() {
    Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
    if (s == Operators.cancelledSubscription()) {
        return;
    }
    source = null;
    if (s != null) {
        s.cancel();
    }
    Exception e = new CancellationException("Disposed");
    // Record the outcome BEFORE publishing TERMINATED, as onError does. Readers
    // that observe TERMINATED (getAsLong, peek, a late subscribe) must also
    // observe the error, otherwise they would take the disposal for an empty
    // completion.
    // NOTE(review): this relies on the UPSTREAM swap above admitting a single
    // winner among cancel/dispose/onNext/onError, so the state cannot already
    // be TERMINATED here - confirm against Operators.setOnce.
    error = e;
    value = null;
    for (NextInner<O> as : SUBSCRIBERS.getAndSet(this, TERMINATED)) {
        as.onError(e);
    }
    waitStrategy.signalAllWhenBlocking();
}
/**
 * Block until this processor terminates, using the no-op spin observer so
 * that no timeout is applied by this method.
 *
 * @return the value, or {@code null} if the processor completed empty
 */
@Override
@Nullable
public O block() {
    final Runnable noTimeoutObserver = WaitStrategy.NOOP_SPIN_OBSERVER;
    return block(noTimeoutObserver);
}
/**
 * Block until this processor terminates or the given timeout elapses.
 * The timeout is enforced by a spin observer that raises the
 * {@link WaitStrategy} alert once the deadline has passed.
 *
 * @param timeout maximum time to wait
 * @return the value, or {@code null} if the processor completed empty
 * @throws IllegalStateException if the wait times out, is interrupted, or the
 * processor was cancelled
 */
@Override
@Nullable
public O block(Duration timeout) {
    final long deadline = System.nanoTime() + timeout.toNanos();
    Runnable spinObserver = () -> {
        // System.nanoTime() values are only meaningful as differences: comparing
        // absolute values breaks on numeric overflow, including when the
        // deadline computation above wraps for very long timeouts.
        if (System.nanoTime() - deadline > 0L) {
            WaitStrategy.alert();
        }
    };
    return block(spinObserver);
}
/**
 * Shared implementation of the blocking reads. Returns immediately through
 * {@link #peek()} when already terminated; otherwise connects to the source
 * (if one is pending) and waits on the {@link WaitStrategy}, using this
 * processor's {@link #getAsLong()} as the observed state.
 *
 * @param spinObserver callback handed to the wait strategy; the timed variant
 * uses it to raise an alert once the deadline has passed
 * @return the value, or {@code null} if the processor completed empty
 * @throws IllegalStateException if the wait is alerted (timeout), the thread
 * is interrupted, or the processor was cancelled
 */
@Nullable
O block(Runnable spinObserver) {
    try {
        if (!isPending()) {
            return peek();
        }
        connect();
        try {
            // NOTE(review): presumably waitFor returns once getAsLong() reaches
            // at least 3 (a terminal state) - confirm against WaitStrategy.
            long endState = waitStrategy.waitFor(3L, this, spinObserver);
            // End states mirror getAsLong(): 3 = value, 4 = empty, 5 = error.
            switch ((int)endState) {
                case 3:
                    return value;
                case 4:
                    return null;
                case 5:
                    RuntimeException re = Exceptions.propagate(error);
                    re = Exceptions.addSuppressed(re, new Exception("Mono#block terminated with an error"));
                    throw re;
            }
            throw new IllegalStateException("Mono has been cancelled");
        }
        catch (RuntimeException ce) {
            if(WaitStrategy.isAlert(ce)) {
                // The spin observer raised the alert: give up on upstream.
                cancel();
                throw new IllegalStateException("Timeout on Mono blocking read");
            }
            throw ce;
        }
    }
    catch (InterruptedException ie) {
        // Restore the interrupt flag and keep the original exception as the
        // cause so the interruption site is not lost.
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Thread Interruption on Mono blocking read", ie);
    }
}
/**
 * Encode the current state of this processor as a number, as consumed by the
 * {@link WaitStrategy} in {@code block(...)}.
 *
 * @return {@code 5} when terminated with an error, {@code 4} when terminated
 * without a value, {@code 3} when terminated with a value, {@code -1} when
 * the upstream slot holds the cancelled marker, {@code 2} when subscribers
 * are registered, {@code 0} otherwise
 */
@Override
public long getAsLong() {
    // Take a single snapshot of the subscriber state and derive every
    // subscriber-related answer from it, rather than re-reading the volatile
    // field between checks.
    NextInner<O>[] inners = subscribers;
    if (inners == TERMINATED) {
        if (error != null) {
            return 5L;
        }
        if (value == null) {
            return 4L;
        }
        return 3L;
    }
    if (subscription == Operators.cancelledSubscription()) {
        return -1L;
    }
    if (inners != EMPTY && inners != EMPTY_WITH_SOURCE) {
        return 2L;
    }
    return 0L;
}
/**
 * Return the terminal error, if the processor has terminated with one.
 *
 * @return the recorded error once terminated, {@code null} otherwise
 */
@Nullable
public final Throwable getError() {
    if (isTerminated()) {
        return error;
    }
    return null;
}
/**
 * Tell whether the upstream slot holds the cancelled marker while the
 * processor has not reached a terminal state.
 *
 * @return {@code true} if cancelled and not terminated
 */
public boolean isCancelled() {
    if (subscription != Operators.cancelledSubscription()) {
        return false;
    }
    return !isTerminated();
}
/**
 * Tell whether the processor has terminated with an error.
 *
 * @return {@code true} if {@link #getError()} is non-null
 */
public final boolean isError() {
    final Throwable failure = getError();
    return failure != null;
}
/**
 * Tell whether the processor has terminated without an error.
 *
 * @return {@code true} if terminated and no error is recorded
 */
public final boolean isSuccess() {
    if (!isTerminated()) {
        return false;
    }
    return error == null;
}
/**
 * Tell whether the processor has reached a terminal state, i.e. the
 * subscriber array is the {@code TERMINATED} sentinel.
 *
 * @return {@code true} if terminated
 */
public final boolean isTerminated() {
    final NextInner<O>[] current = subscribers;
    return current == TERMINATED;
}
/**
 * Tell whether the processor is terminated or cancelled.
 *
 * @return {@code true} if either {@link #isTerminated()} or
 * {@link #isCancelled()} holds
 */
@Override
public boolean isDisposed() {
    if (isTerminated()) {
        return true;
    }
    return isCancelled();
}
/**
 * Handle upstream completion by delegating to {@link #onNext(Object)} with a
 * {@code null} value, which that method treats as an empty completion.
 */
@Override
public final void onComplete() {
    final O noValue = null;
    onNext(noValue);
}
/**
 * Terminate this processor with the given error and propagate it to every
 * current subscriber. If the upstream slot already holds the cancelled
 * marker, the error is handed to {@code Operators.onErrorDroppedMulticast}
 * instead.
 *
 * @param cause the error; must not be {@code null}
 * @throws NullPointerException if {@code cause} is {@code null}
 */
@Override
@SuppressWarnings("unchecked")
public final void onError(Throwable cause) {
    Objects.requireNonNull(cause, "onError cannot be null");
    final Subscription previous = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
    if (previous == Operators.cancelledSubscription()) {
        Operators.onErrorDroppedMulticast(cause);
        return;
    }
    // Record the outcome before the terminal state is published.
    error = cause;
    value = null;
    source = null;
    final NextInner<O>[] targets = SUBSCRIBERS.getAndSet(this, TERMINATED);
    for (int i = 0; i < targets.length; i++) {
        targets[i].onError(cause);
    }
    waitStrategy.signalAllWhenBlocking();
}
/**
 * Terminate this processor with the given value, or with an empty completion
 * when {@code value} is {@code null}, and notify every current subscriber.
 * If the upstream slot already holds the cancelled marker, a non-null value
 * is handed to {@code Operators.onNextDroppedMulticast} and nothing else
 * happens.
 *
 * @param value the value to publish, or {@code null} for an empty completion
 */
@Override
@SuppressWarnings("unchecked")
public final void onNext(@Nullable O value) {
    final Subscription upstream = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
    if (upstream == Operators.cancelledSubscription()) {
        if (value != null) {
            Operators.onNextDroppedMulticast(value);
        }
        return;
    }
    // Record the outcome before the terminal state is published.
    this.value = value;
    final Publisher<? extends O> origin = source;
    source = null;
    final NextInner<O>[] targets = SUBSCRIBERS.getAndSet(this, TERMINATED);
    if (value != null) {
        // Only one value is wanted: cancel upstream unless the source is a Mono.
        if (upstream != null && !(origin instanceof Mono)) {
            upstream.cancel();
        }
        for (NextInner<O> inner : targets) {
            inner.complete(value);
        }
    }
    else {
        for (NextInner<O> inner : targets) {
            inner.onComplete();
        }
    }
    waitStrategy.signalAllWhenBlocking();
}
/**
 * Store the upstream subscription through {@code Operators.setOnce} and, if
 * it was accepted, request {@code Long.MAX_VALUE} from it.
 *
 * @param subscription the upstream subscription
 */
@Override
public final void onSubscribe(Subscription subscription) {
    if (!Operators.setOnce(UPSTREAM, this, subscription)) {
        return;
    }
    subscription.request(Long.MAX_VALUE);
}
/**
 * Expose the current subscriber array as a stream.
 *
 * @return a stream over the subscribers registered at the time of the call
 */
@Override
public Stream<? extends Scannable> inners() {
    final NextInner<O>[] snapshot = subscribers;
    return Stream.of(snapshot);
}
/**
 * Return the terminal value without blocking.
 *
 * @return the value if the processor terminated with one; {@code null} if it
 * has not terminated yet or terminated empty
 * @throws RuntimeException the recorded error, passed through
 * {@code Exceptions.propagate}, if the processor terminated with an error
 */
@Nullable
public O peek() {
    if (isTerminated()) {
        if (value != null) {
            return value;
        }
        if (error != null) {
            RuntimeException failure = Exceptions.propagate(error);
            failure = Exceptions.addSuppressed(failure, new Exception("Mono#peek terminated with an error"));
            throw failure;
        }
    }
    return null;
}
/**
 * Pass the requested amount to {@code Operators.validate}; no demand
 * bookkeeping is performed in this method.
 *
 * @param n the requested amount
 */
@Override
public final void request(long n) {
Operators.validate(n);
}
/**
 * Register a downstream subscriber. If the processor has already terminated,
 * the recorded outcome (error, value or empty completion) is replayed to the
 * subscriber immediately instead.
 *
 * @param actual the downstream subscriber
 */
@Override
public void subscribe(final CoreSubscriber<? super O> actual) {
    final NextInner<O> inner = new NextInner<>(actual, this);
    actual.onSubscribe(inner);
    if (!add(inner)) {
        // Already terminated: replay the recorded outcome.
        final Throwable failure = error;
        if (failure != null) {
            actual.onError(failure);
            return;
        }
        final O result = value;
        if (result == null) {
            inner.onComplete();
        }
        else {
            inner.complete(result);
        }
        return;
    }
    // The subscriber may have cancelled during onSubscribe, before it was registered.
    if (inner.isCancelled()) {
        remove(inner);
    }
}
/**
 * Subscribe this processor to its source when one is present and the state
 * can be moved from {@code EMPTY_WITH_SOURCE} to {@code EMPTY}.
 */
void connect() {
    final Publisher<? extends O> upstreamSource = source;
    if (upstreamSource == null) {
        return;
    }
    if (SUBSCRIBERS.compareAndSet(this, EMPTY_WITH_SOURCE, EMPTY)) {
        upstreamSource.subscribe(this);
    }
}
/**
 * Derive the {@link Context} from the current subscribers through
 * {@code Operators.multiSubscribersContext}.
 *
 * @return the context computed for the current subscriber array
 */
@Override
public Context currentContext() {
    final NextInner<O>[] snapshot = subscribers;
    return Operators.multiSubscribersContext(snapshot);
}
/**
 * Answer the {@link Scannable} attributes this processor supports.
 *
 * @param key the attribute being queried
 * @return the attribute value, or {@code null} for unsupported keys
 */
@Override
@Nullable
public Object scanUnsafe(Attr key) {
    // Read the terminated state first, ahead of the plain fields below.
    final boolean terminated = isTerminated();
    if (key == Attr.TERMINATED) {
        return terminated;
    }
    else if (key == Attr.PARENT) {
        return subscription;
    }
    else if (key == Attr.ERROR) {
        return error;
    }
    else if (key == Attr.PREFETCH) {
        return Integer.MAX_VALUE;
    }
    else if (key == Attr.CANCELLED) {
        return isCancelled();
    }
    return null;
}
/**
 * Tell whether the processor has not yet reached a terminal state.
 *
 * @return {@code true} if not terminated
 */
final boolean isPending() {
    if (isTerminated()) {
        return false;
    }
    return true;
}
/**
 * Return the length of the current subscriber array.
 *
 * @return the number of registered subscribers; {@code 0} for any of the
 * zero-length sentinel arrays
 */
public final long downstreamCount() {
    final NextInner<O>[] current = subscribers;
    return current.length;
}
/**
 * Tell whether at least one subscriber is currently registered.
 *
 * @return {@code true} if {@link #downstreamCount()} is not zero
 */
public final boolean hasDownstreams() {
    final long count = downstreamCount();
    return count != 0;
}
/**
 * Append a subscriber to the subscriber array, retrying the compare-and-set
 * until it succeeds or the processor is found terminated. When the array
 * being replaced was {@code EMPTY_WITH_SOURCE} and a source is present, the
 * processor subscribes itself to that source.
 *
 * @param ps the subscriber to register
 * @return {@code true} if registered, {@code false} if already terminated
 */
boolean add(NextInner<O> ps) {
    while (true) {
        final NextInner<O>[] current = subscribers;
        if (current == TERMINATED) {
            return false;
        }
        final int size = current.length;
        @SuppressWarnings("unchecked")
        final NextInner<O>[] grown = new NextInner[size + 1];
        System.arraycopy(current, 0, grown, 0, size);
        grown[size] = ps;
        if (!SUBSCRIBERS.compareAndSet(this, current, grown)) {
            // Lost the race against another update: retry on the fresh array.
            continue;
        }
        final Publisher<? extends O> upstreamSource = source;
        if (current == EMPTY_WITH_SOURCE && upstreamSource != null) {
            upstreamSource.subscribe(this);
        }
        return true;
    }
}
/**
 * Remove a subscriber from the subscriber array, retrying the compare-and-set
 * until it succeeds. Returns without change when the array is empty (which
 * includes the zero-length sentinels) or does not contain the subscriber.
 *
 * @param ps the subscriber to unregister, matched by identity
 */
@SuppressWarnings("unchecked")
void remove(NextInner<O> ps) {
    while (true) {
        final NextInner<O>[] current = subscribers;
        final int size = current.length;
        if (size == 0) {
            return;
        }
        // Locate the first slot holding this exact instance.
        int index = 0;
        while (index < size && current[index] != ps) {
            index++;
        }
        if (index == size) {
            return;
        }
        final NextInner<O>[] shrunk;
        if (size == 1) {
            shrunk = EMPTY;
        }
        else {
            shrunk = new NextInner[size - 1];
            System.arraycopy(current, 0, shrunk, 0, index);
            System.arraycopy(current, index + 1, shrunk, index, size - index - 1);
        }
        if (SUBSCRIBERS.compareAndSet(this, current, shrunk)) {
            return;
        }
    }
}
/**
 * Per-subscriber handle between a {@link MonoProcessor} and one downstream
 * subscriber. Cancelling it unregisters it from the parent processor.
 *
 * @param <T> the value type
 */
static final class NextInner<T> extends Operators.MonoSubscriber<T, T> {
    /** The processor this subscriber is registered with. */
    final MonoProcessor<T> parent;

    NextInner(CoreSubscriber<? super T> actual, MonoProcessor<T> parent) {
        super(actual);
        this.parent = parent;
    }

    /**
     * Mark this subscriber cancelled and, on the first such call only,
     * remove it from the parent processor.
     */
    @Override
    public void cancel() {
        if (STATE.getAndSet(this, CANCELLED) == CANCELLED) {
            return;
        }
        parent.remove(this);
    }

    /** Forward completion downstream unless this subscriber was cancelled. */
    @Override
    public void onComplete() {
        if (isCancelled()) {
            return;
        }
        actual.onComplete();
    }

    /**
     * Forward the error downstream, or hand it to
     * {@code Operators.onOperatorError} when this subscriber was cancelled.
     */
    @Override
    public void onError(Throwable t) {
        if (!isCancelled()) {
            actual.onError(t);
            return;
        }
        Operators.onOperatorError(t, currentContext());
    }

    /** Report the parent processor for {@code Attr.PARENT}; defer to the superclass otherwise. */
    @Override
    public Object scanUnsafe(Attr key) {
        return key == Attr.PARENT ? parent : super.scanUnsafe(key);
    }
}
}