package io.reactivex.internal.operators.flowable;
import java.util.concurrent.atomic.*;
import org.reactivestreams.*;
import io.reactivex.*;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;
public final class FlowableDelaySubscriptionOther<T, U> extends Flowable<T> {
final Publisher<? extends T> main;
final Publisher<U> other;
public FlowableDelaySubscriptionOther(Publisher<? extends T> main, Publisher<U> other) {
this.main = main;
this.other = other;
}
@Override
public void subscribeActual(final Subscriber<? super T> child) {
MainSubscriber<T> parent = new MainSubscriber<T>(child, main);
child.onSubscribe(parent);
other.subscribe(parent.other);
}
static final class MainSubscriber<T> extends AtomicLong implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = 2259811067697317255L;
final Subscriber<? super T> downstream;
final Publisher<? extends T> main;
final OtherSubscriber other;
final AtomicReference<Subscription> upstream;
MainSubscriber(Subscriber<? super T> downstream, Publisher<? extends T> main) {
this.downstream = downstream;
this.main = main;
this.other = new OtherSubscriber();
this.upstream = new AtomicReference<Subscription>();
}
void next() {
main.subscribe(this);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
SubscriptionHelper.deferredRequest(upstream, this, n);
}
}
@Override
public void cancel() {
SubscriptionHelper.cancel(other);
SubscriptionHelper.cancel(upstream);
}
@Override
public void onSubscribe(Subscription s) {
SubscriptionHelper.deferredSetOnce(upstream, this, s);
}
final class OtherSubscriber extends AtomicReference<Subscription> implements FlowableSubscriber<Object> {
private static final long serialVersionUID = -3892798459447644106L;
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this, s)) {
s.request(Long.MAX_VALUE);
}
}
@Override
public void onNext(Object t) {
Subscription s = get();
if (s != SubscriptionHelper.CANCELLED) {
lazySet(SubscriptionHelper.CANCELLED);
s.cancel();
next();
}
}
@Override
public void onError(Throwable t) {
Subscription s = get();
if (s != SubscriptionHelper.CANCELLED) {
downstream.onError(t);
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
Subscription s = get();
if (s != SubscriptionHelper.CANCELLED) {
next();
}
}
}
}
}