package io.reactivex.internal.operators.observable;
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.BiPredicate;
import io.reactivex.internal.disposables.SequentialDisposable;
public final class ObservableRetryBiPredicate<T> extends AbstractObservableWithUpstream<T, T> {
final BiPredicate<? super Integer, ? super Throwable> predicate;
public ObservableRetryBiPredicate(
Observable<T> source,
BiPredicate<? super Integer, ? super Throwable> predicate) {
super(source);
this.predicate = predicate;
}
@Override
public void subscribeActual(Observer<? super T> observer) {
SequentialDisposable sa = new SequentialDisposable();
observer.onSubscribe(sa);
RetryBiObserver<T> rs = new RetryBiObserver<T>(observer, predicate, sa, source);
rs.subscribeNext();
}
static final class RetryBiObserver<T> extends AtomicInteger implements Observer<T> {
private static final long serialVersionUID = -7098360935104053232L;
final Observer<? super T> downstream;
final SequentialDisposable upstream;
final ObservableSource<? extends T> source;
final BiPredicate<? super Integer, ? super Throwable> predicate;
int retries;
RetryBiObserver(Observer<? super T> actual,
BiPredicate<? super Integer, ? super Throwable> predicate, SequentialDisposable sa, ObservableSource<? extends T> source) {
this.downstream = actual;
this.upstream = sa;
this.source = source;
this.predicate = predicate;
}
@Override
public void onSubscribe(Disposable d) {
upstream.replace(d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
boolean b;
try {
b = predicate.test(++retries, t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
if (!b) {
downstream.onError(t);
return;
}
subscribeNext();
}
@Override
public void onComplete() {
downstream.onComplete();
}
void subscribeNext() {
if (getAndIncrement() == 0) {
int missed = 1;
for (;;) {
if (upstream.isDisposed()) {
return;
}
source.subscribe(this);
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
}
}