package reactor.core.publisher;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
final class MonoFlattenIterable<T, R> extends FluxFromMonoOperator<T, R>
implements Fuseable {
final Function<? super T, ? extends Iterable<? extends R>> mapper;
final int prefetch;
final Supplier<Queue<T>> queueSupplier;
MonoFlattenIterable(Mono<? extends T> source,
Function<? super T, ? extends Iterable<? extends R>> mapper,
int prefetch,
Supplier<Queue<T>> queueSupplier) {
super(source);
if (prefetch <= 0) {
throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
}
this.mapper = Objects.requireNonNull(mapper, "mapper");
this.prefetch = prefetch;
this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
}
@Override
public int getPrefetch() {
return prefetch;
}
@SuppressWarnings("unchecked")
@Override
public void subscribe(CoreSubscriber<? super R> actual) {
if (source instanceof Callable) {
T v;
try {
v = ((Callable<T>) source).call();
}
catch (Throwable ex) {
Operators.error(actual, Operators.onOperatorError(ex,
actual.currentContext()));
return;
}
if (v == null) {
Operators.complete(actual);
return;
}
Iterator<? extends R> it;
try {
Iterable<? extends R> iter = mapper.apply(v);
it = iter.iterator();
}
catch (Throwable ex) {
Operators.error(actual, Operators.onOperatorError(ex,
actual.currentContext()));
return;
}
FluxIterable.subscribe(actual, it);
return;
}
source.subscribe(new FluxFlattenIterable.FlattenIterableSubscriber<>(actual,
mapper,
prefetch,
queueSupplier));
}
}