package io.vertx.reactivex;
import com.fasterxml.jackson.core.ObjectCodec;
import com.fasterxml.jackson.core.type.TypeReference;
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.plugins.RxJavaPlugins;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.reactivex.impl.FlowableReadStream;
import io.vertx.reactivex.impl.ReadStreamSubscriber;
import io.vertx.reactivex.impl.FlowableUnmarshaller;
import java.util.function.Function;
public class FlowableHelper {
public static <T> ReadStream<T> toReadStream(Flowable<T> observable) {
return ReadStreamSubscriber.asReadStream(observable, Function.identity());
}
public static <T, U> Flowable<U> toFlowable(ReadStream<T> stream, Function<T, U> mapping) {
return RxJavaPlugins.onAssembly(new FlowableReadStream<>(stream, FlowableReadStream.DEFAULT_MAX_BUFFER_SIZE, mapping));
}
public static <T> Flowable<T> toFlowable(ReadStream<T> stream) {
return RxJavaPlugins.onAssembly(new FlowableReadStream<>(stream, FlowableReadStream.DEFAULT_MAX_BUFFER_SIZE, Function.identity()));
}
public static <T> Flowable<T> toFlowable(ReadStream<T> stream, long maxBufferSize) {
return RxJavaPlugins.onAssembly(new FlowableReadStream<>(stream, maxBufferSize, Function.identity()));
}
public static <T> FlowableTransformer<Buffer, T> unmarshaller(Class<T> mappedType) {
return new FlowableUnmarshaller<>(java.util.function.Function.identity(), mappedType);
}
public static <T> FlowableTransformer<Buffer, T>unmarshaller(TypeReference<T> mappedTypeRef) {
return new FlowableUnmarshaller<>(java.util.function.Function.identity(), mappedTypeRef);
}
public static <T> FlowableTransformer<Buffer, T> unmarshaller(Class<T> mappedType, ObjectCodec mapper) {
return new FlowableUnmarshaller<>(java.util.function.Function.identity(), mappedType, mapper);
}
public static <T> FlowableTransformer<Buffer, T>unmarshaller(TypeReference<T> mappedTypeRef, ObjectCodec mapper) {
return new FlowableUnmarshaller<>(java.util.function.Function.identity(), mappedTypeRef, mapper);
}
}