package io.reactivex.internal.operators.observable;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
public final class ObservableOnErrorReturn<T> extends AbstractObservableWithUpstream<T, T> {
final Function<? super Throwable, ? extends T> valueSupplier;
public ObservableOnErrorReturn(ObservableSource<T> source, Function<? super Throwable, ? extends T> valueSupplier) {
super(source);
this.valueSupplier = valueSupplier;
}
@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new OnErrorReturnObserver<T>(t, valueSupplier));
}
static final class OnErrorReturnObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Function<? super Throwable, ? extends T> valueSupplier;
Disposable upstream;
OnErrorReturnObserver(Observer<? super T> actual, Function<? super Throwable, ? extends T> valueSupplier) {
this.downstream = actual;
this.valueSupplier = valueSupplier;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
@Override
public void dispose() {
upstream.dispose();
}
@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
T v;
try {
v = valueSupplier.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
if (v == null) {
NullPointerException e = new NullPointerException("The supplied value is null");
e.initCause(t);
downstream.onError(e);
return;
}
downstream.onNext(v);
downstream.onComplete();
}
@Override
public void onComplete() {
downstream.onComplete();
}
}
}