package examples;
import io.vertx.cassandra.CassandraClientOptions;
import io.vertx.reactivex.cassandra.CassandraClient;
import io.vertx.reactivex.cassandra.CassandraRowStream;
import io.vertx.reactivex.core.Vertx;
/**
 * Usage snippets for the RxJava flavour of the Cassandra client.
 *
 * <p>Each method is a self-contained example; the callbacks are intentionally
 * left empty so that readers can fill in their own handling.
 */
public class RxCassandraClientExamples {

  /**
   * Builds client options listing three contact points (all on port 7000)
   * and creates a shared client from them.
   *
   * @param vertx the Vert.x instance handed to {@code CassandraClient.createShared}
   */
  public void createClient(Vertx vertx) {
    String[] contactHosts = {"node1.corp.int", "node2.corp.int", "node3.corp.int"};
    CassandraClientOptions options = new CassandraClientOptions();
    for (String host : contactHosts) {
      // Keep the value returned by the fluent call, exactly as a chained call would.
      options = options.addContactPoint(host, 7000);
    }
    CassandraClient client = CassandraClient.createShared(vertx, options);
  }

  /**
   * Runs a query through {@code rxQueryStream}, turns the resulting row stream
   * into a Flowable and subscribes with per-row, failure and completion callbacks.
   *
   * @param cassandraClient the client used to issue the query
   */
  public void simpleQueryStream(CassandraClient cassandraClient) {
    String query = "SELECT my_key FROM my_keyspace.my_table where my_key = my_value";
    cassandraClient.rxQueryStream(query)
      // Convert the stream to a Flowable
      .flatMapPublisher(rowStream -> rowStream.toFlowable())
      .subscribe(
        item -> {
          // Handle single row
        },
        failure -> {
          // Handle failure
        },
        () -> {
          // End of stream
        });
  }

  /**
   * Runs a query through {@code rxExecuteWithFullFetch} and subscribes with a
   * callback for the fetched rows and a callback for failures.
   *
   * @param cassandraClient the client used to issue the query
   */
  public void simpleFullFetch(CassandraClient cassandraClient) {
    String query = "SELECT my_key FROM my_keyspace.my_table where my_key = my_value";
    cassandraClient.rxExecuteWithFullFetch(query)
      .subscribe(
        fetched -> {
          // Handle list of rows
        },
        failure -> {
          // Handle failure
        });
  }
}