Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.internal.operators.observable; import java.util.concurrent.atomic.*; import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.Consumer; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.fuseable.HasUpstreamObservableSource; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observables.ConnectableObservable; import io.reactivex.plugins.RxJavaPlugins;
A connectable observable which shares an underlying source and dispatches source values to observers in a backpressure-aware manner.
Type parameters:
  • <T> – the value type
/** * A connectable observable which shares an underlying source and dispatches source values to observers in a backpressure-aware * manner. * @param <T> the value type */
public final class ObservablePublish<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T>, ObservablePublishClassic<T> {
The source observable.
/** The source observable. */
final ObservableSource<T> source;
Holds the current subscriber that is, will be or just was subscribed to the source observable.
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<PublishObserver<T>> current; final ObservableSource<T> onSubscribe;
Creates a OperatorPublish instance to publish values of the given source observable.
Params:
  • source – the source observable
Type parameters:
  • <T> – the source value type
Returns:the connectable observable
/** * Creates a OperatorPublish instance to publish values of the given source observable. * @param <T> the source value type * @param source the source observable * @return the connectable observable */
public static <T> ConnectableObservable<T> create(ObservableSource<T> source) { // the current connection to source needs to be shared between the operator and its onSubscribe call final AtomicReference<PublishObserver<T>> curr = new AtomicReference<PublishObserver<T>>(); ObservableSource<T> onSubscribe = new PublishSource<T>(curr); return RxJavaPlugins.onAssembly(new ObservablePublish<T>(onSubscribe, source, curr)); } private ObservablePublish(ObservableSource<T> onSubscribe, ObservableSource<T> source, final AtomicReference<PublishObserver<T>> current) { this.onSubscribe = onSubscribe; this.source = source; this.current = current; } @Override public ObservableSource<T> source() { return source; } @Override public ObservableSource<T> publishSource() { return source; } @Override protected void subscribeActual(Observer<? super T> observer) { onSubscribe.subscribe(observer); } @Override public void connect(Consumer<? super Disposable> connection) { boolean doConnect; PublishObserver<T> ps; // we loop because concurrent connect/disconnect and termination may change the state for (;;) { // retrieve the current subscriber-to-source instance ps = current.get(); // if there is none yet or the current has been disposed if (ps == null || ps.isDisposed()) { // create a new subscriber-to-source PublishObserver<T> u = new PublishObserver<T>(current); // try setting it as the current subscriber-to-source if (!current.compareAndSet(ps, u)) { // did not work, perhaps a new subscriber arrived // and created a new subscriber-to-source as well, retry continue; } ps = u; } // if connect() was called concurrently, only one of them should actually // connect to the source doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); break; // NOPMD } /* * Notify the callback that we have a (new) connection which it can dispose * but since ps is unique to a connection, multiple calls to connect() will return the * same Disposable and even if there was a connect-disconnect-connect pair, the older * references won't disconnect the newer connection. * Synchronous source consumers have the opportunity to disconnect via dispose on the * Disposable as subscribe() may never return in its own. * * Note however, that asynchronously disconnecting a running source might leave * child observers without any terminal event; PublishSubject does not have this * issue because the dispose() was always triggered by the child observers * themselves. */ try { connection.accept(ps); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); } if (doConnect) { source.subscribe(ps); } } @SuppressWarnings("rawtypes") static final class PublishObserver<T> implements Observer<T>, Disposable {
Holds onto the current connected PublishObserver.
/** Holds onto the current connected PublishObserver. */
final AtomicReference<PublishObserver<T>> current;
Indicates an empty array of inner observers.
/** Indicates an empty array of inner observers. */
static final InnerDisposable[] EMPTY = new InnerDisposable[0];
Indicates a terminated PublishObserver.
/** Indicates a terminated PublishObserver. */
static final InnerDisposable[] TERMINATED = new InnerDisposable[0];
Tracks the subscribed observers.
/** Tracks the subscribed observers. */
final AtomicReference<InnerDisposable<T>[]> observers;
Atomically changed from false to true by connect to make sure the connection is only performed by one thread.
/** * Atomically changed from false to true by connect to make sure the * connection is only performed by one thread. */
final AtomicBoolean shouldConnect; final AtomicReference<Disposable> upstream = new AtomicReference<Disposable>(); @SuppressWarnings("unchecked") PublishObserver(AtomicReference<PublishObserver<T>> current) { this.observers = new AtomicReference<InnerDisposable<T>[]>(EMPTY); this.current = current; this.shouldConnect = new AtomicBoolean(); } @SuppressWarnings("unchecked") @Override public void dispose() { InnerDisposable[] ps = observers.getAndSet(TERMINATED); if (ps != TERMINATED) { current.compareAndSet(PublishObserver.this, null); DisposableHelper.dispose(upstream); } } @Override public boolean isDisposed() { return observers.get() == TERMINATED; } @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(this.upstream, d); } @Override public void onNext(T t) { for (InnerDisposable<T> inner : observers.get()) { inner.child.onNext(t); } } @SuppressWarnings("unchecked") @Override public void onError(Throwable e) { current.compareAndSet(this, null); InnerDisposable<T>[] a = observers.getAndSet(TERMINATED); if (a.length != 0) { for (InnerDisposable<T> inner : a) { inner.child.onError(e); } } else { RxJavaPlugins.onError(e); } } @SuppressWarnings("unchecked") @Override public void onComplete() { current.compareAndSet(this, null); for (InnerDisposable<T> inner : observers.getAndSet(TERMINATED)) { inner.child.onComplete(); } }
Atomically try adding a new InnerDisposable to this Observer or return false if this Observer was terminated.
Params:
  • producer – the producer to add
Returns:true if succeeded, false otherwise
/** * Atomically try adding a new InnerDisposable to this Observer or return false if this * Observer was terminated. * @param producer the producer to add * @return true if succeeded, false otherwise */
boolean add(InnerDisposable<T> producer) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // get the current producer array InnerDisposable<T>[] c = observers.get(); // if this subscriber-to-source reached a terminal state by receiving // an onError or onComplete, just refuse to add the new producer if (c == TERMINATED) { return false; } // we perform a copy-on-write logic int len = c.length; @SuppressWarnings("unchecked") InnerDisposable<T>[] u = new InnerDisposable[len + 1]; System.arraycopy(c, 0, u, 0, len); u[len] = producer; // try setting the observers array if (observers.compareAndSet(c, u)) { return true; } // if failed, some other operation succeeded (another add, remove or termination) // so retry } }
Atomically removes the given producer from the observers array.
Params:
  • producer – the producer to remove
/** * Atomically removes the given producer from the observers array. * @param producer the producer to remove */
@SuppressWarnings("unchecked") void remove(InnerDisposable<T> producer) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // let's read the current observers array InnerDisposable<T>[] c = observers.get(); // if it is either empty or terminated, there is nothing to remove so we quit int len = c.length; if (len == 0) { return; } // let's find the supplied producer in the array // although this is O(n), we don't expect too many child observers in general int j = -1; for (int i = 0; i < len; i++) { if (c[i].equals(producer)) { j = i; break; } } // we didn't find it so just quit if (j < 0) { return; } // we do copy-on-write logic here InnerDisposable<T>[] u; // we don't create a new empty array if producer was the single inhabitant // but rather reuse an empty array if (len == 1) { u = EMPTY; } else { // otherwise, create a new array one less in size u = new InnerDisposable[len - 1]; // copy elements being before the given producer System.arraycopy(c, 0, u, 0, j); // copy elements being after the given producer System.arraycopy(c, j + 1, u, j, len - j - 1); } // try setting this new array as if (observers.compareAndSet(c, u)) { return; } // if we failed, it means something else happened // (a concurrent add/remove or termination), we need to retry } } }
A Disposable that manages the request and disposed state of a child Observer in thread-safe manner. this holds the parent PublishObserver or itself if disposed
Type parameters:
  • <T> – the value type
/** * A Disposable that manages the request and disposed state of a * child Observer in thread-safe manner. * {@code this} holds the parent PublishObserver or itself if disposed * @param <T> the value type */
static final class InnerDisposable<T> extends AtomicReference<Object> implements Disposable { private static final long serialVersionUID = -1100270633763673112L;
The actual child subscriber.
/** The actual child subscriber. */
final Observer<? super T> child; InnerDisposable(Observer<? super T> child) { this.child = child; } @Override public boolean isDisposed() { return get() == this; } @SuppressWarnings("unchecked") @Override public void dispose() { Object o = getAndSet(this); if (o != null && o != this) { ((PublishObserver<T>)o).remove(this); } } void setParent(PublishObserver<T> p) { if (!compareAndSet(null, p)) { p.remove(this); } } } static final class PublishSource<T> implements ObservableSource<T> { private final AtomicReference<PublishObserver<T>> curr; PublishSource(AtomicReference<PublishObserver<T>> curr) { this.curr = curr; } @Override public void subscribe(Observer<? super T> child) { // create the backpressure-managing producer for this child InnerDisposable<T> inner = new InnerDisposable<T>(child); child.onSubscribe(inner); // concurrent connection/disconnection may change the state, // we loop to be atomic while the child subscribes for (;;) { // get the current subscriber-to-source PublishObserver<T> r = curr.get(); // if there isn't one or it is disposed if (r == null || r.isDisposed()) { // create a new subscriber to source PublishObserver<T> u = new PublishObserver<T>(curr); // let's try setting it as the current subscriber-to-source if (!curr.compareAndSet(r, u)) { // didn't work, maybe someone else did it or the current subscriber // to source has just finished continue; } // we won, let's use it going onwards r = u; } /* * Try adding it to the current subscriber-to-source, add is atomic in respect * to other adds and the termination of the subscriber-to-source. */ if (r.add(inner)) { inner.setParent(r); break; // NOPMD } /* * The current PublishObserver has been terminated, try with a newer one. */ /* * Note: although technically correct, concurrent disconnects can cause * unexpected behavior such as child observers never receiving anything * (unless connected again). An alternative approach, similar to * PublishSubject would be to immediately terminate such child * observers as well: * * Object term = r.terminalEvent; * if (r.nl.isCompleted(term)) { * child.onComplete(); * } else { * child.onError(r.nl.getError(term)); * } * return; * * The original concurrent behavior was non-deterministic in this regard as well. * Allowing this behavior, however, may introduce another unexpected behavior: * after disconnecting a previous connection, one might not be able to prepare * a new connection right after a previous termination by subscribing new child * observers asynchronously before a connect call. */ } } } }