package io.reactivex.internal.operators.mixed;
import java.util.concurrent.atomic.*;
import org.reactivestreams.*;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
/**
 * A {@link Flowable} that subscribes to a {@link MaybeSource} and, when that source
 * succeeds, maps the success value to a {@link Publisher} and relays the signals of
 * that Publisher to the downstream {@link Subscriber}.
 *
 * <p>An error or an empty completion of the Maybe reaches the downstream through the
 * same {@code onError}/{@code onComplete} methods that serve the mapped Publisher.
 *
 * @param <T> the success value type of the source Maybe
 * @param <R> the element type of the mapped Publisher and of this Flowable
 */
public final class MaybeFlatMapPublisher<T, R> extends Flowable<R> {

    /** The Maybe whose success value is fed to the mapper. */
    final MaybeSource<T> source;

    /** Turns the success value into the Publisher to relay. */
    final Function<? super T, ? extends Publisher<? extends R>> mapper;

    /**
     * Stores the source and the mapper; nothing is subscribed until
     * {@link #subscribeActual(Subscriber)} runs.
     *
     * @param source the upstream Maybe
     * @param mapper the function that maps the success value to a Publisher
     */
    public MaybeFlatMapPublisher(MaybeSource<T> source,
            Function<? super T, ? extends Publisher<? extends R>> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    /**
     * Creates a fresh {@link FlatMapPublisherSubscriber} for the given subscriber
     * and subscribes it to the source Maybe.
     *
     * @param subscriber the downstream consumer of the mapped Publisher's items
     */
    @Override
    protected void subscribeActual(Subscriber<? super R> subscriber) {
        FlatMapPublisherSubscriber<T, R> parent =
                new FlatMapPublisherSubscriber<T, R>(subscriber, mapper);
        source.subscribe(parent);
    }

    /**
     * Sits between the Maybe, the mapped Publisher and the downstream: it observes the
     * Maybe, subscribes itself to the mapped Publisher, and is the {@link Subscription}
     * handed to the downstream.
     *
     * <p>The inherited {@link AtomicReference} holds the Subscription of the mapped
     * Publisher; {@link #requested} is passed next to it to the deferred
     * {@link SubscriptionHelper} routines so that demand signalled before the mapped
     * Publisher has called {@code onSubscribe} can be accounted for.
     *
     * @param <T> the success value type of the Maybe
     * @param <R> the element type delivered downstream
     */
    static final class FlatMapPublisherSubscriber<T, R>
            extends AtomicReference<Subscription>
            implements FlowableSubscriber<R>, MaybeObserver<T>, Subscription {

        private static final long serialVersionUID = -8948264376121066672L;

        /** The consumer that receives the relayed signals. */
        final Subscriber<? super R> downstream;

        /** Maps the Maybe's success value to the Publisher to subscribe to. */
        final Function<? super T, ? extends Publisher<? extends R>> mapper;

        /** Connection to the Maybe; assigned in {@link #onSubscribe(Disposable)}. */
        Disposable upstream;

        /** Demand counter shared with the deferred SubscriptionHelper calls. */
        final AtomicLong requested = new AtomicLong();

        FlatMapPublisherSubscriber(Subscriber<? super R> downstream,
                Function<? super T, ? extends Publisher<? extends R>> mapper) {
            this.downstream = downstream;
            this.mapper = mapper;
        }

        /** Passes an item of the mapped Publisher straight to the downstream. */
        @Override
        public void onNext(R item) {
            downstream.onNext(item);
        }

        /** Passes a failure (from the Maybe or the mapped Publisher) to the downstream. */
        @Override
        public void onError(Throwable error) {
            downstream.onError(error);
        }

        /** Passes completion (from the Maybe or the mapped Publisher) to the downstream. */
        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        /**
         * Routes downstream demand through {@link SubscriptionHelper#deferredRequest},
         * giving it this reference and the {@link #requested} counter.
         */
        @Override
        public void request(long n) {
            SubscriptionHelper.deferredRequest(this, requested, n);
        }

        /**
         * Disposes the connection to the Maybe first, then cancels the Subscription
         * held by this reference.
         */
        @Override
        public void cancel() {
            upstream.dispose();
            SubscriptionHelper.cancel(this);
        }

        /**
         * Called by the Maybe. If {@link DisposableHelper#validate} accepts the
         * Disposable, it is stored and this object is handed to the downstream as its
         * Subscription; otherwise nothing else happens here.
         */
        @Override
        public void onSubscribe(Disposable disposable) {
            if (!DisposableHelper.validate(upstream, disposable)) {
                return;
            }
            // Store the connection before the downstream is told about this
            // Subscription, so it is in place once downstream.onSubscribe is invoked.
            upstream = disposable;
            downstream.onSubscribe(this);
        }

        /**
         * Called by the Maybe with its value: applies the mapper and subscribes this
         * object to the resulting Publisher. A mapper that throws a non-fatal exception
         * or returns {@code null} results in {@code onError} on the downstream.
         */
        @Override
        public void onSuccess(T value) {
            final Publisher<? extends R> inner;

            try {
                inner = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null Publisher");
            } catch (Throwable failure) {
                // Fatal errors are rethrown by throwIfFatal; anything else is
                // reported to the downstream.
                Exceptions.throwIfFatal(failure);
                downstream.onError(failure);
                return;
            }

            inner.subscribe(this);
        }

        /**
         * Called by the mapped Publisher: hands its Subscription, together with the
         * {@link #requested} counter, to {@link SubscriptionHelper#deferredSetOnce}.
         */
        @Override
        public void onSubscribe(Subscription subscription) {
            SubscriptionHelper.deferredSetOnce(this, requested, subscription);
        }
    }
}