package reactor.core.publisher;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import static reactor.core.publisher.FluxTimeout.addNameToTimeoutDescription;
final class MonoTimeout<T, U, V> extends InternalMonoOperator<T, T> {
final Publisher<U> firstTimeout;
final Publisher<? extends T> other;
final String timeoutDescription;
@SuppressWarnings("rawtypes")
final static Function NEVER = e -> Flux.never();
MonoTimeout(Mono<? extends T> source,
Publisher<U> firstTimeout,
String timeoutDescription) {
super(source);
this.firstTimeout = Objects.requireNonNull(firstTimeout, "firstTimeout");
this.other = null;
this.timeoutDescription = timeoutDescription;
}
MonoTimeout(Mono<? extends T> source,
Publisher<U> firstTimeout,
Publisher<? extends T> other) {
super(source);
this.firstTimeout = Objects.requireNonNull(firstTimeout, "firstTimeout");
this.other = Objects.requireNonNull(other, "other");
this.timeoutDescription = null;
}
@Override
@SuppressWarnings("unchecked")
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
return new FluxTimeout.TimeoutMainSubscriber<T, T>(
Operators.serialize(actual),
firstTimeout,
NEVER,
other,
addNameToTimeoutDescription(source, timeoutDescription)
);
}
@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;
return super.scanUnsafe(key);
}
}