package examples;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.SimpleStatement;
import io.vertx.cassandra.*;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpServerResponse;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
/**
 * Usage snippets for the Vert.x Cassandra client.
 *
 * <p>Each method is a self-contained illustration of a single client feature. Locals that are
 * assigned but never read are kept on purpose so the snippet shows the type that is produced.
 */
public class CassandraClientExamples {

  /**
   * Creates a non-shared client configured with three contact points.
   *
   * @param vertx the Vert.x instance the client is created with
   */
  public void specifyingNodes(Vertx vertx) {
    CassandraClientOptions options = new CassandraClientOptions()
      .addContactPoint("node1.address")
      .addContactPoint("node2.address")
      .addContactPoint("node3.address");
    CassandraClient client = CassandraClient.createNonShared(vertx, options);
  }

  /**
   * Creates a non-shared client with an explicit port and keyspace.
   *
   * @param vertx the Vert.x instance the client is created with
   */
  public void portAndKeyspace(Vertx vertx) {
    CassandraClientOptions options = new CassandraClientOptions()
      .setPort(9142)
      .setKeyspace("my_keyspace");
    CassandraClient client = CassandraClient.createNonShared(vertx, options);
  }

  /**
   * Creates a client through {@code createShared} under the name {@code "sharedClientName"}.
   *
   * <p>NOTE(review): presumably clients created with the same name reuse the same underlying
   * connection - confirm against the client documentation.
   *
   * @param vertx the Vert.x instance the client is created with
   */
  public void sharedClient(Vertx vertx) {
    CassandraClientOptions options = new CassandraClientOptions()
      .addContactPoint("node1.address")
      .addContactPoint("node2.address")
      .addContactPoint("node3.address")
      .setKeyspace("my_keyspace");
    CassandraClient client = CassandraClient.createShared(vertx, "sharedClientName", options);
  }

  /**
   * Executes a query and works with the returned {@code ResultSet} directly: reads one row,
   * then asks for more results and reports how many rows are available and whether the
   * result set is fully fetched.
   *
   * @param cassandraClient the client used to run the query
   */
  public void lowLevelQuerying(CassandraClient cassandraClient) {
    cassandraClient.execute("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'", execute -> {
      if (execute.succeeded()) {
        ResultSet resultSet = execute.result();

        // Read a single row from the result set.
        resultSet.one(one -> {
          if (one.succeeded()) {
            Row row = one.result();
            System.out.println("One row successfully fetched");
          } else {
            System.out.println("Unable to fetch a row");
            one.cause().printStackTrace();
          }
        });

        // Ask for more results and inspect the fetch state afterwards.
        resultSet.fetchMoreResults(fetchMoreResults -> {
          if (fetchMoreResults.succeeded()) {
            int availableWithoutFetching = resultSet.getAvailableWithoutFetching();
            System.out.println("Now we have " + availableWithoutFetching + " rows fetched, but not consumed!");
            if (resultSet.isFullyFetched()) {
              System.out.println("The result is fully fetched, we don't need to call this method for one more time!");
            } else {
              System.out.println("The result still does not fully fetched");
            }
          } else {
            System.out.println("Unable to fetch more results");
            fetchMoreResults.cause().printStackTrace();
          }
        });
      } else {
        System.out.println("Unable to execute the query");
        execute.cause().printStackTrace();
      }
    });
  }

  /**
   * Executes a query and reduces the rows with a {@link Collector}: every row is mapped to its
   * {@code last_name} column and the values are joined with {@code ","} inside parentheses.
   *
   * @param cassandraClient the client used to run the query
   */
  public void executeAndCollect(CassandraClient cassandraClient) {
    Collector<Row, ?, String> collector = Collectors.mapping(
      row -> row.getString("last_name"),
      Collectors.joining(",", "(", ")")
    );
    cassandraClient.execute("SELECT * FROM users",
      collector,
      ar -> {
        if (ar.succeeded()) {
          String list = ar.result();
          System.out.println("Got " + list);
        } else {
          System.out.println("Failure: " + ar.cause().getMessage());
        }
      });
  }

  /**
   * Streams the {@code my_string_col} value of every matching row into an HTTP response,
   * pausing the row stream while the response write queue is full.
   *
   * <p>If the query cannot be started, the response is ended with status 500. If the row stream
   * fails after it has started, the failure is printed and the response is either ended with
   * status 500 (head not written yet) or closed (head already written).
   *
   * @param vertx           the Vert.x instance (not used by this snippet)
   * @param cassandraClient the client used to run the query
   * @param response        the HTTP response the column values are written to
   */
  public void streamingViaHttp(Vertx vertx, CassandraClient cassandraClient, HttpServerResponse response) {
    cassandraClient.queryStream("SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'", queryStream -> {
      if (queryStream.succeeded()) {
        CassandraRowStream stream = queryStream.result();

        // The total body size is unknown up front, so fall back to chunked encoding unless the
        // caller already chose chunked mode or supplied a Content-Length.
        if (!response.isChunked() && !response.headers().contains("Content-Length")) {
          response.setChunked(true);
        }

        // Resume the row stream once the response can accept data again.
        response.drainHandler(v -> stream.resume());

        // Without this handler a failure in the middle of the stream would leave the response open.
        stream.exceptionHandler(failure -> {
          failure.printStackTrace();
          if (response.headWritten()) {
            // Status line already sent: closing is the only way left to signal the failure.
            response.close();
          } else {
            response
              .setStatusCode(500)
              .end("Unable to execute the query");
          }
        });

        // End the response when the stream is exhausted, unless a failure already finished it.
        stream.endHandler(end -> {
          if (!response.ended() && !response.closed()) {
            response.end();
          }
        });

        stream.handler(row -> {
          String value = row.getString("my_string_col");
          response.write(value);
          // Pause the row stream while the response write queue is full.
          if (response.writeQueueFull()) {
            stream.pause();
          }
        });
      } else {
        queryStream.cause().printStackTrace();
        response
          .setStatusCode(500)
          .end("Unable to execute the query");
      }
    });
  }

  /**
   * Executes a query with {@code executeWithFullFetch} and iterates over the resulting list
   * of rows.
   *
   * @param cassandraClient the client used to run the query
   */
  public void fetchAll(CassandraClient cassandraClient) {
    cassandraClient.executeWithFullFetch("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'", executeWithFullFetch -> {
      if (executeWithFullFetch.succeeded()) {
        List<Row> rows = executeWithFullFetch.result();
        for (Row row : rows) {
          // Process each row here.
        }
      } else {
        System.out.println("Unable to execute the query");
        executeWithFullFetch.cause().printStackTrace();
      }
    });
  }

  /**
   * Prepares a parameterized query and obtains the resulting {@link PreparedStatement}.
   *
   * @param cassandraClient the client used to prepare the query
   */
  public void prepareQuery(CassandraClient cassandraClient) {
    cassandraClient.prepare("SELECT * FROM my_keyspace.my_table where my_key = ? ", preparedStatementResult -> {
      if (preparedStatementResult.succeeded()) {
        System.out.println("The query has successfully been prepared");
        PreparedStatement preparedStatement = preparedStatementResult.result();
      } else {
        System.out.println("Unable to prepare the query");
        preparedStatementResult.cause().printStackTrace();
      }
    });
  }

  /**
   * Binds a value to a prepared statement and runs it through each of the three execution
   * styles: {@code execute}, {@code executeWithFullFetch} and {@code queryStream}.
   *
   * <p>Every callback checks the outcome before reading the result and prints the failure
   * otherwise.
   *
   * @param cassandraClient   the client used to run the statement
   * @param preparedStatement the statement to bind {@code "my_value"} to
   */
  public void usingPreparedStatementFuture(CassandraClient cassandraClient, PreparedStatement preparedStatement) {
    // Low-level access to the result set.
    cassandraClient.execute(preparedStatement.bind("my_value"), done -> {
      if (done.succeeded()) {
        ResultSet results = done.result();
      } else {
        System.out.println("Unable to execute the query");
        done.cause().printStackTrace();
      }
    });

    // All rows collected into a list.
    cassandraClient.executeWithFullFetch(preparedStatement.bind("my_value"), done -> {
      if (done.succeeded()) {
        List<Row> results = done.result();
      } else {
        System.out.println("Unable to execute the query");
        done.cause().printStackTrace();
      }
    });

    // Rows delivered as a stream.
    cassandraClient.queryStream(preparedStatement.bind("my_value"), done -> {
      if (done.succeeded()) {
        CassandraRowStream results = done.result();
      } else {
        System.out.println("Unable to execute the query");
        done.cause().printStackTrace();
      }
    });
  }

  /**
   * Groups three {@code INSERT} statements into one {@link BatchStatement} and executes it.
   *
   * @param cassandraClient the client used to run the batch
   */
  public void batching(CassandraClient cassandraClient) {
    BatchStatement batchStatement = new BatchStatement()
      .add(new SimpleStatement("INSERT INTO NAMES (name) VALUES ('Pavel')"))
      .add(new SimpleStatement("INSERT INTO NAMES (name) VALUES ('Thomas')"))
      .add(new SimpleStatement("INSERT INTO NAMES (name) VALUES ('Julien')"));
    cassandraClient.execute(batchStatement, result -> {
      if (result.succeeded()) {
        System.out.println("The given batch executed successfully");
      } else {
        System.out.println("Unable to execute the batch");
        result.cause().printStackTrace();
      }
    });
  }

  /**
   * Placeholder entity used by {@link #objectMapper(CassandraClient)}.
   *
   * <p>Declared {@code static} so it carries no reference to an enclosing
   * {@code CassandraClientExamples} instance. The constructor argument is not retained.
   *
   * <p>NOTE(review): a real mapped entity presumably needs mapping annotations and accessors -
   * confirm against the mapper documentation.
   */
  public static class MappedClass {
    /**
     * @param name accepted for illustration only; this placeholder does not store it
     */
    public MappedClass(String name) {
    }
  }

  /**
   * Obtains a {@code Mapper} for {@link MappedClass} and issues a save, a get and a delete.
   *
   * <p>The three operations are started one after another without waiting for each other; each
   * callback prints the failure when its operation does not succeed.
   *
   * @param cassandraClient the client the mapping manager is created from
   */
  public void objectMapper(CassandraClient cassandraClient) {
    MappingManager mappingManager = MappingManager.create(cassandraClient);
    Mapper<MappedClass> mapper = mappingManager.mapper(MappedClass.class);
    MappedClass value = new MappedClass("foo");

    mapper.save(value, saved -> {
      if (saved.failed()) {
        System.out.println("Unable to save the entity");
        saved.cause().printStackTrace();
      }
    });

    mapper.get(Collections.singletonList("foo"), fetched -> {
      if (fetched.failed()) {
        System.out.println("Unable to get the entity");
        fetched.cause().printStackTrace();
      }
    });

    mapper.delete(Collections.singletonList("foo"), deleted -> {
      if (deleted.failed()) {
        System.out.println("Unable to delete the entity");
        deleted.cause().printStackTrace();
      }
    });
  }
}