Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.subjects; import io.reactivex.Observer; import io.reactivex.annotations.Nullable; import io.reactivex.disposables.Disposable; import io.reactivex.internal.util.*; import io.reactivex.internal.util.AppendOnlyLinkedArrayList.NonThrowingPredicate; import io.reactivex.plugins.RxJavaPlugins;
Serializes calls to the Observer methods.

All other Observable and Subject methods are thread-safe by design.

Type parameters:
  • <T> – the item value type
/** * Serializes calls to the Observer methods. * <p>All other Observable and Subject methods are thread-safe by design. * * @param <T> the item value type */
/* public */ final class SerializedSubject<T> extends Subject<T> implements NonThrowingPredicate<Object> {
The actual subscriber to serialize Subscriber calls to.
/** The actual subscriber to serialize Subscriber calls to. */
final Subject<T> actual;
Indicates an emission is going on, guarded by this.
/** Indicates an emission is going on, guarded by this. */
boolean emitting;
If not null, it holds the missed NotificationLite events.
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;
Indicates a terminal event has been received and all further events will be dropped.
/** Indicates a terminal event has been received and all further events will be dropped. */
volatile boolean done;
Constructor that wraps an actual subject.
Params:
  • actual – the subject wrapped
/** * Constructor that wraps an actual subject. * @param actual the subject wrapped */
SerializedSubject(final Subject<T> actual) { this.actual = actual; } @Override protected void subscribeActual(Observer<? super T> observer) { actual.subscribe(observer); } @Override public void onSubscribe(Disposable d) { boolean cancel; if (!done) { synchronized (this) { if (done) { cancel = true; } else { if (emitting) { AppendOnlyLinkedArrayList<Object> q = queue; if (q == null) { q = new AppendOnlyLinkedArrayList<Object>(4); queue = q; } q.add(NotificationLite.disposable(d)); return; } emitting = true; cancel = false; } } } else { cancel = true; } if (cancel) { d.dispose(); } else { actual.onSubscribe(d); emitLoop(); } } @Override public void onNext(T t) { if (done) { return; } synchronized (this) { if (done) { return; } if (emitting) { AppendOnlyLinkedArrayList<Object> q = queue; if (q == null) { q = new AppendOnlyLinkedArrayList<Object>(4); queue = q; } q.add(NotificationLite.next(t)); return; } emitting = true; } actual.onNext(t); emitLoop(); } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } boolean reportError; synchronized (this) { if (done) { reportError = true; } else { done = true; if (emitting) { AppendOnlyLinkedArrayList<Object> q = queue; if (q == null) { q = new AppendOnlyLinkedArrayList<Object>(4); queue = q; } q.setFirst(NotificationLite.error(t)); return; } reportError = false; emitting = true; } } if (reportError) { RxJavaPlugins.onError(t); return; } actual.onError(t); } @Override public void onComplete() { if (done) { return; } synchronized (this) { if (done) { return; } done = true; if (emitting) { AppendOnlyLinkedArrayList<Object> q = queue; if (q == null) { q = new AppendOnlyLinkedArrayList<Object>(4); queue = q; } q.add(NotificationLite.complete()); return; } emitting = true; } actual.onComplete(); }
Loops until all notifications in the queue has been processed.
/** Loops until all notifications in the queue has been processed. */
void emitLoop() { for (;;) { AppendOnlyLinkedArrayList<Object> q; synchronized (this) { q = queue; if (q == null) { emitting = false; return; } queue = null; } q.forEachWhile(this); } } @Override public boolean test(Object o) { return NotificationLite.acceptFull(o, actual); } @Override public boolean hasObservers() { return actual.hasObservers(); } @Override public boolean hasThrowable() { return actual.hasThrowable(); } @Override @Nullable public Throwable getThrowable() { return actual.getThrowable(); } @Override public boolean hasComplete() { return actual.hasComplete(); } }