package reactor.core.publisher;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import static reactor.core.publisher.FluxTimeout.addNameToTimeoutDescription;
/**
 * Mono operator that wires its source into the timeout machinery of
 * {@link FluxTimeout}: a {@link FluxTimeout.TimeoutMainSubscriber} is subscribed
 * to the source while a {@link FluxTimeout.TimeoutTimeoutSubscriber} is subscribed
 * to the {@code firstTimeout} publisher.
 * <p>
 * The actual timeout/fallback decision logic lives in {@code FluxTimeout} and is
 * not visible in this class.
 *
 * @param <T> the value type of the source and of the downstream subscriber
 * @param <U> the value type of the {@code firstTimeout} publisher
 * @param <V> second type argument of the {@code TimeoutMainSubscriber} created in
 *            {@link #subscribe(CoreSubscriber)} (presumably the per-item timeout
 *            value type - confirm against {@code FluxTimeout})
 */
final class MonoTimeout<T, U, V> extends MonoOperator<T, T> {

    /** Publisher the timeout subscriber is subscribed to; never {@code null}. */
    final Publisher<U> firstTimeout;

    /** Optional fallback publisher; {@code null} when built with a description instead. */
    final Publisher<? extends T> other;

    /** Timeout description; {@code null} when built with a fallback publisher instead. */
    final String timeoutDescription;

    /**
     * Function that ignores its argument and always returns {@link Flux#never()}.
     * Kept raw so it can be handed to {@code TimeoutMainSubscriber} for any type
     * arguments.
     */
    @SuppressWarnings("rawtypes")
    static final Function NEVER = e -> Flux.never();

    /**
     * Creates the operator without a fallback publisher.
     *
     * @param source the upstream {@link Mono}
     * @param firstTimeout the publisher the timeout subscriber will subscribe to
     * @param timeoutDescription description stored as-is and later combined with
     *        the source via {@code addNameToTimeoutDescription}
     * @throws NullPointerException if {@code firstTimeout} is {@code null}
     */
    MonoTimeout(Mono<? extends T> source,
            Publisher<U> firstTimeout,
            String timeoutDescription) {
        super(source);
        this.firstTimeout = Objects.requireNonNull(firstTimeout, "firstTimeout");
        this.other = null;
        this.timeoutDescription = timeoutDescription;
    }

    /**
     * Creates the operator with a fallback publisher and no description.
     *
     * @param source the upstream {@link Mono}
     * @param firstTimeout the publisher the timeout subscriber will subscribe to
     * @param other the fallback publisher handed to the main subscriber
     * @throws NullPointerException if {@code firstTimeout} or {@code other} is
     *         {@code null}
     */
    MonoTimeout(Mono<? extends T> source,
            Publisher<U> firstTimeout,
            Publisher<? extends T> other) {
        super(source);
        this.firstTimeout = Objects.requireNonNull(firstTimeout, "firstTimeout");
        this.other = Objects.requireNonNull(other, "other");
        this.timeoutDescription = null;
    }

    /**
     * Subscribes {@code actual} through the timeout subscribers.
     * <p>
     * Steps, in order: serialize the downstream subscriber, build the main
     * subscriber, signal {@code onSubscribe} downstream, register the timeout
     * subscriber on the main subscriber, subscribe it to {@code firstTimeout},
     * and finally subscribe the main subscriber to the source.
     *
     * @param actual the downstream subscriber
     */
    @Override
    @SuppressWarnings("unchecked")
    public void subscribe(CoreSubscriber<? super T> actual) {
        CoreSubscriber<T> serializedDownstream = Operators.serialize(actual);

        final String description =
                addNameToTimeoutDescription(source, timeoutDescription);

        // NEVER is raw, hence the unchecked suppression on this method.
        FluxTimeout.TimeoutMainSubscriber<T, V> mainSubscriber =
                new FluxTimeout.TimeoutMainSubscriber<>(serializedDownstream,
                        NEVER,
                        other,
                        description);

        // Downstream receives its subscription before anything upstream is subscribed.
        serializedDownstream.onSubscribe(mainSubscriber);

        // NOTE(review): 0L is presumably the initial timeout index - confirm in FluxTimeout.
        FluxTimeout.TimeoutTimeoutSubscriber timeoutSubscriber =
                new FluxTimeout.TimeoutTimeoutSubscriber(mainSubscriber, 0L);
        mainSubscriber.setTimeout(timeoutSubscriber);

        // The timeout publisher is subscribed before the source.
        firstTimeout.subscribe(timeoutSubscriber);
        source.subscribe(mainSubscriber);
    }
}