package io.reactivex.internal.operators.maybe;
import java.util.Iterator;
import java.util.concurrent.atomic.*;
import org.reactivestreams.*;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.*;
import io.reactivex.internal.util.*;
public final class MaybeConcatIterable<T> extends Flowable<T> {
final Iterable<? extends MaybeSource<? extends T>> sources;
public MaybeConcatIterable(Iterable<? extends MaybeSource<? extends T>> sources) {
this.sources = sources;
}
@Override
protected void subscribeActual(Subscriber<? super T> s) {
Iterator<? extends MaybeSource<? extends T>> it;
try {
it = ObjectHelper.requireNonNull(sources.iterator(), "The sources Iterable returned a null Iterator");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
ConcatMaybeObserver<T> parent = new ConcatMaybeObserver<T>(s, it);
s.onSubscribe(parent);
parent.drain();
}
static final class ConcatMaybeObserver<T>
extends AtomicInteger
implements MaybeObserver<T>, Subscription {
private static final long serialVersionUID = 3520831347801429610L;
final Subscriber<? super T> downstream;
final AtomicLong requested;
final AtomicReference<Object> current;
final SequentialDisposable disposables;
final Iterator<? extends MaybeSource<? extends T>> sources;
long produced;
ConcatMaybeObserver(Subscriber<? super T> actual, Iterator<? extends MaybeSource<? extends T>> sources) {
this.downstream = actual;
this.sources = sources;
this.requested = new AtomicLong();
this.disposables = new SequentialDisposable();
this.current = new AtomicReference<Object>(NotificationLite.COMPLETE);
}
@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(requested, n);
drain();
}
}
@Override
public void cancel() {
disposables.dispose();
}
@Override
public void onSubscribe(Disposable d) {
disposables.replace(d);
}
@Override
public void onSuccess(T value) {
current.lazySet(value);
drain();
}
@Override
public void onError(Throwable e) {
downstream.onError(e);
}
@Override
public void onComplete() {
current.lazySet(NotificationLite.COMPLETE);
drain();
}
@SuppressWarnings("unchecked")
void drain() {
if (getAndIncrement() != 0) {
return;
}
AtomicReference<Object> c = current;
Subscriber<? super T> a = downstream;
Disposable cancelled = disposables;
for (;;) {
if (cancelled.isDisposed()) {
c.lazySet(null);
return;
}
Object o = c.get();
if (o != null) {
boolean goNextSource;
if (o != NotificationLite.COMPLETE) {
long p = produced;
if (p != requested.get()) {
produced = p + 1;
c.lazySet(null);
goNextSource = true;
a.onNext((T)o);
} else {
goNextSource = false;
}
} else {
goNextSource = true;
c.lazySet(null);
}
if (goNextSource && !cancelled.isDisposed()) {
boolean b;
try {
b = sources.hasNext();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}
if (b) {
MaybeSource<? extends T> source;
try {
source = ObjectHelper.requireNonNull(sources.next(), "The source Iterator returned a null MaybeSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
a.onError(ex);
return;
}
source.subscribe(this);
} else {
a.onComplete();
}
}
}
if (decrementAndGet() == 0) {
break;
}
}
}
}
}