package io.reactivex.internal.operators.flowable;
import java.util.Collection;
import java.util.concurrent.Callable;
import org.reactivestreams.Subscriber;
import io.reactivex.Flowable;
import io.reactivex.annotations.Nullable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.QueueFuseable;
import io.reactivex.internal.subscribers.BasicFuseableSubscriber;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.plugins.RxJavaPlugins;
public final class FlowableDistinct<T, K> extends AbstractFlowableWithUpstream<T, T> {
final Function<? super T, K> keySelector;
final Callable<? extends Collection<? super K>> collectionSupplier;
public FlowableDistinct(Flowable<T> source, Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier) {
super(source);
this.keySelector = keySelector;
this.collectionSupplier = collectionSupplier;
}
@Override
protected void subscribeActual(Subscriber<? super T> subscriber) {
Collection<? super K> collection;
try {
collection = ObjectHelper.requireNonNull(collectionSupplier.call(), "The collectionSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, subscriber);
return;
}
source.subscribe(new DistinctSubscriber<T, K>(subscriber, keySelector, collection));
}
static final class DistinctSubscriber<T, K> extends BasicFuseableSubscriber<T, T> {
final Collection<? super K> collection;
final Function<? super T, K> keySelector;
DistinctSubscriber(Subscriber<? super T> actual, Function<? super T, K> keySelector, Collection<? super K> collection) {
super(actual);
this.keySelector = keySelector;
this.collection = collection;
}
@Override
public void onNext(T value) {
if (done) {
return;
}
if (sourceMode == NONE) {
K key;
boolean b;
try {
key = ObjectHelper.requireNonNull(keySelector.apply(value), "The keySelector returned a null key");
b = collection.add(key);
} catch (Throwable ex) {
fail(ex);
return;
}
if (b) {
downstream.onNext(value);
} else {
upstream.request(1);
}
} else {
downstream.onNext(null);
}
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaPlugins.onError(e);
} else {
done = true;
collection.clear();
downstream.onError(e);
}
}
@Override
public void onComplete() {
if (!done) {
done = true;
collection.clear();
downstream.onComplete();
}
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public T poll() throws Exception {
for (;;) {
T v = qs.poll();
if (v == null || collection.add(ObjectHelper.requireNonNull(keySelector.apply(v), "The keySelector returned a null key"))) {
return v;
} else {
if (sourceMode == QueueFuseable.ASYNC) {
upstream.request(1);
}
}
}
}
@Override
public void clear() {
collection.clear();
super.clear();
}
}
}