/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package reactor.core.publisher;

import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.stream.Stream;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;

A base processor that exposes Flux API for Processor. Implementors include UnicastProcessor, EmitterProcessor, ReplayProcessor, WorkQueueProcessor and TopicProcessor.
Author:Stephane Maldini
Type parameters:
  • <IN> – the input value type
  • <OUT> – the output value type
/** * A base processor that exposes {@link Flux} API for {@link Processor}. * * Implementors include {@link UnicastProcessor}, {@link EmitterProcessor}, * {@link ReplayProcessor}, {@link WorkQueueProcessor} and {@link TopicProcessor}. * * @author Stephane Maldini * * @param <IN> the input value type * @param <OUT> the output value type */
public abstract class FluxProcessor<IN, OUT> extends Flux<OUT> implements Processor<IN, OUT>, CoreSubscriber<IN>, Scannable, Disposable {
Build a FluxProcessor whose data are emitted by the most recent emitted Publisher. The Flux will complete once both the publishers source and the last switched to Publisher have completed.

Type parameters:
  • <T> – the produced type
Returns:a FluxProcessor accepting publishers and producing T
/** * Build a {@link FluxProcessor} whose data are emitted by the most recent emitted {@link Publisher}. * The {@link Flux} will complete once both the publishers source and the last switched to {@link Publisher} have * completed. * * <p> * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/switchonnext.png" alt=""> * * @param <T> the produced type * @return a {@link FluxProcessor} accepting publishers and producing T */
public static <T> FluxProcessor<Publisher<? extends T>, T> switchOnNext() { UnicastProcessor<Publisher<? extends T>> emitter = UnicastProcessor.create(); FluxProcessor<Publisher<? extends T>, T> p = FluxProcessor.wrap(emitter, switchOnNext(emitter)); return p; }
Transform a receiving Subscriber and a producing Publisher in a logical FluxProcessor. The link between the passed upstream and returned downstream will not be created automatically, e.g. not subscribed together. A Processor might choose to have orthogonal sequence input and output.
Params:
  • upstream – the upstream subscriber
  • downstream – the downstream publisher
Type parameters:
  • <IN> – the receiving type
  • <OUT> – the producing type
Returns:a new blackboxed FluxProcessor
/** * Transform a receiving {@link Subscriber} and a producing {@link Publisher} in a logical {@link FluxProcessor}. * The link between the passed upstream and returned downstream will not be created automatically, e.g. not * subscribed together. A {@link Processor} might choose to have orthogonal sequence input and output. * * @param <IN> the receiving type * @param <OUT> the producing type * * @param upstream the upstream subscriber * @param downstream the downstream publisher * @return a new blackboxed {@link FluxProcessor} */
public static <IN, OUT> FluxProcessor<IN, OUT> wrap(final Subscriber<IN> upstream, final Publisher<OUT> downstream) { return new DelegateProcessor<>(downstream, upstream); } @Override public void dispose() { onError(new CancellationException("Disposed")); }
Return the number of active Subscriber or -1 if untracked.
Returns:the number of active Subscriber or -1 if untracked
/** * Return the number of active {@link Subscriber} or {@literal -1} if untracked. * * @return the number of active {@link Subscriber} or {@literal -1} if untracked */
public long downstreamCount(){ return inners().count(); }
Return the processor buffer capacity if any or Integer.MAX_VALUE
Returns:processor buffer capacity if any or Integer.MAX_VALUE
/** * Return the processor buffer capacity if any or {@link Integer#MAX_VALUE} * * @return processor buffer capacity if any or {@link Integer#MAX_VALUE} */
public int getBufferSize() { return Integer.MAX_VALUE; }
Current error if any, default to null
Returns:Current error if any, default to null
/** * Current error if any, default to null * * @return Current error if any, default to null */
@Nullable public Throwable getError() { return null; }
Return true if any Subscriber is actively subscribed
Returns:true if any Subscriber is actively subscribed
/** * Return true if any {@link Subscriber} is actively subscribed * * @return true if any {@link Subscriber} is actively subscribed */
public boolean hasDownstreams() { return downstreamCount() != 0L; }
Return true if terminated with onComplete
Returns:true if terminated with onComplete
/** * Return true if terminated with onComplete * * @return true if terminated with onComplete */
public final boolean hasCompleted() { return isTerminated() && getError() == null; }
Return true if terminated with onError
Returns:true if terminated with onError
/** * Return true if terminated with onError * * @return true if terminated with onError */
public final boolean hasError() { return isTerminated() && getError() != null; } @Override public Stream<? extends Scannable> inners() { return Stream.empty(); }
Has this upstream finished or "completed" / "failed" ?
Returns:has this upstream finished or "completed" / "failed" ?
/** * Has this upstream finished or "completed" / "failed" ? * * @return has this upstream finished or "completed" / "failed" ? */
public boolean isTerminated() { return false; }
Return true if this FluxProcessor supports multithread producing
Returns:true if this FluxProcessor supports multithread producing
/** * Return true if this {@link FluxProcessor} supports multithread producing * * @return true if this {@link FluxProcessor} supports multithread producing */
public boolean isSerialized() { return false; } @Override @Nullable public Object scanUnsafe(Attr key) { if (key == Attr.TERMINATED) return isTerminated(); if (key == Attr.ERROR) return getError(); if (key == Attr.CAPACITY) return getBufferSize(); return null; }
Create a FluxProcessor that safely gates multi-threaded producer Subscriber.onNext(Object).
Returns:a serializing FluxProcessor
/** * Create a {@link FluxProcessor} that safely gates multi-threaded producer * {@link Subscriber#onNext(Object)}. * * @return a serializing {@link FluxProcessor} */
public final FluxProcessor<IN, OUT> serialize() { return new DelegateProcessor<>(this, Operators.serialize(this)); }
Create a FluxSink that safely gates multi-threaded producer Subscriber.onNext(Object). This processor will be subscribed to that FluxSink, and any previous subscribers will be unsubscribed.

The returned FluxSink will not apply any OverflowStrategy and overflowing FluxSink.next(Object) will behave in two possible ways depending on the Processor:

  • an unbounded processor will handle the overflow itself by dropping or buffering
  • a bounded processor will block/spin
Returns:a serializing FluxSink
/** * Create a {@link FluxSink} that safely gates multi-threaded producer * {@link Subscriber#onNext(Object)}. This processor will be subscribed to * that {@link FluxSink}, and any previous subscribers will be unsubscribed. * * <p> The returned {@link FluxSink} will not apply any * {@link FluxSink.OverflowStrategy} and overflowing {@link FluxSink#next(Object)} * will behave in two possible ways depending on the Processor: * <ul> * <li> an unbounded processor will handle the overflow itself by dropping or * buffering </li> * <li> a bounded processor will block/spin</li> * </ul> * * @return a serializing {@link FluxSink} */
public final FluxSink<IN> sink() { return sink(FluxSink.OverflowStrategy.IGNORE); }
Create a FluxSink that safely gates multi-threaded producer Subscriber.onNext(Object). This processor will be subscribed to that FluxSink, and any previous subscribers will be unsubscribed.

The returned FluxSink will not apply any OverflowStrategy and overflowing FluxSink.next(Object) will behave in two possible ways depending on the Processor:

  • an unbounded processor will handle the overflow itself by dropping or buffering
  • a bounded processor will block/spin on IGNORE strategy, or apply the strategy behavior
Params:
  • strategy – the overflow strategy, see OverflowStrategy for the available strategies
Returns:a serializing FluxSink
/** * Create a {@link FluxSink} that safely gates multi-threaded producer * {@link Subscriber#onNext(Object)}. This processor will be subscribed to * that {@link FluxSink}, and any previous subscribers will be unsubscribed. * * <p> The returned {@link FluxSink} will not apply any * {@link FluxSink.OverflowStrategy} and overflowing {@link FluxSink#next(Object)} * will behave in two possible ways depending on the Processor: * <ul> * <li> an unbounded processor will handle the overflow itself by dropping or * buffering </li> * <li> a bounded processor will block/spin on IGNORE strategy, or apply the * strategy behavior</li> * </ul> * * @param strategy the overflow strategy, see {@link FluxSink.OverflowStrategy} * for the * available strategies * @return a serializing {@link FluxSink} */
public final FluxSink<IN> sink(FluxSink.OverflowStrategy strategy) { Objects.requireNonNull(strategy, "strategy"); if (getBufferSize() == Integer.MAX_VALUE){ strategy = FluxSink.OverflowStrategy.IGNORE; } FluxCreate.BaseSink<IN> s = FluxCreate.createSink(this, strategy); onSubscribe(s); if(s.isCancelled() || (isSerialized() && getBufferSize() == Integer.MAX_VALUE)){ return s; } if (serializeAlways()) return new FluxCreate.SerializedSink<>(s); else return new FluxCreate.SerializeOnRequestSink<>(s); }
Returns serialization strategy. If true, sink() will always be serialized. Otherwise sink is serialized only if FluxSink.onRequest(LongConsumer) is invoked.
Returns:true to serialize any sink, false to delay serialization till onRequest
/** * Returns serialization strategy. If true, {@link FluxProcessor#sink()} will always * be serialized. Otherwise sink is serialized only if {@link FluxSink#onRequest(java.util.function.LongConsumer)} * is invoked. * @return true to serialize any sink, false to delay serialization till onRequest */
protected boolean serializeAlways() { return true; } }