package examples;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.docgen.Source;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.OffsetAndTimestamp;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
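
/**
 * Code examples for the Vert.x Kafka client: creating consumers and
 * producers, subscribing, partition assignment, offset management,
 * flow control and the built-in Vert.x (de)serializers.
 */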
@Source
public class VertxKafkaClientExamples {
public void exampleCreateConsumerJava(Vertx vertx) {
Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
}
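
/**
 * Creates a consumer from a {@code Map<String, String>} config,
 * naming the deserializers by fully qualified class name.
 */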
public void exampleCreateConsumer(Vertx vertx) {
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
}
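
/**
 * Creates a producer from a {@code Map<String, String>} config with
 * String serializers and leader-only acknowledgment ({@code acks=1}).
 */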
public void createProducer(Vertx vertx) {
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("acks", "1");
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);
}
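
/**
 * Creates a producer from a plain Kafka {@code Properties} config,
 * passing the key and value types instead of serializer class names.
 */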
public void createProducerJava(Vertx vertx) {
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config, String.class, String.class);
}
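
/**
 * Shows the ways to subscribe a consumer: to a set of topics, to all
 * topics matching a regex, or to a single topic. The record handler
 * is called for every record received.
 */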
public void exampleSubscribe(KafkaConsumer<String, String> consumer) {
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
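// Subscribe to a set of topics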
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer.subscribe(topics);
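// Or subscribe to all topics matching a regular expression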
Pattern pattern = Pattern.compile("topic\\d");
consumer.subscribe(pattern);
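// Or subscribe to a single topic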
consumer.subscribe("a-single-topic");
}
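
/**
 * Same subscriptions as above, but with a completion handler that
 * reports whether the subscribe operation succeeded.
 */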
public void exampleSubscribeWithResult(KafkaConsumer<String, String> consumer) {
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer.subscribe(topics, ar -> {
if (ar.succeeded()) {
System.out.println("subscribed");
} else {
System.out.println("Could not subscribe " + ar.cause().getMessage());
}
});
consumer.subscribe("a-single-topic", ar -> {
if (ar.succeeded()) {
System.out.println("subscribed");
} else {
System.out.println("Could not subscribe " + ar.cause().getMessage());
}
});
}
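
/**
 * Registers handlers that are notified whenever partitions are
 * assigned to or revoked from this consumer during a group rebalance.
 */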
public void exampleConsumerPartitionsNotifs(KafkaConsumer<String, String> consumer) {
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
consumer.partitionsAssignedHandler(topicPartitions -> {
System.out.println("Partitions assigned");
for (TopicPartition topicPartition : topicPartitions) {
System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
}
});
consumer.partitionsRevokedHandler(topicPartitions -> {
System.out.println("Partitions revoked");
for (TopicPartition topicPartition : topicPartitions) {
System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
}
});
consumer.subscribe("test", ar -> {
if (ar.succeeded()) {
System.out.println("Consumer subscribed");
}
});
}
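
/**
 * Unsubscribes the consumer from all its topics.
 */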
public void exampleUnsubscribe(KafkaConsumer<String, String> consumer) {
consumer.unsubscribe();
}
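
/**
 * Unsubscribes with a completion handler.
 */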
public void exampleUnsubscribeWithCallback(KafkaConsumer<String, String> consumer) {
consumer.unsubscribe(ar -> {
if (ar.succeeded()) {
System.out.println("Consumer unsubscribed");
}
});
}
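
/**
 * Manually assigns a specific partition to the consumer, bypassing
 * consumer-group rebalancing, then reads back the current assignment.
 */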
public void exampleConsumerAssignPartition(KafkaConsumer<String, String> consumer) {
consumer.handler(record -> {
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(new TopicPartition()
.setTopic("test")
.setPartition(0));
consumer.assign(topicPartitions, done -> {
if (done.succeeded()) {
System.out.println("Partition assigned");
consumer.assignment(done1 -> {
if (done1.succeeded()) {
for (TopicPartition topicPartition : done1.result()) {
System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
}
}
});
}
});
}
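
/**
 * Retrieves metadata about the partitions of a topic.
 */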
public void exampleConsumerPartitionsFor(KafkaConsumer<String, String> consumer) {
consumer.partitionsFor("test", ar -> {
if (ar.succeeded()) {
for (PartitionInfo partitionInfo : ar.result()) {
System.out.println(partitionInfo);
}
}
});
}
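
/**
 * Lists the available topics together with their partition metadata.
 */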
public void exampleConsumerListTopics(KafkaConsumer<String, String> consumer) {
consumer.listTopics(ar -> {
if (ar.succeeded()) {
Map<String, List<PartitionInfo>> map = ar.result();
map.forEach((topic, partitions) -> {
System.out.println("topic = " + topic);
System.out.println("partitions = " + map.get(topic));
});
}
});
}
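
/**
 * Fetches records explicitly with {@code poll} on a periodic Vert.x
 * timer instead of relying on a continuously invoked record handler.
 */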
public void exampleConsumerWithPoll(Vertx vertx, KafkaConsumer<String, String> consumer) {
consumer.subscribe("test", ar -> {
if (ar.succeeded()) {
System.out.println("Consumer subscribed");
vertx.setPeriodic(1000, timerId -> {
consumer.poll(100, ar1 -> {
if (ar1.succeeded()) {
KafkaConsumerRecords<String, String> records = ar1.result();
for (int i = 0; i < records.size(); i++) {
KafkaConsumerRecord<String, String> record = records.recordAt(i);
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
}
}
});
});
}
});
}
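
/**
 * Retrieves partition metadata for a topic from the producer side.
 */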
public void exampleProducerPartitionsFor(KafkaProducer<String, String> producer) {
producer.partitionsFor("test", ar -> {
if (ar.succeeded()) {
for (PartitionInfo partitionInfo : ar.result()) {
System.out.println(partitionInfo);
}
}
});
}
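
/**
 * Commits the offset of the last read message manually; required when
 * {@code enable.auto.commit} is set to {@code false}.
 */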
public void exampleConsumerManualOffsetCommit(KafkaConsumer<String, String> consumer) {
consumer.commit(ar -> {
if (ar.succeeded()) {
System.out.println("Last read message offset committed");
}
});
}
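
/**
 * Moves the fetch position of a topic partition to a specific offset.
 */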
public void exampleSeek(KafkaConsumer<String, String> consumer) {
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
consumer.seek(topicPartition, 10, done -> {
if (done.succeeded()) {
System.out.println("Seeking done");
}
});
}
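
/**
 * Moves the fetch position of a topic partition back to the beginning.
 */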
public void exampleSeekToBeginning(KafkaConsumer<String, String> consumer) {
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
consumer.seekToBeginning(Collections.singleton(topicPartition), done -> {
if (done.succeeded()) {
System.out.println("Seeking done");
}
});
}
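
/**
 * Moves the fetch position of a topic partition to the end.
 */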
public void exampleSeekToEnd(KafkaConsumer<String, String> consumer) {
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
consumer.seekToEnd(Collections.singleton(topicPartition), done -> {
if (done.succeeded()) {
System.out.println("Seeking done");
}
});
}
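
/**
 * Queries the first available offset of one or more partitions,
 * without changing the consumer's current fetch position.
 */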
public void exampleConsumerBeginningOffsets(KafkaConsumer<String, String> consumer) {
Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);
consumer.beginningOffsets(topicPartitions, done -> {
if (done.succeeded()) {
Map<TopicPartition, Long> results = done.result();
results.forEach((topic, beginningOffset) ->
System.out.println("Beginning offset for topic=" + topic.getTopic() + ", partition=" +
topic.getPartition() + ", beginningOffset=" + beginningOffset));
}
});
consumer.beginningOffsets(topicPartition, done -> {
if (done.succeeded()) {
Long beginningOffset = done.result();
System.out.println("Beginning offset for topic=" + topicPartition.getTopic() + ", partition=" +
topicPartition.getPartition() + ", beginningOffset=" + beginningOffset);
}
});
}
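
/**
 * Queries the last offset of one or more partitions, again without
 * changing the consumer's fetch position.
 */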
public void exampleConsumerEndOffsets(KafkaConsumer<String, String> consumer) {
Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);
consumer.endOffsets(topicPartitions, done -> {
if (done.succeeded()) {
Map<TopicPartition, Long> results = done.result();
results.forEach((topic, endOffset) ->
System.out.println("End offset for topic=" + topic.getTopic() + ", partition=" +
topic.getPartition() + ", endOffset=" + endOffset));
}
});
consumer.endOffsets(topicPartition, done -> {
if (done.succeeded()) {
Long endOffset = done.result();
System.out.println("End offset for topic=" + topicPartition.getTopic() + ", partition=" +
topicPartition.getPartition() + ", endOffset=" + endOffset);
}
});
}
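
/**
 * Looks up the offset of the first record whose timestamp is greater
 * than or equal to the given timestamp, here one minute ago.
 */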
public void exampleConsumerOffsetsForTimes(KafkaConsumer<String, String> consumer) {
Map<TopicPartition, Long> topicPartitionsWithTimestamps = new HashMap<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
long timestamp = (System.currentTimeMillis() - 60000);
topicPartitionsWithTimestamps.put(topicPartition, timestamp);
consumer.offsetsForTimes(topicPartitionsWithTimestamps, done -> {
if (done.succeeded()) {
Map<TopicPartition, OffsetAndTimestamp> results = done.result();
results.forEach((topic, offset) ->
System.out.println("Offset for topic=" + topic.getTopic() +
", partition=" + topic.getPartition() +
", timestamp=" + timestamp + ", offset=" + offset.getOffset() +
", offsetTimestamp=" + offset.getTimestamp()));
}
});
consumer.offsetsForTimes(topicPartition, timestamp, done -> {
if (done.succeeded()) {
OffsetAndTimestamp offsetAndTimestamp = done.result();
System.out.println("Offset for topic=" + topicPartition.getTopic() +
", partition=" + topicPartition.getPartition() +
", timestamp=" + timestamp + ", offset=" + offsetAndTimestamp.getOffset() +
", offsetTimestamp=" + offsetAndTimestamp.getTimestamp());
}
});
}
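
/**
 * Flow control: after reading offset 5 from partition 0 the consumer
 * pauses fetching from that partition and resumes it five seconds later.
 */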
public void exampleConsumerFlowControl(Vertx vertx, KafkaConsumer<String, String> consumer) {
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
consumer.handler(record -> {
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
if ((record.partition() == 0) && (record.offset() == 5)) {
consumer.pause(topicPartition, ar -> {
if (ar.succeeded()) {
System.out.println("Paused");
vertx.setTimer(5000, timerId -> {
consumer.resume(topicPartition);
});
}
});
}
});
}
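
/**
 * Closes the consumer and reports the outcome.
 */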
public void exampleConsumerClose(KafkaConsumer<String, String> consumer) {
consumer.close(res -> {
if (res.succeeded()) {
System.out.println("Consumer is now closed");
} else {
System.out.println("close failed");
}
});
}
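
/**
 * Writes records without a key; the default partitioner spreads them
 * across the topic's partitions.
 */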
public void exampleProducerWrite(KafkaProducer<String, String> producer) {
for (int i = 0; i < 5; i++) {
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", "message_" + i);
producer.write(record);
}
}
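
/**
 * Sends records and receives the resulting {@code RecordMetadata}
 * (topic, partition, offset) once each write is acknowledged.
 */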
public void exampleProducerWriteWithAck(KafkaProducer<String, String> producer) {
for (int i = 0; i < 5; i++) {
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", "message_" + i);
producer.send(record, done -> {
if (done.succeeded()) {
RecordMetadata recordMetadata = done.result();
System.out.println("Message " + record.value() + " written on topic=" + recordMetadata.getTopic() +
", partition=" + recordMetadata.getPartition() +
", offset=" + recordMetadata.getOffset());
}
});
}
}
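
/**
 * Writes every record to an explicitly chosen partition (here 0).
 */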
public void exampleProducerWriteWithSpecificPartition(KafkaProducer<String, String> producer) {
for (int i = 0; i < 10; i++) {
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", null, "message_" + i, 0);
producer.write(record);
}
}
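
/**
 * Writes records with a key; with the default partitioner, records
 * sharing a key always go to the same partition.
 */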
public void exampleProducerWriteWithSpecificKey(KafkaProducer<String, String> producer) {
for (int i = 0; i < 10; i++) {
int key = i % 2;
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", String.valueOf(key), "message_" + i);
producer.write(record);
}
}
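
/**
 * Creates a producer shared across verticles under the name
 * "the-producer"; the underlying producer is only closed once every
 * user of the shared instance has closed it.
 */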
public void exampleSharedProducer(Vertx vertx, Map<String, String> config) {
KafkaProducer<String, String> producer1 = KafkaProducer.createShared(vertx, "the-producer", config);
producer1.close();
}
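
/**
 * Closes the producer and reports the outcome.
 */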
public void exampleProducerClose(KafkaProducer<String, String> producer) {
producer.close(res -> {
if (res.succeeded()) {
System.out.println("Producer is now closed");
} else {
System.out.println("close failed");
}
});
}
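
/**
 * Installs an exception handler to be notified of consumer errors.
 */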
public void exampleErrorHandling(KafkaConsumer<String, String> consumer) {
consumer.exceptionHandler(e -> {
System.out.println("Error = " + e.getMessage());
});
}
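
/**
 * Consumer configurations using the Vert.x deserializers for
 * {@code Buffer}, {@code JsonObject} and {@code JsonArray} payloads.
 */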
public void exampleUsingVertxDeserializers() {
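// Using the Buffer deserializer for keys and values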
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
}
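
/**
 * Producer configurations using the Vert.x serializers for
 * {@code Buffer}, {@code JsonObject} and {@code JsonArray} payloads.
 */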
public void exampleUsingVertxSerializers() {
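// Using the Buffer serializer for keys and values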
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
config.put("acks", "1");
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
config.put("acks", "1");
config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
config.put("acks", "1");
}
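
/**
 * Creates consumers for the built-in Vert.x types by passing the key
 * and value classes directly, with no deserializer configuration.
 */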
public void exampleUsingVertxDeserializers2(Vertx vertx) {
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
KafkaConsumer<Buffer, Buffer> bufferConsumer = KafkaConsumer.create(vertx, config, Buffer.class, Buffer.class);
KafkaConsumer<JsonObject, JsonObject> jsonObjectConsumer = KafkaConsumer.create(vertx, config, JsonObject.class, JsonObject.class);
KafkaConsumer<JsonArray, JsonArray> jsonArrayConsumer = KafkaConsumer.create(vertx, config, JsonArray.class, JsonArray.class);
}
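
/**
 * Creates producers for the built-in Vert.x types by passing the key
 * and value classes directly, with no serializer configuration.
 */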
public void exampleUsingVertxSerializers2(Vertx vertx) {
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("acks", "1");
KafkaProducer<Buffer, Buffer> bufferProducer = KafkaProducer.create(vertx, config, Buffer.class, Buffer.class);
KafkaProducer<JsonObject, JsonObject> jsonObjectProducer = KafkaProducer.create(vertx, config, JsonObject.class, JsonObject.class);
KafkaProducer<JsonArray, JsonArray> jsonArrayProducer = KafkaProducer.create(vertx, config, JsonArray.class, JsonArray.class);
}
}