/**
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.processors;

import java.util.concurrent.atomic.*;

import org.reactivestreams.*;

import io.reactivex.annotations.*;
import io.reactivex.exceptions.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.*;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.plugins.RxJavaPlugins;
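The lockstep coordination described in the class documentation can be illustrated with a small stdlib-only sketch (the class and method names here are hypothetical, not RxJava API): the processor may only emit as many items as the least-demanding subscriber has outstanding, so one slow subscriber throttles the whole group.

```java
// Simplified illustration of "lockstep-mode": the number of items that may be
// emitted is the minimum outstanding demand (requested - emitted) across all
// current subscribers. Cancelled-subscriber handling is omitted for brevity.
public class LockstepSketch {

    /** requested[i] = total requested by subscriber i; emitted[i] = already delivered to it. */
    static long emittable(long[] requested, long[] emitted) {
        long r = Long.MAX_VALUE;
        for (int i = 0; i < requested.length; i++) {
            r = Math.min(r, requested[i] - emitted[i]);
        }
        return requested.length == 0 ? 0 : r;
    }

    public static void main(String[] args) {
        // One subscriber asked for 10 items, another only for 3: only 3 may flow.
        System.out.println(emittable(new long[] {10, 3}, new long[] {0, 0})); // 3
        // After 3 emissions the slower subscriber stalls everyone.
        System.out.println(emittable(new long[] {10, 3}, new long[] {3, 3})); // 0
    }
}
```

This mirrors the minimum computation performed in the `drain()` loop further down, minus the bookkeeping for cancelled subscriptions.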
/**
 * A {@link FlowableProcessor} implementation that coordinates downstream requests through
 * a front-buffer and stable-prefetching, optionally canceling the upstream if all
 * subscribers have cancelled.
 * <p>
 * <img width="640" height="360" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/MulticastProcessor.png" alt="">
 * <p>
 * This processor does not have a public constructor by design; a new empty instance of this
 * {@code MulticastProcessor} can be created via the following {@code create} methods that
 * allow configuring it:
 * <ul>
 * <li>{@link #create()}: create an empty {@code MulticastProcessor} with
 *     {@link io.reactivex.Flowable#bufferSize() Flowable.bufferSize()} prefetch amount
 *     and no reference counting behavior.</li>
 * <li>{@link #create(int)}: create an empty {@code MulticastProcessor} with
 *     the given prefetch amount and no reference counting behavior.</li>
 * <li>{@link #create(boolean)}: create an empty {@code MulticastProcessor} with
 *     {@link io.reactivex.Flowable#bufferSize() Flowable.bufferSize()} prefetch amount
 *     and an optional reference counting behavior.</li>
 * <li>{@link #create(int, boolean)}: create an empty {@code MulticastProcessor} with
 *     the given prefetch amount and an optional reference counting behavior.</li>
 * </ul>
 * <p>
 * When the reference counting behavior is enabled, the {@code MulticastProcessor} cancels its
 * upstream when all {@link Subscriber}s have cancelled. Late {@code Subscriber}s will then be
 * immediately completed.
 * <p>
 * Because {@code MulticastProcessor} implements the {@link Subscriber} interface, calling
 * {@code onSubscribe} is mandatory (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>).
 * If {@code MulticastProcessor} should run standalone, i.e., without subscribing the {@code MulticastProcessor} to another {@link Publisher},
 * use the {@link #start()} or {@link #startUnbounded()} methods to initialize the internal buffer.
 * Failing to do so will lead to a {@link NullPointerException} at runtime.
 * <p>
 * Use {@link #offer(Object)} to try and offer/emit items but don't fail if the
 * internal buffer is full.
 * <p>
 * A {@code MulticastProcessor} is a {@link Processor} type in the Reactive Streams specification,
 * {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as
 * parameters to {@link #onSubscribe(Subscription)}, {@link #offer(Object)}, {@link #onNext(Object)} and {@link #onError(Throwable)}.
 * Such calls will result in a {@link NullPointerException} being thrown and the processor's state is not changed.
 * <p>
 * Since a {@code MulticastProcessor} is a {@link io.reactivex.Flowable}, it supports backpressure.
 * The backpressure from the currently subscribed {@link Subscriber}s is coordinated by emitting upstream
 * items only if all of those {@code Subscriber}s have requested at least one item. This behavior
 * is also called <em>lockstep-mode</em> because even if some {@code Subscriber}s can take any number
 * of items, other {@code Subscriber}s requesting less or infrequently will slow down the overall
 * throughput of the flow.
 * <p>
 * Calling {@link #onNext(Object)}, {@link #offer(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
 * is required to be serialized (called from the same thread or called non-overlappingly from different threads
 * through external means of serialization). The {@link #toSerialized()} method available to all {@link FlowableProcessor}s
 * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
 * consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).
 * <p>
 * This {@code MulticastProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
 * {@link #getThrowable()} and {@link #hasSubscribers()}. This processor doesn't allow peeking into its buffer.
 * <p>
 * When this {@code MulticastProcessor} is terminated via {@link #onError(Throwable)} or {@link #onComplete()},
 * all previously signaled but not yet consumed items will still be available to {@code Subscriber}s and the respective
 * terminal event is only emitted when all previous items have been successfully delivered to {@code Subscriber}s.
 * If there are no {@code Subscriber}s, the remaining items will be buffered indefinitely.
 * <p>
 * The {@code MulticastProcessor} does not support clearing its cached events (to appear empty again).
 * <dl>
 *  <dt><b>Backpressure:</b></dt>
 *  <dd>The backpressure from the currently subscribed {@code Subscriber}s is coordinated by emitting upstream
 *  items only if all of those {@code Subscriber}s have requested at least one item. This behavior
 *  is also called <em>lockstep-mode</em> because even if some {@code Subscriber}s can take any number
 *  of items, other {@code Subscriber}s requesting less or infrequently will slow down the overall
 *  throughput of the flow.</dd>
 *  <dt><b>Scheduler:</b></dt>
 *  <dd>{@code MulticastProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
 *  the {@code Subscriber}s get notified on an arbitrary thread in a serialized fashion.</dd>
 * </dl>
 * <p>
 * Example:
 * <pre><code>
 * MulticastProcessor&lt;Integer&gt; mp = Flowable.range(1, 10)
 *     .subscribeWith(MulticastProcessor.create());
 *
 * mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
 *
 * // --------------------
 *
 * MulticastProcessor&lt;Integer&gt; mp2 = MulticastProcessor.create(4);
 * mp2.start();
 *
 * assertTrue(mp2.offer(1));
 * assertTrue(mp2.offer(2));
 * assertTrue(mp2.offer(3));
 * assertTrue(mp2.offer(4));
 *
 * assertFalse(mp2.offer(5));
 *
 * mp2.onComplete();
 *
 * mp2.test().assertResult(1, 2, 3, 4);
 * </code></pre>
 * <p>History: 2.1.14 - experimental
 * @param <T> the input and output value type
 * @since 2.2
 */
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final class MulticastProcessor<T> extends FlowableProcessor<T> {

    final AtomicInteger wip;

    final AtomicReference<Subscription> upstream;

    final AtomicReference<MulticastSubscription<T>[]> subscribers;

    final AtomicBoolean once;

    final int bufferSize;

    final int limit;

    final boolean refcount;

    volatile SimpleQueue<T> queue;

    volatile boolean done;

    volatile Throwable error;

    int consumed;

    int fusionMode;

    @SuppressWarnings("rawtypes")
    static final MulticastSubscription[] EMPTY = new MulticastSubscription[0];

    @SuppressWarnings("rawtypes")
    static final MulticastSubscription[] TERMINATED = new MulticastSubscription[0];
    /**
     * Constructs a fresh instance with the default Flowable.bufferSize() prefetch
     * amount and no refCount-behavior.
     * @param <T> the input and output value type
     * @return the new MulticastProcessor instance
     */
    @CheckReturnValue
    @NonNull
    public static <T> MulticastProcessor<T> create() {
        return new MulticastProcessor<T>(bufferSize(), false);
    }
    /**
     * Constructs a fresh instance with the default Flowable.bufferSize() prefetch
     * amount and the optional refCount-behavior.
     * @param <T> the input and output value type
     * @param refCount if true and if all Subscribers have canceled, the upstream
     * is cancelled
     * @return the new MulticastProcessor instance
     */
    @CheckReturnValue
    @NonNull
    public static <T> MulticastProcessor<T> create(boolean refCount) {
        return new MulticastProcessor<T>(bufferSize(), refCount);
    }
    /**
     * Constructs a fresh instance with the given prefetch amount and no refCount behavior.
     * @param bufferSize the prefetch amount
     * @param <T> the input and output value type
     * @return the new MulticastProcessor instance
     */
    @CheckReturnValue
    @NonNull
    public static <T> MulticastProcessor<T> create(int bufferSize) {
        return new MulticastProcessor<T>(bufferSize, false);
    }
    /**
     * Constructs a fresh instance with the given prefetch amount and the optional
     * refCount-behavior.
     * @param bufferSize the prefetch amount
     * @param refCount if true and if all Subscribers have canceled, the upstream
     * is cancelled
     * @param <T> the input and output value type
     * @return the new MulticastProcessor instance
     */
    @CheckReturnValue
    @NonNull
    public static <T> MulticastProcessor<T> create(int bufferSize, boolean refCount) {
        return new MulticastProcessor<T>(bufferSize, refCount);
    }
    /**
     * Constructs a fresh instance with the given prefetch amount and the optional
     * refCount-behavior.
     * @param bufferSize the prefetch amount
     * @param refCount if true and if all Subscribers have canceled, the upstream
     * is cancelled
     */
    @SuppressWarnings("unchecked")
    MulticastProcessor(int bufferSize, boolean refCount) {
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        this.bufferSize = bufferSize;
        this.limit = bufferSize - (bufferSize >> 2);
        this.wip = new AtomicInteger();
        this.subscribers = new AtomicReference<MulticastSubscription<T>[]>(EMPTY);
        this.upstream = new AtomicReference<Subscription>();
        this.refcount = refCount;
        this.once = new AtomicBoolean();
    }
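The constructor above derives a replenish threshold, `limit = bufferSize - (bufferSize >> 2)`, i.e. 75% of the prefetch amount. A hedged, stdlib-only sketch (hypothetical names, not RxJava API) of how this "stable prefetching" keeps the upstream topped up: after `limit` items are consumed, the processor re-requests `limit` more, so the buffer never fully drains before a refill.

```java
// Sketch of the stable-prefetch bookkeeping: request(bufferSize) up front,
// then request(limit) every time `limit` items have been consumed, matching
// the `++c == lim` branch in drain().
public class PrefetchSketch {

    /** Total amount requested from upstream after consuming the given number of items. */
    static long totalRequested(int bufferSize, int itemsConsumed) {
        int limit = bufferSize - (bufferSize >> 2); // 75% replenish threshold
        long requested = bufferSize;                // initial request(bufferSize)
        int consumed = 0;
        for (int i = 0; i < itemsConsumed; i++) {
            if (++consumed == limit) {              // threshold reached...
                consumed = 0;
                requested += limit;                 // ...request(limit) again
            }
        }
        return requested;
    }

    public static void main(String[] args) {
        // bufferSize = 32 -> limit = 24; consuming 100 items triggers 4 replenishes.
        System.out.println(totalRequested(32, 100)); // 32 + 4 * 24 = 128
    }
}
```

Requesting in 75% batches rather than one-by-one amortizes the cost of `request()` calls while the remaining 25% of the buffer absorbs items still in flight.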
    /**
     * Initializes this Processor by setting an upstream Subscription that
     * ignores request amounts, uses a fixed buffer
     * and allows using the onXXX and offer methods
     * afterwards.
     */
    public void start() {
        if (SubscriptionHelper.setOnce(upstream, EmptySubscription.INSTANCE)) {
            queue = new SpscArrayQueue<T>(bufferSize);
        }
    }
    /**
     * Initializes this Processor by setting an upstream Subscription that
     * ignores request amounts, uses an unbounded buffer
     * and allows using the onXXX and offer methods
     * afterwards.
     */
    public void startUnbounded() {
        if (SubscriptionHelper.setOnce(upstream, EmptySubscription.INSTANCE)) {
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
        }
    }

    @Override
    public void onSubscribe(Subscription s) {
        if (SubscriptionHelper.setOnce(upstream, s)) {
            if (s instanceof QueueSubscription) {
                @SuppressWarnings("unchecked")
                QueueSubscription<T> qs = (QueueSubscription<T>)s;

                int m = qs.requestFusion(QueueSubscription.ANY);
                if (m == QueueSubscription.SYNC) {
                    fusionMode = m;
                    queue = qs;
                    done = true;
                    drain();
                    return;
                }
                if (m == QueueSubscription.ASYNC) {
                    fusionMode = m;
                    queue = qs;

                    s.request(bufferSize);
                    return;
                }
            }

            queue = new SpscArrayQueue<T>(bufferSize);

            s.request(bufferSize);
        }
    }

    @Override
    public void onNext(T t) {
        if (once.get()) {
            return;
        }
        if (fusionMode == QueueSubscription.NONE) {
            ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources.");
            if (!queue.offer(t)) {
                SubscriptionHelper.cancel(upstream);
                onError(new MissingBackpressureException());
                return;
            }
        }
        drain();
    }
    /**
     * Tries to offer an item into the internal queue and returns false
     * if the queue is full.
     * @param t the item to offer, not null
     * @return true if successful, false if the queue is full
     */
    public boolean offer(T t) {
        if (once.get()) {
            return false;
        }
        ObjectHelper.requireNonNull(t, "offer called with null. Null values are generally not allowed in 2.x operators and sources.");
        if (fusionMode == QueueSubscription.NONE) {
            if (queue.offer(t)) {
                drain();
                return true;
            }
        }
        return false;
    }

    @Override
    public void onError(Throwable t) {
        ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        if (once.compareAndSet(false, true)) {
            error = t;
            done = true;
            drain();
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (once.compareAndSet(false, true)) {
            done = true;
            drain();
        }
    }

    @Override
    public boolean hasSubscribers() {
        return subscribers.get().length != 0;
    }

    @Override
    public boolean hasThrowable() {
        return once.get() && error != null;
    }

    @Override
    public boolean hasComplete() {
        return once.get() && error == null;
    }

    @Override
    public Throwable getThrowable() {
        return once.get() ? error : null;
    }

    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        MulticastSubscription<T> ms = new MulticastSubscription<T>(s, this);
        s.onSubscribe(ms);
        if (add(ms)) {
            if (ms.get() == Long.MIN_VALUE) {
                remove(ms);
            } else {
                drain();
            }
        } else {
            if (once.get() || !refcount) {
                Throwable ex = error;
                if (ex != null) {
                    s.onError(ex);
                    return;
                }
            }
            s.onComplete();
        }
    }

    boolean add(MulticastSubscription<T> inner) {
        for (;;) {
            MulticastSubscription<T>[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;
            }
            int n = a.length;
            @SuppressWarnings("unchecked")
            MulticastSubscription<T>[] b = new MulticastSubscription[n + 1];
            System.arraycopy(a, 0, b, 0, n);
            b[n] = inner;
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
        }
    }

    @SuppressWarnings("unchecked")
    void remove(MulticastSubscription<T> inner) {
        for (;;) {
            MulticastSubscription<T>[] a = subscribers.get();
            int n = a.length;
            if (n == 0) {
                return;
            }
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == inner) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                break;
            }
            if (n == 1) {
                if (refcount) {
                    if (subscribers.compareAndSet(a, TERMINATED)) {
                        SubscriptionHelper.cancel(upstream);
                        once.set(true);
                        break;
                    }
                } else {
                    if (subscribers.compareAndSet(a, EMPTY)) {
                        break;
                    }
                }
            } else {
                MulticastSubscription<T>[] b = new MulticastSubscription[n - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
                if (subscribers.compareAndSet(a, b)) {
                    break;
                }
            }
        }
    }

    @SuppressWarnings("unchecked")
    void drain() {
        if (wip.getAndIncrement() != 0) {
            return;
        }

        int missed = 1;
        AtomicReference<MulticastSubscription<T>[]> subs = subscribers;
        int c = consumed;
        int lim = limit;
        int fm = fusionMode;

        outer:
        for (;;) {
            SimpleQueue<T> q = queue;

            if (q != null) {
                MulticastSubscription<T>[] as = subs.get();
                int n = as.length;

                if (n != 0) {
                    long r = -1L;

                    for (MulticastSubscription<T> a : as) {
                        long ra = a.get();
                        if (ra >= 0L) {
                            if (r == -1L) {
                                r = ra - a.emitted;
                            } else {
                                r = Math.min(r, ra - a.emitted);
                            }
                        }
                    }

                    while (r > 0L) {
                        MulticastSubscription<T>[] bs = subs.get();

                        if (bs == TERMINATED) {
                            q.clear();
                            return;
                        }

                        if (as != bs) {
                            continue outer;
                        }

                        boolean d = done;
                        T v;

                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            SubscriptionHelper.cancel(upstream);
                            d = true;
                            v = null;
                            error = ex;
                            done = true;
                        }
                        boolean empty = v == null;

                        if (d && empty) {
                            Throwable ex = error;
                            if (ex != null) {
                                for (MulticastSubscription<T> inner : subs.getAndSet(TERMINATED)) {
                                    inner.onError(ex);
                                }
                            } else {
                                for (MulticastSubscription<T> inner : subs.getAndSet(TERMINATED)) {
                                    inner.onComplete();
                                }
                            }
                            return;
                        }

                        if (empty) {
                            break;
                        }

                        for (MulticastSubscription<T> inner : as) {
                            inner.onNext(v);
                        }

                        r--;

                        if (fm != QueueSubscription.SYNC) {
                            if (++c == lim) {
                                c = 0;
                                upstream.get().request(lim);
                            }
                        }
                    }

                    if (r == 0) {
                        MulticastSubscription<T>[] bs = subs.get();

                        if (bs == TERMINATED) {
                            q.clear();
                            return;
                        }

                        if (as != bs) {
                            continue outer;
                        }

                        if (done && q.isEmpty()) {
                            Throwable ex = error;
                            if (ex != null) {
                                for (MulticastSubscription<T> inner : subs.getAndSet(TERMINATED)) {
                                    inner.onError(ex);
                                }
                            } else {
                                for (MulticastSubscription<T> inner : subs.getAndSet(TERMINATED)) {
                                    inner.onComplete();
                                }
                            }
                            return;
                        }
                    }
                }
            }

            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    static final class MulticastSubscription<T> extends AtomicLong implements Subscription {

        private static final long serialVersionUID = -363282618957264509L;

        final Subscriber<? super T> downstream;

        final MulticastProcessor<T> parent;

        long emitted;

        MulticastSubscription(Subscriber<? super T> actual, MulticastProcessor<T> parent) {
            this.downstream = actual;
            this.parent = parent;
        }

        @Override
        public void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                for (;;) {
                    long r = get();
                    if (r == Long.MIN_VALUE || r == Long.MAX_VALUE) {
                        break;
                    }
                    long u = r + n;
                    if (u < 0L) {
                        u = Long.MAX_VALUE;
                    }
                    if (compareAndSet(r, u)) {
                        parent.drain();
                        break;
                    }
                }
            }
        }

        @Override
        public void cancel() {
            if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
                parent.remove(this);
            }
        }

        void onNext(T t) {
            if (get() != Long.MIN_VALUE) {
                emitted++;
                downstream.onNext(t);
            }
        }

        void onError(Throwable t) {
            if (get() != Long.MIN_VALUE) {
                downstream.onError(t);
            }
        }

        void onComplete() {
            if (get() != Long.MIN_VALUE) {
                downstream.onComplete();
            }
        }
    }
}
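`MulticastSubscription.request(long)` accumulates demand with a saturating addition: if adding `n` would overflow, the total is clamped to `Long.MAX_VALUE`, which the Reactive Streams specification treats as unbounded demand. A minimal stdlib-only sketch of that arithmetic (the class and method names here are hypothetical):

```java
// Mirrors the overflow-safe accumulation in MulticastSubscription.request():
// a long sum that wraps negative is treated as "effectively unbounded" and
// saturated to Long.MAX_VALUE instead of corrupting the request counter.
public class RequestCapSketch {

    static long addCap(long current, long n) {
        long u = current + n;
        return u < 0L ? Long.MAX_VALUE : u; // wrapped negative => saturate
    }

    public static void main(String[] args) {
        System.out.println(addCap(10, 5));                   // 15
        System.out.println(addCap(Long.MAX_VALUE - 1, 100)); // saturates to Long.MAX_VALUE
    }
}
```

The real method additionally bails out early when the current value is `Long.MIN_VALUE` (cancelled) or already `Long.MAX_VALUE` (unbounded), which the sketch omits for brevity.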