Copyright (c) 2016-present, RxJava Contributors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex.internal.operators.flowable; import java.util.concurrent.atomic.*; import org.reactivestreams.*; import io.reactivex.FlowableSubscriber; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.Consumer; import io.reactivex.internal.disposables.ResettableConnectable; import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.SpscArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins;
Shares a single underlying connection to the upstream Publisher and multicasts events to all subscribed subscribers until the upstream completes or the connection is disposed.

The difference to FlowablePublish is that when the upstream terminates, late subscriberss will receive that terminal event until the connection is disposed and the ConnectableFlowable is reset to its fresh state.

Type parameters:
  • <T> – the element type
/** * Shares a single underlying connection to the upstream Publisher * and multicasts events to all subscribed subscribers until the upstream * completes or the connection is disposed. * <p> * The difference to FlowablePublish is that when the upstream terminates, * late subscriberss will receive that terminal event until the connection is * disposed and the ConnectableFlowable is reset to its fresh state. * * @param <T> the element type * @since 2.2.10 */
public final class FlowablePublishAlt<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T>, ResettableConnectable { final Publisher<T> source; final int bufferSize; final AtomicReference<PublishConnection<T>> current; public FlowablePublishAlt(Publisher<T> source, int bufferSize) { this.source = source; this.bufferSize = bufferSize; this.current = new AtomicReference<PublishConnection<T>>(); } @Override public Publisher<T> source() { return source; }
Returns:The internal buffer size of this FloawblePublishAlt operator.
/** * @return The internal buffer size of this FloawblePublishAlt operator. */
public int publishBufferSize() { return bufferSize; } @Override public void connect(Consumer<? super Disposable> connection) { PublishConnection<T> conn; boolean doConnect = false; for (;;) { conn = current.get(); if (conn == null || conn.isDisposed()) { PublishConnection<T> fresh = new PublishConnection<T>(current, bufferSize); if (!current.compareAndSet(conn, fresh)) { continue; } conn = fresh; } doConnect = !conn.connect.get() && conn.connect.compareAndSet(false, true); break; } try { connection.accept(conn); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); } if (doConnect) { source.subscribe(conn); } } @Override protected void subscribeActual(Subscriber<? super T> s) { PublishConnection<T> conn; for (;;) { conn = current.get(); // don't create a fresh connection if the current is disposed if (conn == null) { PublishConnection<T> fresh = new PublishConnection<T>(current, bufferSize); if (!current.compareAndSet(conn, fresh)) { continue; } conn = fresh; } break; } InnerSubscription<T> inner = new InnerSubscription<T>(s, conn); s.onSubscribe(inner); if (conn.add(inner)) { if (inner.isCancelled()) { conn.remove(inner); } else { conn.drain(); } return; } Throwable ex = conn.error; if (ex != null) { s.onError(ex); } else { s.onComplete(); } } @SuppressWarnings("unchecked") @Override public void resetIf(Disposable connection) { current.compareAndSet((PublishConnection<T>)connection, null); } static final class PublishConnection<T> extends AtomicInteger implements FlowableSubscriber<T>, Disposable { private static final long serialVersionUID = -1672047311619175801L; final AtomicReference<PublishConnection<T>> current; final AtomicReference<Subscription> upstream; final AtomicBoolean connect; final AtomicReference<InnerSubscription<T>[]> subscribers; final int bufferSize; volatile SimpleQueue<T> queue; int sourceMode; volatile boolean done; Throwable error; int consumed; @SuppressWarnings("rawtypes") static final InnerSubscription[] EMPTY = new InnerSubscription[0]; @SuppressWarnings("rawtypes") static final InnerSubscription[] TERMINATED = new InnerSubscription[0]; @SuppressWarnings("unchecked") PublishConnection(AtomicReference<PublishConnection<T>> current, int bufferSize) { this.current = current; this.upstream = new AtomicReference<Subscription>(); this.connect = new AtomicBoolean(); this.bufferSize = bufferSize; this.subscribers = new AtomicReference<InnerSubscription<T>[]>(EMPTY); } @SuppressWarnings("unchecked") @Override public void dispose() { subscribers.getAndSet(TERMINATED); current.compareAndSet(this, null); SubscriptionHelper.cancel(upstream); } @Override public boolean isDisposed() { return subscribers.get() == TERMINATED; } @Override public void onSubscribe(Subscription s) { if (SubscriptionHelper.setOnce(this.upstream, s)) { if (s instanceof QueueSubscription) { @SuppressWarnings("unchecked") QueueSubscription<T> qs = (QueueSubscription<T>) s; int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY); if (m == QueueSubscription.SYNC) { sourceMode = m; queue = qs; done = true; drain(); return; } if (m == QueueSubscription.ASYNC) { sourceMode = m; queue = qs; s.request(bufferSize); return; } } queue = new SpscArrayQueue<T>(bufferSize); s.request(bufferSize); } } @Override public void onNext(T t) { // we expect upstream to honor backpressure requests if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) { onError(new MissingBackpressureException("Prefetch queue is full?!")); return; } // since many things can happen concurrently, we have a common dispatch // loop to act on the current state serially drain(); } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); } else { error = t; done = true; drain(); } } @Override public void onComplete() { done = true; drain(); } void drain() { if (getAndIncrement() != 0) { return; } int missed = 1; SimpleQueue<T> queue = this.queue; int consumed = this.consumed; int limit = this.bufferSize - (this.bufferSize >> 2); boolean async = this.sourceMode != QueueSubscription.SYNC; outer: for (;;) { if (queue != null) { long minDemand = Long.MAX_VALUE; boolean hasDemand = false; InnerSubscription<T>[] innerSubscriptions = subscribers.get(); for (InnerSubscription<T> inner : innerSubscriptions) { long request = inner.get(); if (request != Long.MIN_VALUE) { hasDemand = true; minDemand = Math.min(request - inner.emitted, minDemand); } } if (!hasDemand) { minDemand = 0L; } while (minDemand != 0L) { boolean d = done; T v; try { v = queue.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); upstream.get().cancel(); queue.clear(); done = true; signalError(ex); return; } boolean empty = v == null; if (checkTerminated(d, empty)) { return; } if (empty) { break; } for (InnerSubscription<T> inner : innerSubscriptions) { if (!inner.isCancelled()) { inner.downstream.onNext(v); inner.emitted++; } } if (async && ++consumed == limit) { consumed = 0; upstream.get().request(limit); } minDemand--; if (innerSubscriptions != subscribers.get()) { continue outer; } } if (checkTerminated(done, queue.isEmpty())) { return; } } this.consumed = consumed; missed = addAndGet(-missed); if (missed == 0) { break; } if (queue == null) { queue = this.queue; } } } @SuppressWarnings("unchecked") boolean checkTerminated(boolean isDone, boolean isEmpty) { if (isDone && isEmpty) { Throwable ex = error; if (ex != null) { signalError(ex); } else { for (InnerSubscription<T> inner : subscribers.getAndSet(TERMINATED)) { if (!inner.isCancelled()) { inner.downstream.onComplete(); } } } return true; } return false; } @SuppressWarnings("unchecked") void signalError(Throwable ex) { for (InnerSubscription<T> inner : subscribers.getAndSet(TERMINATED)) { if (!inner.isCancelled()) { inner.downstream.onError(ex); } } } boolean add(InnerSubscription<T> inner) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // get the current producer array InnerSubscription<T>[] c = subscribers.get(); // if this subscriber-to-source reached a terminal state by receiving // an onError or onComplete, just refuse to add the new producer if (c == TERMINATED) { return false; } // we perform a copy-on-write logic int len = c.length; @SuppressWarnings("unchecked") InnerSubscription<T>[] u = new InnerSubscription[len + 1]; System.arraycopy(c, 0, u, 0, len); u[len] = inner; // try setting the subscribers array if (subscribers.compareAndSet(c, u)) { return true; } // if failed, some other operation succeeded (another add, remove or termination) // so retry } } @SuppressWarnings("unchecked") void remove(InnerSubscription<T> inner) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // let's read the current subscribers array InnerSubscription<T>[] c = subscribers.get(); int len = c.length; // if it is either empty or terminated, there is nothing to remove so we quit if (len == 0) { break; } // let's find the supplied producer in the array // although this is O(n), we don't expect too many child subscribers in general int j = -1; for (int i = 0; i < len; i++) { if (c[i] == inner) { j = i; break; } } // we didn't find it so just quit if (j < 0) { return; } // we do copy-on-write logic here InnerSubscription<T>[] u; // we don't create a new empty array if producer was the single inhabitant // but rather reuse an empty array if (len == 1) { u = EMPTY; } else { // otherwise, create a new array one less in size u = new InnerSubscription[len - 1]; // copy elements being before the given producer System.arraycopy(c, 0, u, 0, j); // copy elements being after the given producer System.arraycopy(c, j + 1, u, j, len - j - 1); } // try setting this new array as if (subscribers.compareAndSet(c, u)) { break; } // if we failed, it means something else happened // (a concurrent add/remove or termination), we need to retry } } } static final class InnerSubscription<T> extends AtomicLong implements Subscription { private static final long serialVersionUID = 2845000326761540265L; final Subscriber<? super T> downstream; final PublishConnection<T> parent; long emitted; InnerSubscription(Subscriber<? super T> downstream, PublishConnection<T> parent) { this.downstream = downstream; this.parent = parent; } @Override public void request(long n) { BackpressureHelper.addCancel(this, n); parent.drain(); } @Override public void cancel() { if (getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { parent.remove(this); parent.drain(); } } public boolean isCancelled() { return get() == Long.MIN_VALUE; } } }