package reactor.core.publisher;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Supplier;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
/**
 * Operator that takes the value of a source {@link Mono}, maps it to an
 * {@link Iterable} and relays the elements of that iterable downstream.
 *
 * @param <T> the type of the value produced by the source
 * @param <R> the type of the elements held by the mapped iterable
 */
final class MonoFlattenIterable<T, R> extends FluxFromMonoOperator<T, R>
		implements Fuseable {

	/** Converts the source value into the iterable whose elements are emitted. */
	final Function<? super T, ? extends Iterable<? extends R>> mapper;

	/** Prefetch amount; always strictly positive (enforced by the constructor). */
	final int prefetch;

	/** Supplier handed over to the flattening subscriber on the non-{@link Callable} path. */
	final Supplier<Queue<T>> queueSupplier;

	/**
	 * Creates the operator.
	 *
	 * @param source the upstream {@link Mono}
	 * @param mapper function turning the source value into an {@link Iterable}
	 * @param prefetch the prefetch amount, must be greater than zero
	 * @param queueSupplier supplier of the queue given to the flattening subscriber
	 * @throws IllegalArgumentException if {@code prefetch} is not strictly positive
	 * @throws NullPointerException if {@code mapper} or {@code queueSupplier} is {@code null}
	 */
	MonoFlattenIterable(Mono<? extends T> source,
			Function<? super T, ? extends Iterable<? extends R>> mapper,
			int prefetch,
			Supplier<Queue<T>> queueSupplier) {
		super(source);
		// The prefetch check intentionally runs before the null checks so that the
		// exception raised for several invalid arguments stays the same.
		if (prefetch < 1) {
			throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
		}
		this.mapper = Objects.requireNonNull(mapper, "mapper");
		this.prefetch = prefetch;
		this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier");
	}

	/**
	 * @return the prefetch amount given at construction time
	 */
	@Override
	public int getPrefetch() {
		return prefetch;
	}

	/**
	 * Either wires {@code actual} directly (when the source is a {@link Callable})
	 * or returns a flattening subscriber to be subscribed to the source.
	 *
	 * @param actual the downstream subscriber
	 * @return a new {@code FluxFlattenIterable.FlattenIterableSubscriber} when the source
	 * is not a {@link Callable}; {@code null} when {@code actual} has already been passed
	 * to {@code Operators.complete} or {@code FluxIterable.subscribe}
	 * @throws Exception anything thrown while calling the source or applying the mapper
	 */
	@SuppressWarnings("unchecked")
	@Override
	public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super R> actual) throws Exception {
		if (!(source instanceof Callable)) {
			return new FluxFlattenIterable.FlattenIterableSubscriber<>(actual,
					mapper,
					prefetch,
					queueSupplier);
		}

		// Callable source: obtain the value right away instead of subscribing to it.
		T value = ((Callable<T>) source).call();
		if (value == null) {
			// No value was produced, so the downstream is simply completed.
			Operators.complete(actual);
			return null;
		}

		Iterable<? extends R> mapped = mapper.apply(value);
		// The iterator is requested before the finiteness check, as in the original ordering.
		Iterator<? extends R> elements = mapped.iterator();
		boolean finite = FluxIterable.checkFinite(mapped);
		FluxIterable.subscribe(actual, elements, finite);
		return null;
	}

	/**
	 * Answers {@code Attr.RUN_STYLE} with {@code Attr.RunStyle.SYNC}; every other key is
	 * forwarded to the superclass.
	 *
	 * @param key the attribute being queried
	 * @return the value associated with {@code key}
	 */
	@Override
	public Object scanUnsafe(Attr key) {
		return key == Attr.RUN_STYLE ? Attr.RunStyle.SYNC : super.scanUnsafe(key);
	}
}