/*
 * Copyright (c) 2011-2017 Pivotal Software Inc, All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package reactor.core.publisher;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.util.annotation.Nullable;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;


/**
 * A Processor implementation that takes a custom queue and allows
 * only a single subscriber. UnicastProcessor allows multiplexing of the events, which
 * means that it supports multiple producers and only one consumer.
 * However, it should be noted that the multi-producer case is only valid if an
 * appropriate Queue is provided. Otherwise, it could violate the
 * <a href="http://www.reactive-streams.org/">Reactive Streams Spec</a> if Publishers
 * publish on different threads.
 *
 * <p>
 * <img width="640" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.2.0.M2/src/docs/marble/unicastprocessornormal.png" alt="">
 * </p>
 *
 * <p>
 * <b>Note: </b> UnicastProcessor does not respect the actual subscriber's demand as
 * described in the
 * <a href="http://www.reactive-streams.org/">Reactive Streams Spec</a>. However,
 * UnicastProcessor internally uses a configurable Queue, which enables backpressure
 * support and prevents the consumer from being overwhelmed.
 * Hence, the interaction model between producers and UnicastProcessor is PUSH only.
 * In contrast, the interaction model between UnicastProcessor and the consumer is
 * PUSH-PULL, as defined in the
 * <a href="http://www.reactive-streams.org/">Reactive Streams Spec</a>.
 * When the upstream's signals overflow the bound of the internal Queue,
 * UnicastProcessor fails, signaling onError({@link reactor.core.Exceptions.OverflowException}).
 * </p>
 *
 * <p>
 * <img width="640" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.2.0.M2/src/docs/marble/unicastprocessoroverflow.png" alt="">
 * </p>
 *
 * <p>
 * <b>Note: </b> The implementation keeps the order of signals. This means that a
 * terminal signal (completion or error) is postponed until all previous signals
 * have been consumed.
 * </p>
 *
 * <p>
 * <img width="640" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.2.0.M2/src/docs/marble/unicastprocessorterminal.png" alt="">
 * </p>
 *
 * @param <T> the input and output type
 */
public final class UnicastProcessor<T> extends FluxProcessor<T, T>
		implements Fuseable.QueueSubscription<T>, Fuseable, InnerOperator<T, T> {
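	/*
	 * Minimal backpressure sketch of the PUSH-PULL side described in the class javadoc
	 * (illustrative only; the subscriber and variable names below are assumptions, not
	 * part of this file). Values pushed via onNext are buffered in the queue and only
	 * delivered as the single subscriber requests them:
	 *
	 *   UnicastProcessor<Integer> processor = UnicastProcessor.create();
	 *   processor.onNext(1); // buffered, no demand yet
	 *   processor.onNext(2);
	 *   processor.subscribe(new BaseSubscriber<Integer>() {
	 *       @Override
	 *       protected void hookOnSubscribe(Subscription s) {
	 *           request(1); // pull the first buffered value
	 *       }
	 *       @Override
	 *       protected void hookOnNext(Integer value) {
	 *           // process value, then pull the next one
	 *           request(1);
	 *       }
	 *   });
	 */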
	/**
	 * Create a new {@link UnicastProcessor} that will buffer on an internal queue in an
	 * unbounded fashion.
	 *
	 * @param <E> the relayed type
	 * @return a unicast {@link FluxProcessor}
	 */
	public static <E> UnicastProcessor<E> create() {
		return new UnicastProcessor<>(Queues.<E>unbounded().get());
	}
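	/*
	 * Minimal usage sketch for create() (illustrative; "processor" is an assumed name,
	 * not part of this file): signals pushed into the processor are relayed to the one
	 * and only subscriber, buffering on the unbounded internal queue until subscription.
	 *
	 *   UnicastProcessor<String> processor = UnicastProcessor.create();
	 *   processor.subscribe(System.out::println); // a second subscriber would receive
	 *                                             // onError(IllegalStateException)
	 *   processor.onNext("hello");
	 *   processor.onComplete();
	 */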
	/**
	 * Create a new {@link UnicastProcessor} that will buffer on a provided queue.
	 *
	 * @param queue the buffering queue
	 * @param <E> the relayed type
	 * @return a unicast {@link FluxProcessor}
	 */
	public static <E> UnicastProcessor<E> create(Queue<E> queue) {
		return new UnicastProcessor<>(queue);
	}
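	/*
	 * Sketch of the provided-queue case (illustrative; the ArrayBlockingQueue choice and
	 * variable names are assumptions, any Queue implementation works): with a bounded
	 * queue and no demand, queue.offer(t) eventually returns false and the processor
	 * terminates with onError, as described in the class javadoc.
	 *
	 *   Queue<Integer> bounded = new ArrayBlockingQueue<>(4);
	 *   UnicastProcessor<Integer> processor = UnicastProcessor.create(bounded);
	 *   for (int i = 0; i < 5; i++) {
	 *       processor.onNext(i); // with no subscriber yet, the fifth offer overflows
	 *   }
	 *   processor.subscribe(
	 *           System.out::println,                         // the four buffered values
	 *           e -> System.err.println("overflow: " + e));  // then the overflow error
	 */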
	/**
	 * Create a new {@link UnicastProcessor} that will buffer on a provided queue.
	 *
	 * @param queue the buffering queue
	 * @param endcallback called on any terminal signal
	 * @param <E> the relayed type
	 * @return a unicast {@link FluxProcessor}
	 */
	public static <E> UnicastProcessor<E> create(Queue<E> queue, Disposable endcallback) {
		return new UnicastProcessor<>(queue, endcallback);
	}
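	/*
	 * Sketch of the terminal-callback variants (illustrative; queue choices and names
	 * are assumptions): the Disposable endcallback runs once, on the first terminal
	 * signal or cancellation, which makes it a convenient cleanup hook. The
	 * three-argument factory below additionally takes an onOverflow Consumer that is
	 * invoked with the value that could not be enqueued.
	 *
	 *   UnicastProcessor<Integer> withCleanup = UnicastProcessor.create(
	 *           Queues.<Integer>unbounded().get(),
	 *           () -> System.out.println("terminated or cancelled")); // endcallback
	 *
	 *   UnicastProcessor<Integer> withOverflowHook = UnicastProcessor.create(
	 *           new ArrayBlockingQueue<>(8),
	 *           rejected -> System.err.println("could not enqueue " + rejected), // onOverflow
	 *           () -> System.out.println("terminated or cancelled"));            // endcallback
	 */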
	/**
	 * Create a new {@link UnicastProcessor} that will buffer on a provided queue.
	 *
	 * @param queue the buffering queue
	 * @param onOverflow called when {@code queue.offer(t)} returns false and the
	 * UnicastProcessor is about to emit onError
	 * @param endcallback called on any terminal signal
	 * @param <E> the relayed type
	 * @return a unicast {@link FluxProcessor}
	 */
	public static <E> UnicastProcessor<E> create(Queue<E> queue,
			Consumer<? super E> onOverflow,
			Disposable endcallback) {
		return new UnicastProcessor<>(queue, onOverflow, endcallback);
	}

	final Queue<T>            queue;
	final Consumer<? super T> onOverflow;

	volatile Disposable onTerminate;
	@SuppressWarnings("rawtypes")
	static final AtomicReferenceFieldUpdater<UnicastProcessor, Disposable> ON_TERMINATE =
			AtomicReferenceFieldUpdater.newUpdater(UnicastProcessor.class, Disposable.class, "onTerminate");

	volatile boolean done;
	Throwable error;

	volatile CoreSubscriber<? super T> actual;

	volatile boolean cancelled;

	volatile int once;
	@SuppressWarnings("rawtypes")
	static final AtomicIntegerFieldUpdater<UnicastProcessor> ONCE =
			AtomicIntegerFieldUpdater.newUpdater(UnicastProcessor.class, "once");

	volatile int wip;
	@SuppressWarnings("rawtypes")
	static final AtomicIntegerFieldUpdater<UnicastProcessor> WIP =
			AtomicIntegerFieldUpdater.newUpdater(UnicastProcessor.class, "wip");

	volatile long requested;
	@SuppressWarnings("rawtypes")
	static final AtomicLongFieldUpdater<UnicastProcessor> REQUESTED =
			AtomicLongFieldUpdater.newUpdater(UnicastProcessor.class, "requested");

	volatile boolean outputFused;

	public UnicastProcessor(Queue<T> queue) {
		this.queue = Objects.requireNonNull(queue, "queue");
		this.onTerminate = null;
		this.onOverflow = null;
	}

	public UnicastProcessor(Queue<T> queue, Disposable onTerminate) {
		this.queue = Objects.requireNonNull(queue, "queue");
		this.onTerminate = Objects.requireNonNull(onTerminate, "onTerminate");
		this.onOverflow = null;
	}

	public UnicastProcessor(Queue<T> queue,
			Consumer<? super T> onOverflow,
			Disposable onTerminate) {
		this.queue = Objects.requireNonNull(queue, "queue");
		this.onOverflow = Objects.requireNonNull(onOverflow, "onOverflow");
		this.onTerminate = Objects.requireNonNull(onTerminate, "onTerminate");
	}

	@Override
	public int getBufferSize() {
		return Queues.capacity(this.queue);
	}

	@Override
	public Object scanUnsafe(Attr key) {
		if (Attr.BUFFERED == key) return queue.size();
		return super.scanUnsafe(key);
	}

	void doTerminate() {
		Disposable r = onTerminate;
		if (r != null && ON_TERMINATE.compareAndSet(this, r, null)) {
			r.dispose();
		}
	}

	void drainRegular(Subscriber<? super T> a) {
		int missed = 1;

		final Queue<T> q = queue;

		for (;;) {

			long r = requested;
			long e = 0L;

			while (r != e) {
				boolean d = done;

				T t = q.poll();
				boolean empty = t == null;

				if (checkTerminated(d, empty, a, q)) {
					return;
				}

				if (empty) {
					break;
				}

				a.onNext(t);

				e++;
			}

			if (r == e) {
				if (checkTerminated(done, q.isEmpty(), a, q)) {
					return;
				}
			}

			if (e != 0 && r != Long.MAX_VALUE) {
				REQUESTED.addAndGet(this, -e);
			}

			missed = WIP.addAndGet(this, -missed);
			if (missed == 0) {
				break;
			}
		}
	}

	void drainFused(Subscriber<? super T> a) {
		int missed = 1;

		final Queue<T> q = queue;

		for (;;) {

			if (cancelled) {
				q.clear();
				actual = null;
				return;
			}

			boolean d = done;

			a.onNext(null);

			if (d) {
				actual = null;

				Throwable ex = error;
				if (ex != null) {
					a.onError(ex);
				}
				else {
					a.onComplete();
				}
				return;
			}

			missed = WIP.addAndGet(this, -missed);
			if (missed == 0) {
				break;
			}
		}
	}

	void drain() {
		if (WIP.getAndIncrement(this) != 0) {
			return;
		}

		int missed = 1;

		for (;;) {
			Subscriber<? super T> a = actual;
			if (a != null) {

				if (outputFused) {
					drainFused(a);
				}
				else {
					drainRegular(a);
				}
				return;
			}

			missed = WIP.addAndGet(this, -missed);
			if (missed == 0) {
				break;
			}
		}
	}

	boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, Queue<T> q) {
		if (cancelled) {
			q.clear();
			actual = null;
			return true;
		}
		if (d && empty) {
			Throwable e = error;
			actual = null;
			if (e != null) {
				a.onError(e);
			}
			else {
				a.onComplete();
			}
			return true;
		}

		return false;
	}

	@Override
	public void onSubscribe(Subscription s) {
		if (done || cancelled) {
			s.cancel();
		}
		else {
			s.request(Long.MAX_VALUE);
		}
	}

	@Override
	public int getPrefetch() {
		return Integer.MAX_VALUE;
	}

	@Override
	public Context currentContext() {
		CoreSubscriber<? super T> actual = this.actual;
		return actual != null ? actual.currentContext() : Context.empty();
	}

	@Override
	public void onNext(T t) {
		if (done || cancelled) {
			Operators.onNextDropped(t, currentContext());
			return;
		}

		if (!queue.offer(t)) {
			Throwable ex = Operators.onOperatorError(null,
					Exceptions.failWithOverflow(), t, currentContext());
			if (onOverflow != null) {
				try {
					onOverflow.accept(t);
				}
				catch (Throwable e) {
					Exceptions.throwIfFatal(e);
					ex.initCause(e);
				}
			}
			onError(Operators.onOperatorError(null, ex, t, currentContext()));
			return;
		}
		drain();
	}

	@Override
	public void onError(Throwable t) {
		if (done || cancelled) {
			Operators.onErrorDropped(t, currentContext());
			return;
		}

		error = t;
		done = true;

		doTerminate();

		drain();
	}

	@Override
	public void onComplete() {
		if (done || cancelled) {
			return;
		}

		done = true;

		doTerminate();

		drain();
	}

	@Override
	public void subscribe(CoreSubscriber<? super T> actual) {
		Objects.requireNonNull(actual, "subscribe");
		if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {

			actual.onSubscribe(this);
			this.actual = actual;
			if (cancelled) {
				this.actual = null;
			}
			else {
				drain();
			}
		}
		else {
			Operators.error(actual,
					new IllegalStateException("UnicastProcessor allows only a single Subscriber"));
		}
	}

	@Override
	public void request(long n) {
		if (Operators.validate(n)) {
			Operators.addCap(REQUESTED, this, n);
			drain();
		}
	}

	@Override
	public void cancel() {
		if (cancelled) {
			return;
		}
		cancelled = true;

		doTerminate();

		if (!outputFused) {
			if (WIP.getAndIncrement(this) == 0) {
				queue.clear();
				actual = null;
			}
		}
	}

	@Override
	@Nullable
	public T poll() {
		return queue.poll();
	}

	@Override
	public int size() {
		return queue.size();
	}

	@Override
	public boolean isEmpty() {
		return queue.isEmpty();
	}

	@Override
	public void clear() {
		queue.clear();
	}

	@Override
	public int requestFusion(int requestedMode) {
		if ((requestedMode & Fuseable.ASYNC) != 0) {
			outputFused = true;
			return Fuseable.ASYNC;
		}
		return Fuseable.NONE;
	}

	@Override
	public boolean isDisposed() {
		return cancelled || done;
	}

	@Override
	public boolean isTerminated() {
		return done;
	}

	@Override
	@Nullable
	public Throwable getError() {
		return error;
	}

	@Override
	public CoreSubscriber<? super T> actual() {
		return actual;
	}

	@Override
	public long downstreamCount() {
		return hasDownstreams() ? 1L : 0L;
	}

	@Override
	public boolean hasDownstreams() {
		return actual != null;
	}
}