package reactor.core.publisher;
import java.util.Iterator;
import java.util.Objects;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Supplier;
import java.util.stream.Stream;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
final class FluxStream<T> extends Flux<T> implements Fuseable, SourceProducer<T> {
final Supplier<? extends Stream<? extends T>> streamSupplier;
FluxStream(Supplier<? extends Stream<? extends T>> streamSupplier) {
this.streamSupplier = Objects.requireNonNull(streamSupplier, "streamSupplier");
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Stream<? extends T> stream;
try {
stream = Objects.requireNonNull(streamSupplier.get(),
"The stream supplier returned a null Stream");
}
catch (Throwable e) {
Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
return;
}
Iterator<? extends T> it;
boolean knownToBeFinite;
try {
Spliterator<? extends T> spliterator = Objects.requireNonNull(stream.spliterator(),
"The stream returned a null Spliterator");
knownToBeFinite = spliterator.hasCharacteristics(Spliterator.SIZED);
it = Spliterators.iterator(spliterator);
}
catch (Throwable e) {
Operators.error(actual, Operators.onOperatorError(e, actual.currentContext()));
return;
}
FluxIterable.subscribe(actual, it, knownToBeFinite, stream::close);
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return null;
}
}