package examples;
import io.vertx.core.Vertx;
import io.vertx.docgen.Source;
import io.vertx.kafka.admin.Config;
import io.vertx.kafka.admin.ConfigEntry;
import io.vertx.kafka.admin.ConsumerGroupDescription;
import io.vertx.kafka.admin.KafkaAdminClient;
import io.vertx.kafka.admin.MemberDescription;
import io.vertx.kafka.admin.NewTopic;
import io.vertx.kafka.admin.TopicDescription;
import io.vertx.kafka.client.common.ConfigResource;
import io.vertx.kafka.client.common.TopicPartitionInfo;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.config.TopicConfig;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Usage examples for {@link KafkaAdminClient}.
 *
 * <p>Each method is a self-contained snippet showing one admin operation. Every
 * asynchronous handler checks the outcome of the call before reading its result,
 * so a failed request is reported instead of causing a
 * {@link NullPointerException} on {@code ar.result()}.
 *
 * <p>NOTE(review): the class is annotated with {@code @Source} from
 * {@code io.vertx.docgen}; presumably the method bodies are embedded into
 * generated documentation — confirm before restructuring the snippets.
 */
@Source
public class KafkaAdminClientExamples {

  /**
   * Creates an admin client configured with a single bootstrap server.
   *
   * @param vertx the Vert.x instance the client is created with
   */
  public void exampleCreateAdminClient(Vertx vertx) {
    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    KafkaAdminClient adminClient = KafkaAdminClient.create(vertx, config);
  }

  /**
   * Lists the topics and prints them.
   *
   * @param adminClient the admin client to use
   */
  public void exampleListTopics(KafkaAdminClient adminClient) {
    adminClient.listTopics(ar -> {
      if (ar.succeeded()) {
        System.out.println("Topics= " + ar.result());
      } else {
        System.err.println("Failed to list topics: " + ar.cause());
      }
    });
  }

  /**
   * Describes a single topic and prints its name, whether it is internal, and
   * the details of each of its partitions.
   *
   * @param adminClient the admin client to use
   */
  public void exampleDescribeTopics(KafkaAdminClient adminClient) {
    // Use one variable for both the request and the lookup so the key used to
    // read the result always matches the topic that was asked for.
    String topicName = "my-topic";

    adminClient.describeTopics(Collections.singletonList(topicName), ar -> {
      if (ar.failed()) {
        System.err.println("Failed to describe topic: " + ar.cause());
        return;
      }

      TopicDescription topicDescription = ar.result().get(topicName);

      System.out.println("Topic name=" + topicDescription.getName() +
          " isInternal= " + topicDescription.isInternal() +
          " partitions= " + topicDescription.getPartitions().size());

      for (TopicPartitionInfo topicPartitionInfo : topicDescription.getPartitions()) {
        System.out.println("Partition id= " + topicPartitionInfo.getPartition() +
            " leaderId= " + topicPartitionInfo.getLeader().getId() +
            " replicas= " + topicPartitionInfo.getReplicas() +
            " isr= " + topicPartitionInfo.getIsr());
      }
    });
  }

  /**
   * Deletes a single topic and reports whether the request succeeded.
   *
   * @param adminClient the admin client to use
   */
  public void exampleDeleteTopics(KafkaAdminClient adminClient) {
    adminClient.deleteTopics(Collections.singletonList("topicToDelete"), ar -> {
      if (ar.succeeded()) {
        System.out.println("Topic deleted");
      } else {
        System.err.println("Failed to delete topic: " + ar.cause());
      }
    });
  }

  /**
   * Creates a single topic with one partition and a replication factor of one,
   * and reports whether the request succeeded.
   *
   * @param adminClient the admin client to use
   */
  public void exampleCreateTopics(KafkaAdminClient adminClient) {
    NewTopic newTopic = new NewTopic("testCreateTopic", 1, (short) 1);

    adminClient.createTopics(Collections.singletonList(newTopic), ar -> {
      if (ar.succeeded()) {
        System.out.println("Topic created");
      } else {
        System.err.println("Failed to create topic: " + ar.cause());
      }
    });
  }

  /**
   * Requests the configuration of a topic resource and reports whether the
   * request succeeded.
   *
   * @param adminClient the admin client to use
   */
  public void exampleDescribeConfigs(KafkaAdminClient adminClient) {
    ConfigResource resource =
        new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic");

    adminClient.describeConfigs(Collections.singletonList(resource), ar -> {
      if (ar.succeeded()) {
        System.out.println("Configs= " + ar.result());
      } else {
        System.err.println("Failed to describe configs: " + ar.cause());
      }
    });
  }

  /**
   * Sets the {@code retention.ms} entry of a topic resource and reports whether
   * the request succeeded.
   *
   * @param adminClient the admin client to use
   */
  public void exampleAlterConfigs(KafkaAdminClient adminClient) {
    ConfigResource resource =
        new ConfigResource(org.apache.kafka.common.config.ConfigResource.Type.TOPIC, "my-topic");

    // the single entry to update: retention time in milliseconds
    ConfigEntry retentionEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "51000");
    Map<ConfigResource, Config> updateConfig = new HashMap<>();
    updateConfig.put(resource, new Config(Collections.singletonList(retentionEntry)));

    adminClient.alterConfigs(updateConfig, ar -> {
      if (ar.succeeded()) {
        System.out.println("Configs updated");
      } else {
        System.err.println("Failed to alter configs: " + ar.cause());
      }
    });
  }

  /**
   * Lists the consumer groups and prints them.
   *
   * @param adminClient the admin client to use
   */
  public void exampleListConsumerGroups(KafkaAdminClient adminClient) {
    adminClient.listConsumerGroups(ar -> {
      if (ar.succeeded()) {
        System.out.println("ConsumerGroups= " + ar.result());
      } else {
        System.err.println("Failed to list consumer groups: " + ar.cause());
      }
    });
  }

  /**
   * Describes a single consumer group and prints its id, state, coordinator
   * host, and the client id and assigned topic partitions of each member.
   *
   * @param adminClient the admin client to use
   */
  public void exampleDescribeConsumerGroups(KafkaAdminClient adminClient) {
    // Same variable for the request and the result lookup, so they cannot diverge.
    String groupId = "my-group";

    adminClient.describeConsumerGroups(Collections.singletonList(groupId), ar -> {
      if (ar.failed()) {
        System.err.println("Failed to describe consumer group: " + ar.cause());
        return;
      }

      ConsumerGroupDescription consumerGroupDescription = ar.result().get(groupId);

      System.out.println("Group id=" + consumerGroupDescription.getGroupId() +
          " state= " + consumerGroupDescription.getState() +
          " coordinator host= " + consumerGroupDescription.getCoordinator().getHost());

      for (MemberDescription memberDescription : consumerGroupDescription.getMembers()) {
        System.out.println("client id= " + memberDescription.getClientId() +
            " topic partitions= " + memberDescription.getAssignment().getTopicPartitions());
      }
    });
  }
}