package examples;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.docgen.Source;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.OffsetAndTimestamp;
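// imported for the explicit offset-commit sketch further down; OffsetAndMetadata
// is the Vert.x data object mirroring Kafka's offset/metadata pair
import io.vertx.kafka.client.consumer.OffsetAndMetadata;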
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
@Source
public class VertxKafkaClientExamples {
public void exampleCreateConsumerJava(Vertx vertx) {
Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
}
public void exampleCreateConsumer(Vertx vertx) {
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("group.id", "my_group");
config.put("auto.offset.reset", "earliest");
config.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, config);
}
public void createProducer(Vertx vertx) {
Map<String, String> config = new HashMap<>();
config.put("bootstrap.servers", "localhost:9092");
config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("acks", "1");
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);
}
public void createProducerJava(Vertx vertx) {
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config, String.class, String.class);
}
public void exampleSubscribe(KafkaConsumer<String, String> consumer) {
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer.subscribe(topics);
Pattern pattern = Pattern.compile("topic\\d");
consumer.subscribe(pattern);
consumer.subscribe("a-single-topic");
}
public void exampleSubscribeWithResult(KafkaConsumer<String, String> consumer) {
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
Set<String> topics = new HashSet<>();
topics.add("topic1");
topics.add("topic2");
topics.add("topic3");
consumer
.subscribe(topics)
.onSuccess(v ->
System.out.println("subscribed")
).onFailure(cause ->
System.out.println("Could not subscribe " + cause.getMessage())
);
consumer
.subscribe("a-single-topic")
.onSuccess(v ->
System.out.println("subscribed")
).onFailure(cause ->
System.out.println("Could not subscribe " + cause.getMessage())
);
}
public void exampleConsumerPartitionsNotifs(KafkaConsumer<String, String> consumer) {
consumer.handler(record -> {
System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
consumer.partitionsAssignedHandler(topicPartitions -> {
System.out.println("Partitions assigned");
for (TopicPartition topicPartition : topicPartitions) {
System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
}
});
consumer.partitionsRevokedHandler(topicPartitions -> {
System.out.println("Partitions revoked");
for (TopicPartition topicPartition : topicPartitions) {
System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
}
});
consumer
.subscribe("test")
.onSuccess(v ->
System.out.println("subscribed")
).onFailure(cause ->
System.out.println("Could not subscribe " + cause.getMessage())
);
}
public void exampleUnsubscribe(KafkaConsumer<String, String> consumer) {
consumer.unsubscribe();
}
public void exampleUnsubscribeWithCallback(KafkaConsumer<String, String> consumer) {
consumer
.unsubscribe()
.onSuccess(v ->
System.out.println("Consumer unsubscribed")
);
}
public void exampleConsumerAssignPartition(KafkaConsumer<String, String> consumer) {
consumer.handler(record -> {
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
});
Set<TopicPartition> topicPartitions = new HashSet<>();
topicPartitions.add(new TopicPartition()
.setTopic("test")
.setPartition(0));
consumer
.assign(topicPartitions)
.onSuccess(v -> System.out.println("Partition assigned"))
.compose(v -> consumer.assignment())
.onSuccess(partitions -> {
for (TopicPartition topicPartition : partitions) {
System.out.println(topicPartition.getTopic() + " " + topicPartition.getPartition());
}
});
}
public void exampleConsumerPartitionsFor(KafkaConsumer<String, String> consumer) {
consumer
.partitionsFor("test")
.onSuccess(partitions -> {
for (PartitionInfo partitionInfo : partitions) {
System.out.println(partitionInfo);
}
});
}
public void exampleConsumerListTopics(KafkaConsumer<String, String> consumer) {
consumer
.listTopics()
.onSuccess(partitionsTopicMap ->
partitionsTopicMap.forEach((topic, partitions) -> {
System.out.println("topic = " + topic);
System.out.println("partitions = " + partitions);
})
);
}
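// Instead of attaching a handler, records can also be fetched explicitly with
// poll(): below, a periodic timer polls once a second and cancels itself on
// the first failure.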
public void exampleConsumerWithPoll(Vertx vertx, KafkaConsumer<String, String> consumer) {
consumer
.subscribe("test")
.onSuccess(v -> {
System.out.println("Consumer subscribed");
vertx.setPeriodic(1000, timerId ->
consumer
.poll(Duration.ofMillis(100))
.onSuccess(records -> {
for (int i = 0; i < records.size(); i++) {
KafkaConsumerRecord<String, String> record = records.recordAt(i);
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
}
})
.onFailure(cause -> {
System.out.println("Something went wrong when polling " + cause.toString());
cause.printStackTrace();
vertx.cancelTimer(timerId);
})
);
});
}
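/*
 * A minimal sketch of tuning how long each internal poll against Kafka may
 * block when the consumer feeds the record handler. This assumes the
 * Duration-based pollTimeout() setter available in recent client versions.
 */
public void exampleConsumerPollTimeout(KafkaConsumer<String, String> consumer) {
  // raise the internal poll timeout (the default is short) to reduce
  // wakeups on low-traffic topics
  consumer.pollTimeout(Duration.ofMillis(500));
}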
public void exampleProducerPartitionsFor(KafkaProducer<String, String> producer) {
producer
.partitionsFor("test")
.onSuccess(partitions ->
partitions.forEach(System.out::println)
);
}
public void exampleConsumerManualOffsetCommit(KafkaConsumer<String, String> consumer) {
consumer.commit().onSuccess(v ->
System.out.println("Last read message offset committed")
);
}
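/*
 * A sketch of committing an explicit offset rather than the last read
 * position, assuming the Map-based commit() overload and the
 * OffsetAndMetadata(offset, metadata) constructor; the committed offset is
 * then read back with committed().
 */
public void exampleConsumerCommitSpecificOffset(KafkaConsumer<String, String> consumer) {
  TopicPartition topicPartition = new TopicPartition()
    .setTopic("test")
    .setPartition(0);
  Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
  // commit offset 10 for this partition; the metadata string is free-form
  offsets.put(topicPartition, new OffsetAndMetadata(10, "manual commit"));
  consumer
    .commit(offsets)
    .compose(done -> consumer.committed(topicPartition))
    .onSuccess(committed ->
      System.out.println("Committed offset=" + committed.getOffset())
    );
}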
public void exampleSeek(KafkaConsumer<String, String> consumer) {
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
consumer
.seek(topicPartition, 10)
.onSuccess(v -> System.out.println("Seeking done"));
}
public void exampleSeekToBeginning(KafkaConsumer<String, String> consumer) {
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
consumer
.seekToBeginning(Collections.singleton(topicPartition))
.onSuccess(v -> System.out.println("Seeking done"));
}
public void exampleSeekToEnd(KafkaConsumer<String, String> consumer) {
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
consumer
.seekToEnd(Collections.singleton(topicPartition))
.onSuccess(v -> System.out.println("Seeking done"));
}
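/*
 * Reads the offset of the next record this consumer will fetch from a
 * partition, e.g. to check progress after a seek.
 */
public void exampleConsumerPosition(KafkaConsumer<String, String> consumer) {
  TopicPartition topicPartition = new TopicPartition()
    .setTopic("test")
    .setPartition(0);
  consumer
    .position(topicPartition)
    .onSuccess(position ->
      System.out.println("Next offset to read: " + position)
    );
}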
public void exampleConsumerBeginningOffsets(KafkaConsumer<String, String> consumer) {
Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);
consumer
.beginningOffsets(topicPartitions)
.onSuccess(results ->
results.forEach((topic, beginningOffset) ->
System.out.println(
"Beginning offset for topic=" + topic.getTopic() + ", partition=" +
topic.getPartition() + ", beginningOffset=" + beginningOffset
)
)
);
consumer
.beginningOffsets(topicPartition)
.onSuccess(beginningOffset ->
System.out.println(
"Beginning offset for topic=" + topicPartition.getTopic() + ", partition=" +
topicPartition.getPartition() + ", beginningOffset=" + beginningOffset
)
);
}
public void exampleConsumerEndOffsets(KafkaConsumer<String, String> consumer) {
Set<TopicPartition> topicPartitions = new HashSet<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
topicPartitions.add(topicPartition);
consumer.endOffsets(topicPartitions)
.onSuccess(results ->
results.forEach((topic, endOffset) ->
System.out.println(
"End offset for topic=" + topic.getTopic() + ", partition=" +
topic.getPartition() + ", endOffset=" + endOffset
)
)
);
consumer
.endOffsets(topicPartition)
.onSuccess(endOffset ->
System.out.println(
"End offset for topic=" + topicPartition.getTopic() + ", partition=" +
topicPartition.getPartition() + ", endOffset=" + endOffset
)
);
}
public void exampleConsumerOffsetsForTimes(KafkaConsumer<String, String> consumer) {
Map<TopicPartition, Long> topicPartitionsWithTimestamps = new HashMap<>();
TopicPartition topicPartition = new TopicPartition().setTopic("test").setPartition(0);
long timestamp = (System.currentTimeMillis() - 60000);
topicPartitionsWithTimestamps.put(topicPartition, timestamp);
consumer
.offsetsForTimes(topicPartitionsWithTimestamps)
.onSuccess(results ->
results.forEach((topic, offset) ->
System.out.println(
"Offset for topic=" + topic.getTopic() +
", partition=" + topic.getPartition() +
", timestamp=" + timestamp + ", offset=" + offset.getOffset() +
", offsetTimestamp=" + offset.getTimestamp()
)
)
);
consumer.offsetsForTimes(topicPartition, timestamp).onSuccess(offsetAndTimestamp ->
System.out.println(
"Offset for topic=" + topicPartition.getTopic() +
", partition=" + topicPartition.getPartition() +
", timestamp=" + timestamp + ", offset=" + offsetAndTimestamp.getOffset() +
", offsetTimestamp=" + offsetAndTimestamp.getTimestamp()
)
);
}
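// Flow control: pause() suspends record delivery for the given partitions
// while the consumer keeps its assignment; resume() restarts delivery later.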
public void exampleConsumerFlowControl(Vertx vertx, KafkaConsumer<String, String> consumer) {
TopicPartition topicPartition = new TopicPartition()
.setTopic("test")
.setPartition(0);
consumer.handler(record -> {
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
if ((record.partition() == 0) && (record.offset() == 5)) {
consumer.pause(topicPartition)
.onSuccess(v -> System.out.println("Paused"))
.onSuccess(v -> vertx.setTimer(5000, timerId ->
consumer.resume(topicPartition)
));
}
});
}
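/*
 * KafkaConsumer is also a Vert.x ReadStream, so stream-level flow control
 * works too; a sketch that requests records one at a time with fetch(),
 * assuming the standard ReadStream pause()/fetch() contract.
 */
public void exampleConsumerFetchFlowControl(KafkaConsumer<String, String> consumer) {
  // start paused: nothing is delivered until demand is signalled
  consumer.pause();
  consumer.handler(record -> {
    System.out.println("key=" + record.key() + ",value=" + record.value());
    // ask for exactly one more record once this one has been processed
    consumer.fetch(1);
  });
  // request the first record
  consumer.fetch(1);
}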
public void exampleConsumerClose(KafkaConsumer<String, String> consumer) {
consumer
.close()
.onSuccess(v -> System.out.println("Consumer is now closed"))
.onFailure(cause -> System.out.println("Close failed: " + cause));
}
public void exampleProducerWrite(KafkaProducer<String, String> producer) {
for (int i = 0; i < 5; i++) {
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", "message_" + i);
producer.write(record);
}
}
public void exampleProducerWriteWithAck(KafkaProducer<String, String> producer) {
for (int i = 0; i < 5; i++) {
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", "message_" + i);
producer.send(record).onSuccess(recordMetadata ->
System.out.println(
"Message " + record.value() + " written on topic=" + recordMetadata.getTopic() +
", partition=" + recordMetadata.getPartition() +
", offset=" + recordMetadata.getOffset()
)
);
}
}
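/*
 * A sketch of attaching headers to a produced record, assuming the
 * addHeader(String, String) variant; the header name and value here are
 * purely illustrative.
 */
public void exampleProducerWriteWithHeaders(KafkaProducer<String, String> producer) {
  KafkaProducerRecord<String, String> record =
    KafkaProducerRecord.create("test", "message with headers");
  // headers travel with the record and can be read back on the consumer
  // side via KafkaConsumerRecord#headers()
  record.addHeader("correlation-id", "1234");
  producer.write(record);
}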
public void exampleProducerWriteWithSpecificPartition(KafkaProducer<String, String> producer) {
for (int i = 0; i < 10; i++) {
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", null, "message_" + i, 0);
producer.write(record);
}
}
public void exampleProducerWriteWithSpecificKey(KafkaProducer<String, String> producer) {
for (int i = 0; i < 10; i++) {
int key = i % 2;
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("test", String.valueOf(key), "message_" + i);
producer.write(record);
}
}
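// Producers created with createShared() under the same name reuse a single
// underlying Kafka producer; close() releases this reference and the shared
// instance is disposed once the last reference is closed.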
public void exampleSharedProducer(Vertx vertx, Map<String, String> config) {
KafkaProducer<String, String> producer1 = KafkaProducer.createShared(vertx, "the-producer", config);
producer1.close();
}
public void exampleProducerClose(KafkaProducer<String, String> producer) {
producer
.close()
.onSuccess(v -> System.out.println("Producer is now closed"))
.onFailure(cause -> System.out.println("Close failed: " + cause));
}
public void exampleErrorHandling(KafkaConsumer<String, String> consumer) {
consumer.exceptionHandler(e -> {
System.out.println("Error = " + e.getMessage());
});
}
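/*
 * Besides the per-record handler, a batch handler can observe every batch of
 * records as it is fetched; a minimal sketch combining batchHandler() with
 * the regular handler.
 */
public void exampleConsumerBatchHandler(KafkaConsumer<String, String> consumer) {
  consumer.batchHandler(records -> {
    // called once per fetched batch of records
    System.out.println("Batch of " + records.size() + " records");
    for (int i = 0; i < records.size(); i++) {
      KafkaConsumerRecord<String, String> record = records.recordAt(i);
      System.out.println("key=" + record.key() + ",value=" + record.value());
    }
  });
  // the per-record handler still receives each record individually
  consumer.handler(record ->
    System.out.println("Processing key=" + record.key())
  );
  consumer.subscribe("test");
}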
public void exampleUsingVertxDeserializers() {
  // Vert.x Buffer deserializers for both key and value
  Map<String, String> config = new HashMap<>();
  config.put("bootstrap.servers", "localhost:9092");
  config.put("key.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
  config.put("value.deserializer", "io.vertx.kafka.client.serialization.BufferDeserializer");
  config.put("group.id", "my_group");
  config.put("auto.offset.reset", "earliest");
  config.put("enable.auto.commit", "false");

  // Vert.x JsonObject deserializers for both key and value
  config = new HashMap<>();
  config.put("bootstrap.servers", "localhost:9092");
  config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
  config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonObjectDeserializer");
  config.put("group.id", "my_group");
  config.put("auto.offset.reset", "earliest");
  config.put("enable.auto.commit", "false");

  // Vert.x JsonArray deserializers for both key and value
  config = new HashMap<>();
  config.put("bootstrap.servers", "localhost:9092");
  config.put("key.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
  config.put("value.deserializer", "io.vertx.kafka.client.serialization.JsonArrayDeserializer");
  config.put("group.id", "my_group");
  config.put("auto.offset.reset", "earliest");
  config.put("enable.auto.commit", "false");
}
public void exampleUsingVertxSerializers() {
  // Vert.x Buffer serializers for both key and value
  Map<String, String> config = new HashMap<>();
  config.put("bootstrap.servers", "localhost:9092");
  config.put("key.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
  config.put("value.serializer", "io.vertx.kafka.client.serialization.BufferSerializer");
  config.put("acks", "1");

  // Vert.x JsonObject serializers for both key and value
  config = new HashMap<>();
  config.put("bootstrap.servers", "localhost:9092");
  config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
  config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonObjectSerializer");
  config.put("acks", "1");

  // Vert.x JsonArray serializers for both key and value
  config = new HashMap<>();
  config.put("bootstrap.servers", "localhost:9092");
  config.put("key.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
  config.put("value.serializer", "io.vertx.kafka.client.serialization.JsonArraySerializer");
  config.put("acks", "1");
}
public void exampleUsingVertxDeserializers2(Vertx vertx) {
  Map<String, String> config = new HashMap<>();
  config.put("bootstrap.servers", "localhost:9092");
  config.put("group.id", "my_group");
  config.put("auto.offset.reset", "earliest");
  config.put("enable.auto.commit", "false");

  // the class-based create() picks the matching Vert.x deserializer
  // automatically, so no key/value deserializer config is needed
  KafkaConsumer<Buffer, Buffer> bufferConsumer = KafkaConsumer.create(vertx, config, Buffer.class, Buffer.class);
  KafkaConsumer<JsonObject, JsonObject> jsonObjectConsumer = KafkaConsumer.create(vertx, config, JsonObject.class, JsonObject.class);
  KafkaConsumer<JsonArray, JsonArray> jsonArrayConsumer = KafkaConsumer.create(vertx, config, JsonArray.class, JsonArray.class);
}
public void exampleUsingVertxSerializers2(Vertx vertx) {
  Map<String, String> config = new HashMap<>();
  config.put("bootstrap.servers", "localhost:9092");
  config.put("acks", "1");

  // the class-based create() picks the matching Vert.x serializer
  // automatically, so no key/value serializer config is needed
  KafkaProducer<Buffer, Buffer> bufferProducer = KafkaProducer.create(vertx, config, Buffer.class, Buffer.class);
  KafkaProducer<JsonObject, JsonObject> jsonObjectProducer = KafkaProducer.create(vertx, config, JsonObject.class, JsonObject.class);
  KafkaProducer<JsonArray, JsonArray> jsonArrayProducer = KafkaProducer.create(vertx, config, JsonArray.class, JsonArray.class);
}
}