package examples;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.AsyncFile;
import io.vertx.core.file.FileSystem;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.Pump;
import io.vertx.core.streams.ReadStream;
public class StreamsExamples {
public void pipe1(Vertx vertx) {
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
sock.write(buffer);
});
}).listen();
}
public void pipe2(Vertx vertx) {
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
if (!sock.writeQueueFull()) {
sock.write(buffer);
}
});
}).listen();
}
public void pipe3(Vertx vertx) {
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
sock.write(buffer);
if (sock.writeQueueFull()) {
sock.pause();
}
});
}).listen();
}
public void pipe4(Vertx vertx) {
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.handler(buffer -> {
sock.write(buffer);
if (sock.writeQueueFull()) {
sock.pause();
sock.drainHandler(done -> {
sock.resume();
});
}
});
}).listen();
}
public void pipe5(Vertx vertx) {
NetServer server = vertx.createNetServer(
new NetServerOptions().setPort(1234).setHost("localhost")
);
server.connectHandler(sock -> {
sock.pipeTo(sock);
}).listen();
}
public void pipe6(NetServer server) {
server.connectHandler(sock -> {
sock.pipeTo(sock, ar -> {
if (ar.succeeded()) {
System.out.println("Pipe succeeded");
} else {
System.out.println("Pipe failed");
}
});
}).listen();
}
public void pipe7(NetServer server, FileSystem fs) {
server.connectHandler(sock -> {
Pipe<Buffer> pipe = sock.pipe();
fs.open("/path/to/file", new OpenOptions(), ar -> {
if (ar.succeeded()) {
AsyncFile file = ar.result();
pipe.to(file);
} else {
sock.close();
}
});
}).listen();
}
public void pipe8(Vertx vertx, FileSystem fs) {
vertx.createHttpServer()
.requestHandler(request -> {
Pipe<Buffer> pipe = request.pipe();
fs.open("/path/to/file", new OpenOptions(), ar -> {
if (ar.succeeded()) {
AsyncFile file = ar.result();
pipe.to(file);
} else {
pipe.close();
request.response().setStatusCode(500).end();
}
});
}).listen(8080);
}
public void pipe9(AsyncFile src, AsyncFile dst) {
src.pipe()
.endOnSuccess(false)
.to(dst, rs -> {
dst.end(Buffer.buffer("done"));
});
}
}