/*
 * Copyright (c) 2011-Present VMware Inc. or its affiliates, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *        https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.stream.Stream;

import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

A MonoProcessor is a Processor that is also a Mono.

Implementations might implements stateful semantics, allowing multiple subscriptions. Once a MonoProcessor has been resolved, implementations may also replay cached signals to newer subscribers.

Despite having default implementations, most methods should be reimplemented with meaningful semantics relevant to concrete child classes.

Author:Stephane Maldini
Type parameters:
  • <O> – the type of the value that will be made available
Deprecated:Processors will be removed in 3.5. Prefer using One or Empty instead, or see https://github.com/reactor/reactor-core/issues/2431 for alternatives
/** * A {@code MonoProcessor} is a {@link Processor} that is also a {@link Mono}. * * <p> * <img width="640" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/monoprocessor.png" alt=""> * * <p> * Implementations might implements stateful semantics, allowing multiple subscriptions. * Once a {@link MonoProcessor} has been resolved, implementations may also replay cached signals to newer subscribers. * <p> * Despite having default implementations, most methods should be reimplemented with meaningful semantics relevant to * concrete child classes. * * @param <O> the type of the value that will be made available * * @author Stephane Maldini * @deprecated Processors will be removed in 3.5. Prefer using {@link Sinks.One} or {@link Sinks.Empty} instead, * or see https://github.com/reactor/reactor-core/issues/2431 for alternatives */
@Deprecated public abstract class MonoProcessor<O> extends Mono<O> implements Processor<O, O>, CoreSubscriber<O>, Disposable, Subscription, Scannable {
Create a MonoProcessor that will eagerly request 1 on CoreSubscriber.onSubscribe(Subscription), cache and emit the eventual result for 1 or N subscribers.
Type parameters:
  • <T> – type of the expected value
Returns:A MonoProcessor.
Deprecated:Use Sinks.one(), to be removed in 3.5
/** * Create a {@link MonoProcessor} that will eagerly request 1 on {@link #onSubscribe(Subscription)}, cache and emit * the eventual result for 1 or N subscribers. * * @param <T> type of the expected value * * @return A {@link MonoProcessor}. * @deprecated Use {@link Sinks#one()}, to be removed in 3.5 */
@Deprecated public static <T> MonoProcessor<T> create() { return new NextProcessor<>(null); }
Deprecated:the MonoProcessor will cease to implement Subscription in 3.5
/** * @deprecated the {@link MonoProcessor} will cease to implement {@link Subscription} in 3.5 */
@Override @Deprecated public void cancel() { }
Indicates whether this MonoProcessor has been interrupted via cancellation.
Returns:true if this MonoProcessor is cancelled, false otherwise.
Deprecated:the MonoProcessor will cease to implement Subscription and this method will be removed in 3.5
/** * Indicates whether this {@code MonoProcessor} has been interrupted via cancellation. * * @return {@code true} if this {@code MonoProcessor} is cancelled, {@code false} * otherwise. * @deprecated the {@link MonoProcessor} will cease to implement {@link Subscription} and this method will be removed in 3.5 */
@Deprecated public boolean isCancelled() { return false; }
Params:
  • n – the request amount
Deprecated:the MonoProcessor will cease to implement Subscription in 3.5
/** * @param n the request amount * @deprecated the {@link MonoProcessor} will cease to implement {@link Subscription} in 3.5 */
@Override @Deprecated public void request(long n) { Operators.validate(n); } @Override public void dispose() { onError(new CancellationException("Disposed")); }
Block the calling thread indefinitely, waiting for the completion of this MonoProcessor. If the MonoProcessor is completed with an error a RuntimeException that wraps the error is thrown.
Returns:the value of this MonoProcessor
/** * Block the calling thread indefinitely, waiting for the completion of this {@code MonoProcessor}. If the * {@link MonoProcessor} is completed with an error a RuntimeException that wraps the error is thrown. * * @return the value of this {@code MonoProcessor} */
@Override @Nullable public O block() { return block(null); }
Block the calling thread for the specified time, waiting for the completion of this MonoProcessor. If the MonoProcessor is completed with an error a RuntimeException that wraps the error is thrown.
Params:
  • timeout – the timeout value as a Duration
Returns:the value of this MonoProcessor or null if the timeout is reached and the MonoProcessor has not completed
/** * Block the calling thread for the specified time, waiting for the completion of this {@code MonoProcessor}. If the * {@link MonoProcessor} is completed with an error a RuntimeException that wraps the error is thrown. * * @param timeout the timeout value as a {@link Duration} * * @return the value of this {@code MonoProcessor} or {@code null} if the timeout is reached and the {@code MonoProcessor} has * not completed */
@Override @Nullable public O block(@Nullable Duration timeout) { return peek(); }
Return the produced Throwable error if any or null
Returns:the produced Throwable error if any or null
/** * Return the produced {@link Throwable} error if any or null * * @return the produced {@link Throwable} error if any or null */
@Nullable public Throwable getError() { return null; }
Indicates whether this MonoProcessor has been completed with an error.
Returns:true if this MonoProcessor was completed with an error, false otherwise.
/** * Indicates whether this {@code MonoProcessor} has been completed with an error. * * @return {@code true} if this {@code MonoProcessor} was completed with an error, {@code false} otherwise. */
public final boolean isError() { return getError() != null; }
Indicates whether this MonoProcessor has been successfully completed a value.
Returns:true if this MonoProcessor is successful, false otherwise.
/** * Indicates whether this {@code MonoProcessor} has been successfully completed a value. * * @return {@code true} if this {@code MonoProcessor} is successful, {@code false} otherwise. */
public final boolean isSuccess() { return isTerminated() && !isError(); }
Indicates whether this MonoProcessor has been terminated by the source producer with a success or an error.
Returns:true if this MonoProcessor is successful, false otherwise.
/** * Indicates whether this {@code MonoProcessor} has been terminated by the * source producer with a success or an error. * * @return {@code true} if this {@code MonoProcessor} is successful, {@code false} otherwise. */
public boolean isTerminated() { return false; } @Override public boolean isDisposed() { return isTerminated() || isCancelled(); }
Returns the value that completed this MonoProcessor. Returns null if the MonoProcessor has not been completed. If the MonoProcessor is completed with an error a RuntimeException that wraps the error is thrown.
Throws:
Returns:the value that completed the MonoProcessor, or null if it has not been completed
Deprecated:this method is discouraged, consider peeking into a MonoProcessor by turning it into a CompletableFuture
/** * Returns the value that completed this {@link MonoProcessor}. Returns {@code null} if the {@link MonoProcessor} has not been completed. If the * {@link MonoProcessor} is completed with an error a RuntimeException that wraps the error is thrown. * * @return the value that completed the {@link MonoProcessor}, or {@code null} if it has not been completed * * @throws RuntimeException if the {@link MonoProcessor} was completed with an error * @deprecated this method is discouraged, consider peeking into a MonoProcessor by {@link Mono#toFuture() turning it into a CompletableFuture} */
@Nullable @Deprecated public O peek() { return null; } @Override public Context currentContext() { InnerProducer<?>[] innerProducersArray = inners().filter(InnerProducer.class::isInstance) .map(InnerProducer.class::cast) .toArray(InnerProducer[]::new); return Operators.multiSubscribersContext(innerProducersArray); } @Override @Nullable public Object scanUnsafe(Attr key) { //touch guard boolean t = isTerminated(); if (key == Attr.TERMINATED) return t; if (key == Attr.ERROR) return getError(); if (key == Attr.PREFETCH) return Integer.MAX_VALUE; if (key == Attr.CANCELLED) return isCancelled(); if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; return null; }
Return the number of active Subscriber or -1 if untracked.
Returns:the number of active Subscriber or -1 if untracked
/** * Return the number of active {@link Subscriber} or {@literal -1} if untracked. * * @return the number of active {@link Subscriber} or {@literal -1} if untracked */
public long downstreamCount() { return inners().count(); }
Return true if any Subscriber is actively subscribed
Returns:true if any Subscriber is actively subscribed
/** * Return true if any {@link Subscriber} is actively subscribed * * @return true if any {@link Subscriber} is actively subscribed */
public final boolean hasDownstreams() { return downstreamCount() != 0; } @Override public Stream<? extends Scannable> inners() { return Stream.empty(); } }