/*
 * Copyright (c) 2011-2018 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.WaitStrategy;
import reactor.util.context.Context;

A MonoProcessor is a Mono extension that implements stateful semantics. Multi-subscribe is allowed.

Once a MonoProcessor has been resolved, newer subscribers will benefit from the cached result.

Author:Stephane Maldini
Type parameters:
  • <O> – the type of the value that will be made available
/** * A {@code MonoProcessor} is a {@link Mono} extension that implements stateful semantics. Multi-subscribe is allowed. * * <p> * <img width="640" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/monoprocessor.png" alt=""> * <p> * * Once a {@link MonoProcessor} has been resolved, newer subscribers will benefit from the cached result. * * @param <O> the type of the value that will be made available * * @author Stephane Maldini */
public final class MonoProcessor<O> extends Mono<O> implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription, Scannable, LongSupplier {
Create a MonoProcessor that will eagerly request 1 on onSubscribe(Subscription), cache and emit the eventual result for 1 or N subscribers.
Type parameters:
  • <T> – type of the expected value
Returns:A MonoProcessor.
/** * Create a {@link MonoProcessor} that will eagerly request 1 on {@link #onSubscribe(Subscription)}, cache and emit * the eventual result for 1 or N subscribers. * * @param <T> type of the expected value * * @return A {@link MonoProcessor}. */
public static <T> MonoProcessor<T> create() { return new MonoProcessor<>(null); }
Create a MonoProcessor that will eagerly request 1 on onSubscribe(Subscription), cache and emit the eventual result for 1 or N subscribers.
Params:
Type parameters:
  • <T> – type of the expected value
Returns:A MonoProcessor.
/** * Create a {@link MonoProcessor} that will eagerly request 1 on {@link #onSubscribe(Subscription)}, cache and emit * the eventual result for 1 or N subscribers. * * @param waitStrategy a {@link WaitStrategy} for blocking {@link #block} strategy * @param <T> type of the expected value * * @return A {@link MonoProcessor}. */
public static <T> MonoProcessor<T> create(WaitStrategy waitStrategy) { return new MonoProcessor<>(null, waitStrategy); } final WaitStrategy waitStrategy; volatile NextInner<O>[] subscribers; @SuppressWarnings("rawtypes") static final AtomicReferenceFieldUpdater<MonoProcessor, NextInner[]> SUBSCRIBERS = AtomicReferenceFieldUpdater.newUpdater(MonoProcessor.class, NextInner[].class, "subscribers"); @SuppressWarnings("rawtypes") static final NextInner[] EMPTY = new NextInner[0]; @SuppressWarnings("rawtypes") static final NextInner[] TERMINATED = new NextInner[0]; @SuppressWarnings("rawtypes") static final NextInner[] EMPTY_WITH_SOURCE = new NextInner[0]; Publisher<? extends O> source; Throwable error; O value; volatile Subscription subscription; static final AtomicReferenceFieldUpdater<MonoProcessor, Subscription> UPSTREAM = AtomicReferenceFieldUpdater.newUpdater(MonoProcessor.class, Subscription .class, "subscription"); MonoProcessor(@Nullable Publisher<? extends O> source) { this(source, WaitStrategy.sleeping()); } MonoProcessor(@Nullable Publisher<? extends O> source, WaitStrategy waitStrategy) { this.waitStrategy = Objects.requireNonNull(waitStrategy, "waitStrategy"); this.source = source; SUBSCRIBERS.lazySet(this, source != null ? EMPTY_WITH_SOURCE : EMPTY); } @Override public final void cancel() { if (isTerminated()) { return; } Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { return; } source = null; if (s != null) { s.cancel(); } } @Override @SuppressWarnings("unchecked") public void dispose() { Subscription s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription()); if (s == Operators.cancelledSubscription()) { return; } source = null; if (s != null) { s.cancel(); } NextInner<O>[] a; if ((a = SUBSCRIBERS.getAndSet(this, TERMINATED)) != TERMINATED) { Exception e = new CancellationException("Disposed"); error = e; value = null; for (NextInner<O> as : a) { as.onError(e); } } waitStrategy.signalAllWhenBlocking(); }
Block the calling thread indefinitely, waiting for the completion of this MonoProcessor. If the MonoProcessor is completed with an error a RuntimeException that wraps the error is thrown.
Returns:the value of this MonoProcessor
/** * Block the calling thread indefinitely, waiting for the completion of this {@code MonoProcessor}. If the * {@link MonoProcessor} is completed with an error a RuntimeException that wraps the error is thrown. * * @return the value of this {@code MonoProcessor} */
@Override @Nullable public O block() { return block(WaitStrategy.NOOP_SPIN_OBSERVER); }
Block the calling thread for the specified time, waiting for the completion of this MonoProcessor. If the MonoProcessor is completed with an error a RuntimeException that wraps the error is thrown.
Params:
  • timeout – the timeout value as a Duration
Returns:the value of this MonoProcessor or null if the timeout is reached and the MonoProcessor has not completed
/** * Block the calling thread for the specified time, waiting for the completion of this {@code MonoProcessor}. If the * {@link MonoProcessor} is completed with an error a RuntimeException that wraps the error is thrown. * * @param timeout the timeout value as a {@link Duration} * * @return the value of this {@code MonoProcessor} or {@code null} if the timeout is reached and the {@code MonoProcessor} has * not completed */
@Override @Nullable public O block(Duration timeout) { long delay = System.nanoTime() + timeout.toNanos(); Runnable spinObserver = () -> { if (delay < System.nanoTime()) { WaitStrategy.alert(); } }; return block(spinObserver); } @Nullable O block(Runnable spinObserver) { try { if (!isPending()) { return peek(); } connect(); try { //wait for a terminated state (minimum 3) long endState = waitStrategy.waitFor(3L, this, spinObserver); switch ((int)endState) { case 3: //terminated with value return value; case 4: //terminated without value return null; case 5: //terminated with error RuntimeException re = Exceptions.propagate(error); re = Exceptions.addSuppressed(re, new Exception("Mono#block terminated with an error")); throw re; } throw new IllegalStateException("Mono has been cancelled"); } catch (RuntimeException ce) { if(WaitStrategy.isAlert(ce)) { cancel(); throw new IllegalStateException("Timeout on Mono blocking read"); } throw ce; } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new IllegalStateException("Thread Interruption on Mono blocking read"); } }
Returns the internal state as a long fulfilled.
  • -1 : cancelled
  • 0 : ready
  • 2 : subscribed
  • 3 : terminated with value
  • 4 : terminated without value
  • 5 : terminated with error
Returns:the internal state from -1 Cancelled to 5 errored, beyond 3 included is fulfilled.
Deprecated: Should not use (to be removed in 3.2)
/** * Returns the internal state as a long fulfilled. * <ul> * <li>-1 : cancelled</li> * <li>0 : ready</li> * <li>2 : subscribed</li> * <li>3 : terminated with value</li> * <li>4 : terminated without value</li> * <li>5 : terminated with error</li> * </ul> * * @return the internal state from -1 Cancelled to 5 errored, beyond 3 included is * fulfilled. * @deprecated Should not use (to be removed in 3.2) */
@Override public long getAsLong() { //FIXME Should Be Removed alongside WaitStrategy NextInner<O>[] inners = subscribers; if (inners == TERMINATED) { if (error != null) { return 5L; // terminated with error } if (value == null) { return 4L; // terminated without value } return 3L; //terminated with value } if (subscription == Operators.cancelledSubscription()) { return -1L; //cancelled } if (subscribers != EMPTY && subscribers != EMPTY_WITH_SOURCE) { return 2L; // subscribed } return 0L; }
Return the produced Throwable error if any or null
Returns:the produced Throwable error if any or null
/** * Return the produced {@link Throwable} error if any or null * * @return the produced {@link Throwable} error if any or null */
@Nullable public final Throwable getError() { return isTerminated() ? error : null; }
Indicates whether this MonoProcessor has been interrupted via cancellation.
Returns:true if this MonoProcessor is cancelled, false otherwise.
/** * Indicates whether this {@code MonoProcessor} has been interrupted via cancellation. * * @return {@code true} if this {@code MonoProcessor} is cancelled, {@code false} * otherwise. */
public boolean isCancelled() { return subscription == Operators.cancelledSubscription() && !isTerminated(); }
Indicates whether this MonoProcessor has been completed with an error.
Returns:true if this MonoProcessor was completed with an error, false otherwise.
/** * Indicates whether this {@code MonoProcessor} has been completed with an error. * * @return {@code true} if this {@code MonoProcessor} was completed with an error, {@code false} otherwise. */
public final boolean isError() { return getError() != null; }
Indicates whether this MonoProcessor has been successfully completed a value.
Returns:true if this MonoProcessor is successful, false otherwise.
/** * Indicates whether this {@code MonoProcessor} has been successfully completed a value. * * @return {@code true} if this {@code MonoProcessor} is successful, {@code false} otherwise. */
public final boolean isSuccess() { return isTerminated() && error == null; }
Indicates whether this MonoProcessor has been terminated by the source producer with a success or an error.
Returns:true if this MonoProcessor is successful, false otherwise.
/** * Indicates whether this {@code MonoProcessor} has been terminated by the * source producer with a success or an error. * * @return {@code true} if this {@code MonoProcessor} is successful, {@code false} otherwise. */
public final boolean isTerminated() { return subscribers == TERMINATED; } @Override public boolean isDisposed() { return isTerminated() || isCancelled(); } @Override public final void onComplete() { onNext(null); } @Override @SuppressWarnings("unchecked") public final void onError(Throwable cause) { Objects.requireNonNull(cause, "onError cannot be null"); if (UPSTREAM.getAndSet(this, Operators.cancelledSubscription()) == Operators.cancelledSubscription()) { Operators.onErrorDroppedMulticast(cause); return; } error = cause; value = null; source = null; for (NextInner<O> as : SUBSCRIBERS.getAndSet(this, TERMINATED)) { as.onError(cause); } waitStrategy.signalAllWhenBlocking(); } @Override @SuppressWarnings("unchecked") public final void onNext(@Nullable O value) { Subscription s; if ((s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription())) == Operators.cancelledSubscription()) { if (value != null) { Operators.onNextDroppedMulticast(value); } return; } this.value = value; Publisher<? extends O> parent = source; source = null; NextInner<O>[] array = SUBSCRIBERS.getAndSet(this, TERMINATED); if (value == null) { for (NextInner<O> as : array) { as.onComplete(); } } else { if (s != null && !(parent instanceof Mono)) { s.cancel(); } for (NextInner<O> as : array) { as.complete(value); } } waitStrategy.signalAllWhenBlocking(); } @Override public final void onSubscribe(Subscription subscription) { if (Operators.setOnce(UPSTREAM, this, subscription)) { subscription.request(Long.MAX_VALUE); } } @Override public Stream<? extends Scannable> inners() { return Stream.of(subscribers); }
Returns the value that completed this MonoProcessor. Returns null if the MonoProcessor has not been completed. If the MonoProcessor is completed with an error a RuntimeException that wraps the error is thrown.
Throws:
Returns:the value that completed the MonoProcessor, or null if it has not been completed
/** * Returns the value that completed this {@link MonoProcessor}. Returns {@code null} if the {@link MonoProcessor} has not been completed. If the * {@link MonoProcessor} is completed with an error a RuntimeException that wraps the error is thrown. * * @return the value that completed the {@link MonoProcessor}, or {@code null} if it has not been completed * * @throws RuntimeException if the {@link MonoProcessor} was completed with an error */
@Nullable public O peek() { if (!isTerminated()) { return null; } if (value != null) { return value; } if (error != null) { RuntimeException re = Exceptions.propagate(error); re = Exceptions.addSuppressed(re, new Exception("Mono#peek terminated with an error")); throw re; } return null; } @Override public final void request(long n) { Operators.validate(n); } @Override public void subscribe(final CoreSubscriber<? super O> actual) { NextInner<O> as = new NextInner<>(actual, this); actual.onSubscribe(as); if (add(as)) { if (as.isCancelled()) { remove(as); } } else { Throwable ex = error; if (ex != null) { actual.onError(ex); } else { O v = value; if (v != null) { as.complete(v); } else { as.onComplete(); } } } } void connect() { Publisher<? extends O> parent = source; if (parent != null && SUBSCRIBERS.compareAndSet(this, EMPTY_WITH_SOURCE, EMPTY)) { parent.subscribe(this); } } @Override public Context currentContext() { return Operators.multiSubscribersContext(subscribers); } @Override @Nullable public Object scanUnsafe(Attr key) { //touch guard boolean t = isTerminated(); if (key == Attr.TERMINATED) return t; if (key == Attr.PARENT) return subscription; if (key == Attr.ERROR) return error; if (key == Attr.PREFETCH) return Integer.MAX_VALUE; if (key == Attr.CANCELLED) return isCancelled(); return null; } final boolean isPending() { return !isTerminated(); }
Return the number of active Subscriber or -1 if untracked.
Returns:the number of active Subscriber or -1 if untracked
/** * Return the number of active {@link Subscriber} or {@literal -1} if untracked. * * @return the number of active {@link Subscriber} or {@literal -1} if untracked */
public final long downstreamCount() { return subscribers.length; }
Return true if any Subscriber is actively subscribed
Returns:true if any Subscriber is actively subscribed
/** * Return true if any {@link Subscriber} is actively subscribed * * @return true if any {@link Subscriber} is actively subscribed */
public final boolean hasDownstreams() { return downstreamCount() != 0; } boolean add(NextInner<O> ps) { for (;;) { NextInner<O>[] a = subscribers; if (a == TERMINATED) { return false; } int n = a.length; @SuppressWarnings("unchecked") NextInner<O>[] b = new NextInner[n + 1]; System.arraycopy(a, 0, b, 0, n); b[n] = ps; if (SUBSCRIBERS.compareAndSet(this, a, b)) { Publisher<? extends O> parent = source; if (parent != null && a == EMPTY_WITH_SOURCE) { parent.subscribe(this); } return true; } } } @SuppressWarnings("unchecked") void remove(NextInner<O> ps) { for (;;) { NextInner<O>[] a = subscribers; int n = a.length; if (n == 0) { return; } int j = -1; for (int i = 0; i < n; i++) { if (a[i] == ps) { j = i; break; } } if (j < 0) { return; } NextInner<O>[] b; if (n == 1) { b = EMPTY; } else { b = new NextInner[n - 1]; System.arraycopy(a, 0, b, 0, j); System.arraycopy(a, j + 1, b, j, n - j - 1); } if (SUBSCRIBERS.compareAndSet(this, a, b)) { return; } } } final static class NextInner<T> extends Operators.MonoSubscriber<T, T> { final MonoProcessor<T> parent; NextInner(CoreSubscriber<? super T> actual, MonoProcessor<T> parent) { super(actual); this.parent = parent; } @Override public void cancel() { if (STATE.getAndSet(this, CANCELLED) != CANCELLED) { parent.remove(this); } } @Override public void onComplete() { if (!isCancelled()) { actual.onComplete(); } } @Override public void onError(Throwable t) { if (isCancelled()) { Operators.onOperatorError(t, currentContext()); } else { actual.onError(t); } } @Override public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) { return parent; } return super.scanUnsafe(key); } } }