Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.subjects; import io.reactivex.annotations.Nullable; import io.reactivex.annotations.NonNull; import io.reactivex.plugins.RxJavaPlugins; import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.annotations.CheckReturnValue; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.EmptyDisposable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.SimpleQueue; import io.reactivex.internal.observers.BasicIntQueueDisposable; import io.reactivex.internal.queue.SpscLinkedArrayQueue;
A Subject that queues up events until a single Observer subscribes to it, replays those events to it until the Observer catches up and then switches to relaying events live to this single Observer until this UnicastSubject terminates or the Observer unsubscribes.

Note that UnicastSubject holds an unbounded internal buffer.

This subject does not have a public constructor by design; a new empty instance of this UnicastSubject can be created via the following create methods that allow specifying the retention policy for items:

  • create() - creates an empty, unbounded UnicastSubject that caches all items and the terminal event it receives.
  • create(int) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain.
  • create(boolean) - creates an empty, unbounded UnicastSubject that optionally delays an error it receives and replays it after the regular items have been emitted.
  • create(int, Runnable) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastSubject gets terminated or the single Observer unsubscribes.
  • create(int, Runnable, boolean) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastSubject gets terminated or the single Observer unsubscribes and optionally delays an error it receives and replays it after the regular items have been emitted.

If more than one Observer attempts to subscribe to this UnicastSubject, they will receive an IllegalStateException indicating the single-use-only nature of this UnicastSubject, even if the UnicastSubject already terminated with an error.

Since a Subject is conceptionally derived from the Processor type in the Reactive Streams specification, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the subject's state is not changed.

Since a UnicastSubject is an Observable, it does not support backpressure.

When this UnicastSubject is terminated via onError(Throwable) the current or late single Observer may receive the Throwable before any available items could be emitted. To make sure an onError event is delivered to the Observer after the normal items, create a UnicastSubject with the create(boolean) or create(int, Runnable, boolean) factory methods.

Even though UnicastSubject implements the Observer interface, calling onSubscribe is not required (Rule 2.12) if the subject is used as a standalone source. However, calling onSubscribe after the UnicastSubject reached its terminal state will result in the given Disposable being disposed immediately.

Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).

This UnicastSubject supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasObservers().

Scheduler:
UnicastSubject does not operate by default on a particular Scheduler and the single Observer gets notified on the thread the respective onXXX methods were invoked.
Error handling:
When the onError(Throwable) is called, the UnicastSubject enters into a terminal state and emits the same Throwable instance to the current single Observer. During this emission, if the single Observers disposes its respective Disposable, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable). If there were no Observers subscribed to this UnicastSubject when the onError() was called, the global error handler is not invoked.

Example usage:


UnicastSubject<Integer> subject = UnicastSubject.create();
TestObserver<Integer> to1 = subject.test();
// fresh UnicastSubjects are empty
to1.assertEmpty();
TestObserver<Integer> to2 = subject.test();
// A UnicastSubject only allows one Observer during its lifetime
to2.assertFailure(IllegalStateException.class);
subject.onNext(1);
to1.assertValue(1);
subject.onNext(2);
to1.assertValues(1, 2);
subject.onComplete();
to1.assertResult(1, 2);
// ----------------------------------------------------
UnicastSubject<Integer> subject2 = UnicastSubject.create();
// a UnicastSubject caches events until its single Observer subscribes
subject2.onNext(1);
subject2.onNext(2);
subject2.onComplete();
TestObserver<Integer> to3 = subject2.test();
// the cached events are emitted in order
to3.assertResult(1, 2);
Type parameters:
  • <T> – the value type received and emitted by this Subject subclass
Since:2.0
/** * A Subject that queues up events until a single {@link Observer} subscribes to it, replays * those events to it until the {@code Observer} catches up and then switches to relaying events live to * this single {@code Observer} until this {@code UnicastSubject} terminates or the {@code Observer} unsubscribes. * <p> * <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/UnicastSubject.png" alt=""> * <p> * Note that {@code UnicastSubject} holds an unbounded internal buffer. * <p> * This subject does not have a public constructor by design; a new empty instance of this * {@code UnicastSubject} can be created via the following {@code create} methods that * allow specifying the retention policy for items: * <ul> * <li>{@link #create()} - creates an empty, unbounded {@code UnicastSubject} that * caches all items and the terminal event it receives.</li> * <li>{@link #create(int)} - creates an empty, unbounded {@code UnicastSubject} * with a hint about how many <b>total</b> items one expects to retain.</li> * <li>{@link #create(boolean)} - creates an empty, unbounded {@code UnicastSubject} that * optionally delays an error it receives and replays it after the regular items have been emitted.</li> * <li>{@link #create(int, Runnable)} - creates an empty, unbounded {@code UnicastSubject} * with a hint about how many <b>total</b> items one expects to retain and a callback that will be * called exactly once when the {@code UnicastSubject} gets terminated or the single {@code Observer} unsubscribes.</li> * <li>{@link #create(int, Runnable, boolean)} - creates an empty, unbounded {@code UnicastSubject} * with a hint about how many <b>total</b> items one expects to retain and a callback that will be * called exactly once when the {@code UnicastSubject} gets terminated or the single {@code Observer} unsubscribes * and optionally delays an error it receives and replays it after the regular items have been emitted.</li> * </ul> * <p> * If more than one {@code Observer} attempts to subscribe to this {@code UnicastSubject}, they * will receive an {@code IllegalStateException} indicating the single-use-only nature of this {@code UnicastSubject}, * even if the {@code UnicastSubject} already terminated with an error. * <p> * Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification, * {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as * parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a * {@link NullPointerException} being thrown and the subject's state is not changed. * <p> * Since a {@code UnicastSubject} is an {@link io.reactivex.Observable}, it does not support backpressure. * <p> * When this {@code UnicastSubject} is terminated via {@link #onError(Throwable)} the current or late single {@code Observer} * may receive the {@code Throwable} before any available items could be emitted. To make sure an onError event is delivered * to the {@code Observer} after the normal items, create a {@code UnicastSubject} with the {@link #create(boolean)} or * {@link #create(int, Runnable, boolean)} factory methods. * <p> * Even though {@code UnicastSubject} implements the {@code Observer} interface, calling * {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>) * if the subject is used as a standalone source. However, calling {@code onSubscribe} * after the {@code UnicastSubject} reached its terminal state will result in the * given {@code Disposable} being disposed immediately. * <p> * Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} * is required to be serialized (called from the same thread or called non-overlappingly from different threads * through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s * provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer} * consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively). * <p> * This {@code UnicastSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()}, * {@link #getThrowable()} and {@link #hasObservers()}. * <dl> * <dt><b>Scheduler:</b></dt> * <dd>{@code UnicastSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and * the single {@code Observer} gets notified on the thread the respective {@code onXXX} methods were invoked.</dd> * <dt><b>Error handling:</b></dt> * <dd>When the {@link #onError(Throwable)} is called, the {@code UnicastSubject} enters into a terminal state * and emits the same {@code Throwable} instance to the current single {@code Observer}. During this emission, * if the single {@code Observer}s disposes its respective {@code Disposable}, the * {@code Throwable} is delivered to the global error handler via * {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)}. * If there were no {@code Observer}s subscribed to this {@code UnicastSubject} when the {@code onError()} * was called, the global error handler is not invoked. * </dd> * </dl> * <p> * Example usage: * <pre><code> * UnicastSubject&lt;Integer&gt; subject = UnicastSubject.create(); * * TestObserver&lt;Integer&gt; to1 = subject.test(); * * // fresh UnicastSubjects are empty * to1.assertEmpty(); * * TestObserver&lt;Integer&gt; to2 = subject.test(); * * // A UnicastSubject only allows one Observer during its lifetime * to2.assertFailure(IllegalStateException.class); * * subject.onNext(1); * to1.assertValue(1); * * subject.onNext(2); * to1.assertValues(1, 2); * * subject.onComplete(); * to1.assertResult(1, 2); * * // ---------------------------------------------------- * * UnicastSubject&lt;Integer&gt; subject2 = UnicastSubject.create(); * * // a UnicastSubject caches events until its single Observer subscribes * subject2.onNext(1); * subject2.onNext(2); * subject2.onComplete(); * * TestObserver&lt;Integer&gt; to3 = subject2.test(); * * // the cached events are emitted in order * to3.assertResult(1, 2); * </code></pre> * @param <T> the value type received and emitted by this Subject subclass * @since 2.0 */
public final class UnicastSubject<T> extends Subject<T> {
The queue that buffers the source events.
/** The queue that buffers the source events. */
final SpscLinkedArrayQueue<T> queue;
The single Observer.
/** The single Observer. */
final AtomicReference<Observer<? super T>> downstream;
The optional callback when the Subject gets cancelled or terminates.
/** The optional callback when the Subject gets cancelled or terminates. */
final AtomicReference<Runnable> onTerminate;
deliver onNext events before error event.
/** deliver onNext events before error event. */
final boolean delayError;
Indicates the single observer has cancelled.
/** Indicates the single observer has cancelled. */
volatile boolean disposed;
Indicates the source has terminated.
/** Indicates the source has terminated. */
volatile boolean done;
The terminal error if not null. Must be set before writing to done and read after done == true.
/** * The terminal error if not null. * Must be set before writing to done and read after done == true. */
Throwable error;
Set to 1 atomically for the first and only Subscriber.
/** Set to 1 atomically for the first and only Subscriber. */
final AtomicBoolean once;
The wip counter and QueueDisposable surface.
/** The wip counter and QueueDisposable surface. */
final BasicIntQueueDisposable<T> wip; boolean enableOperatorFusion;
Creates an UnicastSubject with an internal buffer capacity hint 16.
Type parameters:
  • <T> – the value type
Returns:an UnicastSubject instance
/** * Creates an UnicastSubject with an internal buffer capacity hint 16. * @param <T> the value type * @return an UnicastSubject instance */
@CheckReturnValue @NonNull public static <T> UnicastSubject<T> create() { return new UnicastSubject<T>(bufferSize(), true); }
Creates an UnicastSubject with the given internal buffer capacity hint.
Params:
  • capacityHint – the hint to size the internal unbounded buffer
Type parameters:
  • <T> – the value type
Returns:an UnicastSubject instance
/** * Creates an UnicastSubject with the given internal buffer capacity hint. * @param <T> the value type * @param capacityHint the hint to size the internal unbounded buffer * @return an UnicastSubject instance */
@CheckReturnValue @NonNull public static <T> UnicastSubject<T> create(int capacityHint) { return new UnicastSubject<T>(capacityHint, true); }
Creates an UnicastSubject with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription.

The callback, if not null, is called exactly once and non-overlapped with any active replay.

Params:
  • capacityHint – the hint to size the internal unbounded buffer
  • onTerminate – the callback to run when the Subject is terminated or cancelled, null not allowed
Type parameters:
  • <T> – the value type
Returns:an UnicastSubject instance
/** * Creates an UnicastSubject with the given internal buffer capacity hint and a callback for * the case when the single Subscriber cancels its subscription. * * <p>The callback, if not null, is called exactly once and * non-overlapped with any active replay. * * @param <T> the value type * @param capacityHint the hint to size the internal unbounded buffer * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @return an UnicastSubject instance */
@CheckReturnValue @NonNull public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate) { return new UnicastSubject<T>(capacityHint, onTerminate, true); }
Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Subscriber cancels its subscription.

The callback, if not null, is called exactly once and non-overlapped with any active replay.

History: 2.0.8 - experimental

Params:
  • capacityHint – the hint to size the internal unbounded buffer
  • onTerminate – the callback to run when the Subject is terminated or cancelled, null not allowed
  • delayError – deliver pending onNext events before onError
Type parameters:
  • <T> – the value type
Returns:an UnicastSubject instance
Since:2.2
/** * Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and * a callback for the case when the single Subscriber cancels its subscription. * * <p>The callback, if not null, is called exactly once and * non-overlapped with any active replay. * <p>History: 2.0.8 - experimental * @param <T> the value type * @param capacityHint the hint to size the internal unbounded buffer * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @param delayError deliver pending onNext events before onError * @return an UnicastSubject instance * @since 2.2 */
@CheckReturnValue @NonNull public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError) { return new UnicastSubject<T>(capacityHint, onTerminate, delayError); }
Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag.

The callback, if not null, is called exactly once and non-overlapped with any active replay.

History: 2.0.8 - experimental

Params:
  • delayError – deliver pending onNext events before onError
Type parameters:
  • <T> – the value type
Returns:an UnicastSubject instance
Since:2.2
/** * Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag. * * <p>The callback, if not null, is called exactly once and * non-overlapped with any active replay. * <p>History: 2.0.8 - experimental * @param <T> the value type * @param delayError deliver pending onNext events before onError * @return an UnicastSubject instance * @since 2.2 */
@CheckReturnValue @NonNull public static <T> UnicastSubject<T> create(boolean delayError) { return new UnicastSubject<T>(bufferSize(), delayError); }
Creates an UnicastSubject with the given capacity hint and delay error flag.

History: 2.0.8 - experimental

Params:
  • capacityHint – the capacity hint for the internal, unbounded queue
  • delayError – deliver pending onNext events before onError
Since:2.2
/** * Creates an UnicastSubject with the given capacity hint and delay error flag. * <p>History: 2.0.8 - experimental * @param capacityHint the capacity hint for the internal, unbounded queue * @param delayError deliver pending onNext events before onError * @since 2.2 */
UnicastSubject(int capacityHint, boolean delayError) { this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference<Runnable>(); this.delayError = delayError; this.downstream = new AtomicReference<Observer<? super T>>(); this.once = new AtomicBoolean(); this.wip = new UnicastQueueDisposable(); }
Creates an UnicastSubject with the given capacity hint and callback for when the Subject is terminated normally or its single Subscriber cancels.
Params:
  • capacityHint – the capacity hint for the internal, unbounded queue
  • onTerminate – the callback to run when the Subject is terminated or cancelled, null not allowed
Since:2.0
/** * Creates an UnicastSubject with the given capacity hint and callback * for when the Subject is terminated normally or its single Subscriber cancels. * @param capacityHint the capacity hint for the internal, unbounded queue * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @since 2.0 * * */
UnicastSubject(int capacityHint, Runnable onTerminate) { this(capacityHint, onTerminate, true); }
Creates an UnicastSubject with the given capacity hint, delay error flag and callback for when the Subject is terminated normally or its single Subscriber cancels.

History: 2.0.8 - experimental

Params:
  • capacityHint – the capacity hint for the internal, unbounded queue
  • onTerminate – the callback to run when the Subject is terminated or cancelled, null not allowed
  • delayError – deliver pending onNext events before onError
Since:2.2
/** * Creates an UnicastSubject with the given capacity hint, delay error flag and callback * for when the Subject is terminated normally or its single Subscriber cancels. * <p>History: 2.0.8 - experimental * @param capacityHint the capacity hint for the internal, unbounded queue * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed * @param delayError deliver pending onNext events before onError * @since 2.2 */
UnicastSubject(int capacityHint, Runnable onTerminate, boolean delayError) { this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint")); this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate")); this.delayError = delayError; this.downstream = new AtomicReference<Observer<? super T>>(); this.once = new AtomicBoolean(); this.wip = new UnicastQueueDisposable(); } @Override protected void subscribeActual(Observer<? super T> observer) { if (!once.get() && once.compareAndSet(false, true)) { observer.onSubscribe(wip); downstream.lazySet(observer); // full barrier in drain if (disposed) { downstream.lazySet(null); return; } drain(); } else { EmptyDisposable.error(new IllegalStateException("Only a single observer allowed."), observer); } } void doTerminate() { Runnable r = onTerminate.get(); if (r != null && onTerminate.compareAndSet(r, null)) { r.run(); } } @Override public void onSubscribe(Disposable d) { if (done || disposed) { d.dispose(); } } @Override public void onNext(T t) { ObjectHelper.requireNonNull(t, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."); if (done || disposed) { return; } queue.offer(t); drain(); } @Override public void onError(Throwable t) { ObjectHelper.requireNonNull(t, "onError called with null. Null values are generally not allowed in 2.x operators and sources."); if (done || disposed) { RxJavaPlugins.onError(t); return; } error = t; done = true; doTerminate(); drain(); } @Override public void onComplete() { if (done || disposed) { return; } done = true; doTerminate(); drain(); } void drainNormal(Observer<? super T> a) { int missed = 1; SimpleQueue<T> q = queue; boolean failFast = !this.delayError; boolean canBeError = true; for (;;) { for (;;) { if (disposed) { downstream.lazySet(null); q.clear(); return; } boolean d = this.done; T v = queue.poll(); boolean empty = v == null; if (d) { if (failFast && canBeError) { if (failedFast(q, a)) { return; } else { canBeError = false; } } if (empty) { errorOrComplete(a); return; } } if (empty) { break; } a.onNext(v); } missed = wip.addAndGet(-missed); if (missed == 0) { break; } } } void drainFused(Observer<? super T> a) { int missed = 1; final SpscLinkedArrayQueue<T> q = queue; final boolean failFast = !delayError; for (;;) { if (disposed) { downstream.lazySet(null); q.clear(); return; } boolean d = done; if (failFast && d) { if (failedFast(q, a)) { return; } } a.onNext(null); if (d) { errorOrComplete(a); return; } missed = wip.addAndGet(-missed); if (missed == 0) { break; } } } void errorOrComplete(Observer<? super T> a) { downstream.lazySet(null); Throwable ex = error; if (ex != null) { a.onError(ex); } else { a.onComplete(); } } boolean failedFast(final SimpleQueue<T> q, Observer<? super T> a) { Throwable ex = error; if (ex != null) { downstream.lazySet(null); q.clear(); a.onError(ex); return true; } else { return false; } } void drain() { if (wip.getAndIncrement() != 0) { return; } Observer<? super T> a = downstream.get(); int missed = 1; for (;;) { if (a != null) { if (enableOperatorFusion) { drainFused(a); } else { drainNormal(a); } return; } missed = wip.addAndGet(-missed); if (missed == 0) { break; } a = downstream.get(); } } @Override public boolean hasObservers() { return downstream.get() != null; } @Override @Nullable public Throwable getThrowable() { if (done) { return error; } return null; } @Override public boolean hasThrowable() { return done && error != null; } @Override public boolean hasComplete() { return done && error == null; } final class UnicastQueueDisposable extends BasicIntQueueDisposable<T> { private static final long serialVersionUID = 7926949470189395511L; @Override public int requestFusion(int mode) { if ((mode & ASYNC) != 0) { enableOperatorFusion = true; return ASYNC; } return NONE; } @Nullable @Override public T poll() throws Exception { return queue.poll(); } @Override public boolean isEmpty() { return queue.isEmpty(); } @Override public void clear() { queue.clear(); } @Override public void dispose() { if (!disposed) { disposed = true; doTerminate(); downstream.lazySet(null); if (wip.getAndIncrement() == 0) { downstream.lazySet(null); queue.clear(); } } } @Override public boolean isDisposed() { return disposed; } } }