package io.reactivex.internal.operators.observable;
import java.util.concurrent.atomic.*;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.fuseable.HasUpstreamObservableSource;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observables.ConnectableObservable;
public final class ObservablePublishAlt<T> extends ConnectableObservable<T>
implements HasUpstreamObservableSource<T>, ResettableConnectable {
final ObservableSource<T> source;
final AtomicReference<PublishConnection<T>> current;
/**
 * Creates the operator around the given upstream; no connection exists yet,
 * the holder for the current connection starts out empty.
 *
 * @param source the upstream that {@code connect} subscribes a connection to
 */
public ObservablePublishAlt(ObservableSource<T> source) {
    AtomicReference<PublishConnection<T>> connectionHolder =
            new AtomicReference<PublishConnection<T>>();
    this.current = connectionHolder;
    this.source = source;
}
/**
 * Looks up the current connection (installing a fresh one when there is none or the
 * existing one is disposed), hands it to the callback and then, if this call is the
 * one that flipped the connection's {@code connect} flag, subscribes it to the source.
 *
 * <p>A failure thrown by the callback is passed to {@code Exceptions.throwIfFatal} and
 * then rethrown through {@code ExceptionHelper.wrapOrThrow}; the source is not
 * subscribed to in that case.
 *
 * @param connection the callback receiving the connection as a {@link Disposable}
 *                   before the source is subscribed to
 */
@Override
public void connect(Consumer<? super Disposable> connection) {
    PublishConnection<T> active = current.get();
    while (active == null || active.isDisposed()) {
        PublishConnection<T> replacement = new PublishConnection<T>(current);
        if (current.compareAndSet(active, replacement)) {
            // our replacement is installed; use it without re-checking
            active = replacement;
            break;
        }
        // somebody else changed the holder, look at the new state
        active = current.get();
    }

    // only one caller per connection may actually subscribe to the source
    AtomicBoolean connectFlag = active.connect;
    boolean firstToConnect = !connectFlag.get() && connectFlag.compareAndSet(false, true);

    try {
        connection.accept(active);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        throw ExceptionHelper.wrapOrThrow(ex);
    }

    if (firstToConnect) {
        source.subscribe(active);
    }
}
/**
 * Registers the observer with the current connection, creating a connection first
 * when none exists. An existing connection is used as it is, even if it has already
 * terminated; in that case the observer is terminated immediately with the
 * connection's stored error or, when there is none, with {@code onComplete}.
 *
 * @param observer the downstream consumer to register
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
    PublishConnection<T> target = current.get();
    // a connection is installed only when the holder is empty
    while (target == null) {
        PublishConnection<T> created = new PublishConnection<T>(current);
        if (current.compareAndSet(null, created)) {
            target = created;
        } else {
            // lost the race; pick up whatever the holder contains now
            target = current.get();
        }
    }

    InnerDisposable<T> registration = new InnerDisposable<T>(observer, target);
    observer.onSubscribe(registration);

    if (!target.add(registration)) {
        // the connection refused the registration because it is terminated
        Throwable failure = target.error;
        if (failure == null) {
            observer.onComplete();
        } else {
            observer.onError(failure);
        }
        return;
    }

    // dispose() may have run before add(), when there was nothing to remove yet
    if (registration.isDisposed()) {
        target.remove(registration);
    }
}
/**
 * Empties the holder of the current connection, but only if it still contains
 * the given connection.
 *
 * @param connection the connection expected to be the current one; it is cast
 *                   to {@code PublishConnection}
 */
@Override
@SuppressWarnings("unchecked")
public void resetIf(Disposable connection) {
    PublishConnection<T> expected = (PublishConnection<T>)connection;
    current.compareAndSet(expected, null);
}
/**
 * Returns the upstream this operator was constructed with.
 *
 * @return the wrapped upstream source
 */
@Override
public ObservableSource<T> source() {
    return this.source;
}
/**
 * One connection to the upstream. It is the {@link Observer} subscribed to the source
 * and relays every signal to the registered {@link InnerDisposable}s. The registrations
 * live in the array held by this {@link AtomicReference}; the array is never modified
 * in place, {@code add} and {@code remove} swap in a new array instead.
 *
 * @param <T> the element type
 */
static final class PublishConnection<T>
extends AtomicReference<InnerDisposable<T>[]>
implements Observer<T>, Disposable {

    private static final long serialVersionUID = -3251430252873581268L;

    /** Array installed by the constructor and by {@code remove} when the last registration leaves. */
    @SuppressWarnings("rawtypes")
    static final InnerDisposable[] EMPTY = new InnerDisposable[0];

    /** Marker array, recognized by identity, installed on dispose and on upstream termination. */
    @SuppressWarnings("rawtypes")
    static final InnerDisposable[] TERMINATED = new InnerDisposable[0];

    /** Flipped to true by the {@code connect} call that claims this connection. */
    final AtomicBoolean connect;

    /** The operator's holder of the current connection; cleared on dispose if it still holds this one. */
    final AtomicReference<PublishConnection<T>> current;

    /** Holds the {@link Disposable} received through {@code onSubscribe}. */
    final AtomicReference<Disposable> upstream;

    /** The error the upstream terminated with, if any; written before the array is marked terminated. */
    Throwable error;

    /**
     * Creates a connection with no registrations.
     *
     * @param current the holder to clear when this connection gets disposed
     */
    @SuppressWarnings("unchecked")
    public PublishConnection(AtomicReference<PublishConnection<T>> current) {
        this.current = current;
        this.upstream = new AtomicReference<Disposable>();
        this.connect = new AtomicBoolean();
        lazySet(EMPTY);
    }

    /**
     * Marks this connection terminated, removes it from the holder if it is still the
     * current one and disposes the upstream. The registrations present at that moment
     * are dropped without being signalled.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void dispose() {
        // mark first, so that add() rejects registrations from here on
        getAndSet(TERMINATED);
        current.compareAndSet(this, null);
        DisposableHelper.dispose(upstream);
    }

    /**
     * @return true once the terminated marker is installed, by {@code dispose} or by
     *         a terminal event of the upstream
     */
    @Override
    public boolean isDisposed() {
        InnerDisposable<T>[] state = get();
        return state == TERMINATED;
    }

    /**
     * Stores the upstream's {@link Disposable} via {@code DisposableHelper.setOnce}.
     *
     * @param d the upstream's disposable
     */
    @Override
    public void onSubscribe(Disposable d) {
        DisposableHelper.setOnce(upstream, d);
    }

    /**
     * Delivers the item to every registration in the array read at the start of the call.
     *
     * @param t the item to relay
     */
    @Override
    public void onNext(T t) {
        InnerDisposable<T>[] observers = get();
        for (int i = 0; i < observers.length; i++) {
            observers[i].downstream.onNext(t);
        }
    }

    /**
     * Records the error, marks the upstream reference as disposed, installs the
     * terminated marker and signals the error to the registrations that were present.
     *
     * @param e the error the upstream failed with
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onError(Throwable e) {
        error = e;
        upstream.lazySet(DisposableHelper.DISPOSED);
        InnerDisposable<T>[] observers = getAndSet(TERMINATED);
        for (int i = 0; i < observers.length; i++) {
            observers[i].downstream.onError(e);
        }
    }

    /**
     * Marks the upstream reference as disposed, installs the terminated marker and
     * signals completion to the registrations that were present.
     */
    @Override
    @SuppressWarnings("unchecked")
    public void onComplete() {
        upstream.lazySet(DisposableHelper.DISPOSED);
        InnerDisposable<T>[] observers = getAndSet(TERMINATED);
        for (int i = 0; i < observers.length; i++) {
            observers[i].downstream.onComplete();
        }
    }

    /**
     * Appends the registration to the array unless the connection is terminated.
     *
     * @param inner the registration to append
     * @return true if it was appended, false if the terminated marker was found
     */
    @SuppressWarnings("unchecked")
    public boolean add(InnerDisposable<T> inner) {
        InnerDisposable<T>[] before = get();
        while (before != TERMINATED) {
            int size = before.length;
            InnerDisposable<T>[] after = new InnerDisposable[size + 1];
            System.arraycopy(before, 0, after, 0, size);
            after[size] = inner;
            if (compareAndSet(before, after)) {
                return true;
            }
            // the array changed underneath us; retry against the new one
            before = get();
        }
        return false;
    }

    /**
     * Takes the registration out of the array; does nothing if it is not in there
     * (which includes the empty and the terminated state).
     *
     * @param inner the registration to take out
     */
    @SuppressWarnings("unchecked")
    public void remove(InnerDisposable<T> inner) {
        for (;;) {
            InnerDisposable<T>[] snapshot = get();
            int size = snapshot.length;

            // identity search; stays at -1 for an empty array as well
            int index = -1;
            for (int i = 0; i < size; i++) {
                if (snapshot[i] == inner) {
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                return;
            }

            InnerDisposable<T>[] shrunk;
            if (size == 1) {
                shrunk = EMPTY;
            } else {
                shrunk = new InnerDisposable[size - 1];
                System.arraycopy(snapshot, 0, shrunk, 0, index);
                System.arraycopy(snapshot, index + 1, shrunk, index, size - index - 1);
            }

            if (compareAndSet(snapshot, shrunk)) {
                return;
            }
        }
    }
}
/**
 * The handle of a single registered observer: it carries the downstream
 * {@link Observer} and, as the value of this {@link AtomicReference}, the connection
 * it belongs to. A null value means the handle has been disposed.
 *
 * @param <T> the element type
 */
static final class InnerDisposable<T>
extends AtomicReference<PublishConnection<T>>
implements Disposable {

    private static final long serialVersionUID = 7463222674719692880L;

    /** The observer the connection's signals are relayed to. */
    final Observer<? super T> downstream;

    /**
     * @param downstream the observer to relay signals to
     * @param parent     the connection this handle gets registered with
     */
    public InnerDisposable(Observer<? super T> downstream, PublishConnection<T> parent) {
        this.downstream = downstream;
        lazySet(parent);
    }

    /**
     * Clears the parent reference and, if this call was the one that cleared it,
     * asks the parent to remove this handle.
     */
    @Override
    public void dispose() {
        PublishConnection<T> owner = getAndSet(null);
        if (owner == null) {
            // already disposed by an earlier call
            return;
        }
        owner.remove(this);
    }

    /**
     * @return true once the parent reference has been cleared
     */
    @Override
    public boolean isDisposed() {
        return null == get();
    }
}
}