package examples;
import io.reactivex.*;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.plugins.RxJavaPlugins;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.docgen.Source;
import io.vertx.reactivex.MaybeHelper;
import io.vertx.reactivex.WriteStreamSubscriber;
import io.vertx.reactivex.core.ObservableHelper;
import io.vertx.reactivex.core.RxHelper;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.WorkerExecutor;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.dns.DnsClient;
import io.vertx.reactivex.core.eventbus.EventBus;
import io.vertx.reactivex.core.eventbus.Message;
import io.vertx.reactivex.core.eventbus.MessageConsumer;
import io.vertx.reactivex.core.file.AsyncFile;
import io.vertx.reactivex.core.file.FileSystem;
import io.vertx.reactivex.core.http.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Source
public class RxifiedExamples {
public void toFlowable(Vertx vertx) {
FileSystem fs = vertx.fileSystem();
fs.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Flowable<Buffer> observable = file.toFlowable();
observable.forEach(data -> System.out.println("Read data: " + data.toString("UTF-8")));
});
}
private static void checkAuth(Handler<AsyncResult<Void>> handler) {
throw new UnsupportedOperationException();
}
public void delayFlowable(HttpServer server) {
server.requestHandler(request -> {
if (request.method() == HttpMethod.POST) {
request.pause();
checkAuth(res -> {
request.resume();
if (res.succeeded()) {
Flowable<Buffer> flowable = request.toFlowable();
flowable.subscribe(buff -> {
});
}
});
}
});
}
public void single(Vertx vertx) {
Single<HttpServer> single = vertx
.createHttpServer()
.rxListen(1234, "localhost");
single.
subscribe(
server -> {
},
failure -> {
}
);
}
public void maybe(Vertx vertx, int dnsPort, String dnsHost, String ipAddress) {
DnsClient client = vertx.createDnsClient(dnsPort, dnsHost);
Maybe<String> maybe = client.rxReverseLookup(ipAddress);
maybe.
subscribe(
name -> {
},
failure -> {
},
() -> {
}
);
}
public void completable(HttpServer server) {
Completable single = server.rxClose();
single.
subscribe(
() -> {
},
failure -> {
}
);
}
public void executeBlockingAdapter(io.vertx.core.Vertx vertx) {
Maybe<String> maybe = MaybeHelper.toMaybe(handler -> {
vertx.executeBlocking(fut -> fut.complete(invokeBlocking()), handler);
});
}
private String invokeBlocking() {
return null;
}
public void scheduler(Vertx vertx) {
Scheduler scheduler = RxHelper.scheduler(vertx);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
}
public void scheduler(WorkerExecutor workerExecutor) {
Scheduler scheduler = RxHelper.blockingScheduler(workerExecutor);
Observable<Long> timer = Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler);
}
public void schedulerHook(Vertx vertx) {
RxJavaPlugins.setComputationSchedulerHandler(s -> RxHelper.scheduler(vertx));
RxJavaPlugins.setIoSchedulerHandler(s -> RxHelper.blockingScheduler(vertx));
RxJavaPlugins.setNewThreadSchedulerHandler(s -> RxHelper.scheduler(vertx));
}
private class MyPojo {
}
public void unmarshaller(FileSystem fileSystem) {
fileSystem.open("/data.txt", new OpenOptions(), result -> {
AsyncFile file = result.result();
Observable<Buffer> observable = file.toObservable();
observable.compose(ObservableHelper.unmarshaller((MyPojo.class))).subscribe(
mypojo -> {
}
);
});
}
public void deployVerticle(Vertx vertx, Verticle verticle) {
Single<String> deployment = RxHelper.deployVerticle(vertx, verticle);
deployment.subscribe(id -> {
}, err -> {
});
}
public void embedded() {
Vertx vertx = io.vertx.reactivex.core.Vertx.vertx();
}
public void verticle() {
class MyVerticle extends io.vertx.reactivex.core.AbstractVerticle {
public void start() {
}
}
}
public void rxStart() {
class MyVerticle extends io.vertx.reactivex.core.AbstractVerticle {
public Completable rxStart() {
return vertx.createHttpServer()
.requestHandler(req -> req.response().end("Hello World"))
.rxListen()
.toCompletable();
}
}
}
public void eventBusMessages(Vertx vertx) {
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<Message<String>> observable = consumer.toObservable();
Disposable sub = observable.subscribe(msg -> {
});
vertx.setTimer(10000, id -> {
sub.dispose();
});
}
public void eventBusBodies(Vertx vertx) {
EventBus eb = vertx.eventBus();
MessageConsumer<String> consumer = eb.<String>consumer("the-address");
Observable<String> observable = consumer.bodyStream().toObservable();
}
public void eventBusMapReduce(Vertx vertx) {
Observable<Double> observable = vertx.eventBus().
<Double>consumer("heat-sensor").
bodyStream().
toObservable();
observable.
buffer(1, TimeUnit.SECONDS).
map(samples -> samples.
stream().
collect(Collectors.averagingDouble(d -> d))).
subscribe(heat -> {
vertx.eventBus().send("news-feed", "Current heat is " + heat);
});
}
public void websocketServer(HttpServer server) {
Observable<ServerWebSocket> socketObservable = server.webSocketStream().toObservable();
socketObservable.subscribe(
socket -> System.out.println("Web socket connect"),
failure -> System.out.println("Should never be called"),
() -> {
System.out.println("Subscription ended or server closed");
}
);
}
public void websocketServerBuffer(Flowable<ServerWebSocket> socketObservable) {
socketObservable.subscribe(
socket -> {
Observable<Buffer> dataObs = socket.toObservable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);
}
public void websocketClient(Vertx vertx) {
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
client.rxWebSocket(8080, "localhost", "/the_uri").subscribe(
ws -> {
},
error -> {
}
);
}
public void websocketClientBuffer(Flowable<WebSocket> socketObservable) {
socketObservable.subscribe(
socket -> {
Flowable<Buffer> dataObs = socket.toFlowable();
dataObs.subscribe(buffer -> {
System.out.println("Got message " + buffer.toString("UTF-8"));
});
}
);
}
public void httpClientRequest(Vertx vertx) {
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
Single<HttpClientResponse> request = client
.rxRequest( HttpMethod.GET, 8080, "localhost", "/the_uri")
.flatMap(HttpClientRequest::rxSend);
request.subscribe(
response -> {
},
error -> {
}
);
}
public void httpClientResponse(HttpClient client) {
Single<HttpClientResponse> request = client
.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
.flatMap(HttpClientRequest::rxSend);
request.subscribe(
response -> {
Observable<Buffer> observable = response.toObservable();
observable.forEach(
buffer -> {
}
);
}
);
}
public void httpClientResponseFlatMap(HttpClient client) {
Single<HttpClientResponse> request = client
.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
.flatMap(HttpClientRequest::rxSend);
request.
flatMapObservable(HttpClientResponse::toObservable).
forEach(
buffer -> {
}
);
}
public void httpServerRequest(HttpServer server) {
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
});
}
public void httpServerRequestObservable(HttpServer server) {
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
Observable<Buffer> observable = request.toObservable();
});
}
public void httpServerRequestObservableUnmarshall(HttpServer server) {
Observable<HttpServerRequest> requestObservable = server.requestStream().toObservable();
requestObservable.subscribe(request -> {
Observable<MyPojo> observable = request.
toObservable().
compose(io.vertx.reactivex.core.ObservableHelper.unmarshaller(MyPojo.class));
});
}
public void timer(Vertx vertx) {
vertx.timerStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback after 1 second");
}
);
}
public void periodic(Vertx vertx) {
vertx.periodicStream(1000).
toObservable().
subscribe(
id -> {
System.out.println("Callback every second");
}
);
}
public void periodicUnsubscribe(Vertx vertx) {
vertx.periodicStream(1000).
toObservable().
subscribe(new Observer<Long>() {
private Disposable sub;
public void onSubscribe(@NonNull Disposable d) {
sub = d;
}
public void onNext(Long aLong) {
sub.dispose();
}
public void onError(Throwable e) {}
public void onComplete() {}
});
}
public void writeStreamSubscriberAdapter(Flowable<io.vertx.core.buffer.Buffer> flowable, io.vertx.core.http.HttpServerResponse response) {
response.setChunked(true);
WriteStreamSubscriber<io.vertx.core.buffer.Buffer> subscriber = io.vertx.reactivex.RxHelper.toSubscriber(response);
flowable.subscribe(subscriber);
}
public void rxWriteStreamSubscriberAdapter(Flowable<Buffer> flowable, HttpServerResponse response) {
response.setChunked(true);
flowable.subscribe(response.toSubscriber());
}
public void writeStreamSubscriberAdapterCallbacks(Flowable<Buffer> flowable, HttpServerResponse response) {
response.setChunked(true);
WriteStreamSubscriber<Buffer> subscriber = response.toSubscriber();
subscriber.onError(throwable -> {
if (!response.headWritten() && response.closed()) {
response.setStatusCode(500).end("oops");
} else {
}
});
subscriber.onWriteStreamError(throwable -> {
});
subscriber.onWriteStreamEnd(() -> {
});
flowable.subscribe(subscriber);
}
}