package io.vertx.kafka.admin.impl;
import io.vertx.kafka.admin.ListConsumerGroupOffsetsOptions;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.OffsetAndMetadata;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import io.vertx.core.Promise;
import io.vertx.core.impl.ContextInternal;
import io.vertx.kafka.admin.ClusterDescription;
import io.vertx.kafka.admin.Config;
import io.vertx.kafka.admin.ConsumerGroupDescription;
import io.vertx.kafka.admin.ConsumerGroupListing;
import io.vertx.kafka.admin.ListOffsetsResultInfo;
import io.vertx.kafka.admin.MemberDescription;
import io.vertx.kafka.admin.NewTopic;
import io.vertx.kafka.admin.OffsetSpec;
import io.vertx.kafka.admin.TopicDescription;
import io.vertx.kafka.client.common.ConfigResource;
import io.vertx.kafka.client.common.Node;
import io.vertx.kafka.client.common.TopicPartitionInfo;
import io.vertx.kafka.client.common.impl.Helper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.kafka.admin.KafkaAdminClient;
import org.apache.kafka.common.KafkaFuture;
public class KafkaAdminClientImpl implements KafkaAdminClient {
/** Vert.x instance used to obtain the context on which result promises are created. */
private final Vertx vertx;
/** Underlying Kafka admin client that every operation of this class delegates to. */
private final AdminClient adminClient;

/**
 * Creates a wrapper around an existing Kafka admin client.
 *
 * <p>Both references are stored as-is and never reassigned, so the fields are {@code final}.
 *
 * @param vertx Vert.x instance whose context is used to create result promises
 * @param adminClient Kafka admin client that operations are delegated to
 */
public KafkaAdminClientImpl(Vertx vertx, AdminClient adminClient) {
  this.vertx = vertx;
  this.adminClient = adminClient;
}
/**
 * Callback-style variant of {@link #describeTopics(List)}.
 *
 * @param topicNames names of the topics to describe
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void describeTopics(List<String> topicNames, Handler<AsyncResult<Map<String, TopicDescription>>> completionHandler) {
  Future<Map<String, TopicDescription>> descriptions = describeTopics(topicNames);
  descriptions.onComplete(completionHandler);
}
/**
 * Describes the given topics through the Kafka admin client and converts each Kafka
 * description (name, internal flag, partitions with leader/replicas/ISR) into its Vert.x
 * counterpart.
 *
 * @param topicNames names of the topics to describe
 * @return a future completed with the converted descriptions keyed by the map key reported by
 *         Kafka, or failed with the error reported by the Kafka future
 */
@Override
public Future<Map<String, TopicDescription>> describeTopics(List<String> topicNames) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Map<String, TopicDescription>> result = context.promise();

  this.adminClient.describeTopics(topicNames).all().whenComplete((kafkaTopics, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }

    Map<String, TopicDescription> converted = new HashMap<>();
    for (Map.Entry<String, org.apache.kafka.clients.admin.TopicDescription> entry : kafkaTopics.entrySet()) {
      String key = entry.getKey();
      org.apache.kafka.clients.admin.TopicDescription kafkaTopic = entry.getValue();

      List<TopicPartitionInfo> partitionInfos = new ArrayList<>();
      for (org.apache.kafka.common.TopicPartitionInfo kafkaPartition : kafkaTopic.partitions()) {
        TopicPartitionInfo partitionInfo = new TopicPartitionInfo();
        // NOTE(review): Kafka documents leader() as possibly null when a partition has no
        // leader; confirm that Helper.from tolerates a null node.
        partitionInfo
          .setIsr(kafkaPartition.isr().stream().map(Helper::from).collect(Collectors.toList()))
          .setLeader(Helper.from(kafkaPartition.leader()))
          .setPartition(kafkaPartition.partition())
          .setReplicas(kafkaPartition.replicas().stream().map(Helper::from).collect(Collectors.toList()));
        partitionInfos.add(partitionInfo);
      }

      TopicDescription description = new TopicDescription();
      description
        .setInternal(kafkaTopic.isInternal())
        .setName(key)
        .setPartitions(partitionInfos);
      converted.put(key, description);
    }
    result.complete(converted);
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #listTopics()}.
 *
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void listTopics(Handler<AsyncResult<Set<String>>> completionHandler) {
  Future<Set<String>> topicNames = listTopics();
  topicNames.onComplete(completionHandler);
}
/**
 * Lists topic names through the Kafka admin client.
 *
 * @return a future completed with the set of names reported by Kafka, or failed with the error
 *         reported by the Kafka future
 */
@Override
public Future<Set<String>> listTopics() {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Set<String>> result = context.promise();

  this.adminClient.listTopics().names().whenComplete((names, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    result.complete(names);
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #createTopics(List)}.
 *
 * @param topics topics to create
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void createTopics(List<NewTopic> topics, Handler<AsyncResult<Void>> completionHandler) {
  Future<Void> outcome = createTopics(topics);
  outcome.onComplete(completionHandler);
}
/**
 * Creates the given topics through the Kafka admin client, after converting them with
 * {@code Helper.toNewTopicList}.
 *
 * @param topics topics to create
 * @return a future completed without a value once the Kafka future for all topics succeeds, or
 *         failed with the error it reports
 */
@Override
public Future<Void> createTopics(List<NewTopic> topics) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Void> result = context.promise();

  this.adminClient.createTopics(Helper.toNewTopicList(topics)).all().whenComplete((ignored, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    result.complete();
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #deleteTopics(List)}.
 *
 * @param topicNames names of the topics to delete
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void deleteTopics(List<String> topicNames, Handler<AsyncResult<Void>> completionHandler) {
  Future<Void> outcome = deleteTopics(topicNames);
  outcome.onComplete(completionHandler);
}
/**
 * Deletes the given topics through the Kafka admin client.
 *
 * @param topicNames names of the topics to delete
 * @return a future completed without a value once the Kafka future for all topics succeeds, or
 *         failed with the error it reports
 */
@Override
public Future<Void> deleteTopics(List<String> topicNames) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Void> result = context.promise();

  this.adminClient.deleteTopics(topicNames).all().whenComplete((ignored, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    result.complete();
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #describeConfigs(List)}.
 *
 * @param configResources resources whose configuration should be described
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void describeConfigs(List<ConfigResource> configResources, Handler<AsyncResult<Map<ConfigResource, Config>>> completionHandler) {
  Future<Map<ConfigResource, Config>> described = describeConfigs(configResources);
  described.onComplete(completionHandler);
}
/**
 * Describes the configuration of the given resources through the Kafka admin client and
 * converts every returned resource/config pair with {@code Helper.from}.
 *
 * @param configResources resources whose configuration should be described
 * @return a future completed with the converted configurations keyed by converted resource, or
 *         failed with the error reported by the Kafka future
 */
@Override
public Future<Map<ConfigResource, Config>> describeConfigs(List<ConfigResource> configResources) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Map<ConfigResource, Config>> result = context.promise();

  this.adminClient.describeConfigs(Helper.toConfigResourceList(configResources)).all().whenComplete((kafkaConfigs, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    Map<ConfigResource, Config> converted = new HashMap<>();
    kafkaConfigs.forEach((kafkaResource, kafkaConfig) ->
      converted.put(Helper.from(kafkaResource), Helper.from(kafkaConfig)));
    result.complete(converted);
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #alterConfigs(Map)}.
 *
 * @param configs configuration to apply, keyed by resource
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void alterConfigs(Map<ConfigResource,Config> configs, Handler<AsyncResult<Void>> completionHandler) {
  Future<Void> outcome = alterConfigs(configs);
  outcome.onComplete(completionHandler);
}
/**
 * Alters resource configurations through the Kafka admin client, after converting the input
 * with {@code Helper.toConfigMaps}.
 *
 * @param configs configuration to apply, keyed by resource
 * @return a future completed without a value once the Kafka future for all resources succeeds,
 *         or failed with the error it reports
 */
@Override
public Future<Void> alterConfigs(Map<ConfigResource, Config> configs) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Void> result = context.promise();

  this.adminClient.alterConfigs(Helper.toConfigMaps(configs)).all().whenComplete((ignored, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    result.complete();
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #listConsumerGroups()}.
 *
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void listConsumerGroups(Handler<AsyncResult<List<ConsumerGroupListing>>> completionHandler) {
  Future<List<ConsumerGroupListing>> listings = listConsumerGroups();
  listings.onComplete(completionHandler);
}
/**
 * Lists consumer groups through the Kafka admin client and converts the listings with
 * {@code Helper.fromConsumerGroupListings}.
 *
 * @return a future completed with the converted listings, or failed with the error reported by
 *         the Kafka future
 */
@Override
public Future<List<ConsumerGroupListing>> listConsumerGroups() {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<List<ConsumerGroupListing>> result = context.promise();

  this.adminClient.listConsumerGroups().all().whenComplete((kafkaListings, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    result.complete(Helper.fromConsumerGroupListings(kafkaListings));
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #describeConsumerGroups(List)}.
 *
 * @param groupIds ids of the consumer groups to describe
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void describeConsumerGroups(List<java.lang.String> groupIds, Handler<AsyncResult<Map<String, ConsumerGroupDescription>>> completionHandler) {
  Future<Map<String, ConsumerGroupDescription>> descriptions = describeConsumerGroups(groupIds);
  descriptions.onComplete(completionHandler);
}
/**
 * Describes the given consumer groups through the Kafka admin client and converts each Kafka
 * description (group id, coordinator, members, partition assignor, simple-group flag, state)
 * into its Vert.x counterpart.
 *
 * @param groupIds ids of the consumer groups to describe
 * @return a future completed with the converted descriptions keyed by the map key reported by
 *         Kafka, or failed with the error reported by the Kafka future
 */
@Override
public Future<Map<String, ConsumerGroupDescription>> describeConsumerGroups(List<String> groupIds) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Map<String, ConsumerGroupDescription>> result = context.promise();

  this.adminClient.describeConsumerGroups(groupIds).all().whenComplete((kafkaGroups, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }

    Map<String, ConsumerGroupDescription> converted = new HashMap<>();
    for (Map.Entry<String, org.apache.kafka.clients.admin.ConsumerGroupDescription> entry : kafkaGroups.entrySet()) {
      org.apache.kafka.clients.admin.ConsumerGroupDescription kafkaGroup = entry.getValue();

      List<MemberDescription> memberList = new ArrayList<>();
      for (org.apache.kafka.clients.admin.MemberDescription kafkaMember : kafkaGroup.members()) {
        MemberDescription member = new MemberDescription();
        member
          .setConsumerId(kafkaMember.consumerId())
          .setClientId(kafkaMember.clientId())
          .setAssignment(Helper.from(kafkaMember.assignment()))
          .setHost(kafkaMember.host());
        memberList.add(member);
      }

      ConsumerGroupDescription description = new ConsumerGroupDescription();
      description
        .setGroupId(kafkaGroup.groupId())
        .setCoordinator(Helper.from(kafkaGroup.coordinator()))
        .setMembers(memberList)
        .setPartitionAssignor(kafkaGroup.partitionAssignor())
        .setSimpleConsumerGroup(kafkaGroup.isSimpleConsumerGroup())
        .setState(kafkaGroup.state());
      converted.put(entry.getKey(), description);
    }
    result.complete(converted);
  });

  return result.future();
}
/**
 * Callback-style variant of
 * {@link #listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)}.
 *
 * @param groupId id of the consumer group
 * @param options options forwarded to the future-based variant
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
public void listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options, Handler<AsyncResult<Map<TopicPartition, OffsetAndMetadata>>> completionHandler) {
  Future<Map<TopicPartition, OffsetAndMetadata>> offsets = listConsumerGroupOffsets(groupId, options);
  offsets.onComplete(completionHandler);
}
/**
 * Lists the offsets of a consumer group through the Kafka admin client and converts every
 * partition/offset pair with {@code Helper.from}.
 *
 * @param groupId id of the consumer group
 * @param options options, converted with {@code Helper.to} before being passed to Kafka
 * @return a future completed with the converted offsets keyed by converted partition, or failed
 *         with the error reported by the Kafka future
 */
public Future<Map<TopicPartition, OffsetAndMetadata>> listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Map<TopicPartition, OffsetAndMetadata>> result = context.promise();

  this.adminClient.listConsumerGroupOffsets(groupId, Helper.to(options)).partitionsToOffsetAndMetadata().whenComplete((kafkaOffsets, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    Map<TopicPartition, OffsetAndMetadata> converted = new HashMap<>();
    kafkaOffsets.forEach((kafkaPartition, kafkaOffset) ->
      converted.put(Helper.from(kafkaPartition), Helper.from(kafkaOffset)));
    result.complete(converted);
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #deleteConsumerGroups(List)}.
 *
 * @param groupIds ids of the consumer groups to delete
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void deleteConsumerGroups(List<String> groupIds, Handler<AsyncResult<Void>> completionHandler) {
  Future<Void> outcome = deleteConsumerGroups(groupIds);
  outcome.onComplete(completionHandler);
}
/**
 * Deletes the given consumer groups through the Kafka admin client.
 *
 * @param groupIds ids of the consumer groups to delete
 * @return a future completed without a value once the Kafka future for all groups succeeds, or
 *         failed with the error it reports
 */
@Override
public Future<Void> deleteConsumerGroups(List<String> groupIds) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Void> result = context.promise();

  this.adminClient.deleteConsumerGroups(groupIds).all().whenComplete((ignored, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    result.complete();
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #deleteConsumerGroupOffsets(String, Set)}.
 *
 * @param groupId id of the consumer group
 * @param partitions partitions whose offsets should be deleted
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions, Handler<AsyncResult<Void>> completionHandler) {
  Future<Void> outcome = deleteConsumerGroupOffsets(groupId, partitions);
  outcome.onComplete(completionHandler);
}
/**
 * Deletes offsets of a consumer group for the given partitions through the Kafka admin client,
 * after converting the partitions with {@code Helper.toTopicPartitionSet}.
 *
 * @param groupId id of the consumer group
 * @param partitions partitions whose offsets should be deleted
 * @return a future completed without a value once the Kafka future succeeds, or failed with the
 *         error it reports
 */
@Override
public Future<Void> deleteConsumerGroupOffsets(String groupId, Set<TopicPartition> partitions) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Void> result = context.promise();

  this.adminClient.deleteConsumerGroupOffsets(groupId, Helper.toTopicPartitionSet(partitions)).all().whenComplete((ignored, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    result.complete();
  });

  return result.future();
}
/**
 * Callback-style variant of {@link #describeCluster()}.
 *
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
@Override
public void describeCluster(Handler<AsyncResult<ClusterDescription>> completionHandler) {
  Future<ClusterDescription> description = describeCluster();
  description.onComplete(completionHandler);
}
/**
 * Describes the cluster through the Kafka admin client: waits for the cluster id, controller
 * and node list futures, then converts the controller and nodes with {@code Helper.from} and
 * bundles everything into a {@link ClusterDescription}.
 *
 * @return a future completed with the cluster description, or failed with the error reported
 *         by the combined Kafka future (or raised while reading the individual results)
 */
@Override
public Future<ClusterDescription> describeCluster() {
  ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
  Promise<ClusterDescription> promise = ctx.promise();

  DescribeClusterResult describeClusterResult = this.adminClient.describeCluster();
  KafkaFuture.allOf(describeClusterResult.clusterId(), describeClusterResult.controller(), describeClusterResult.nodes()).whenComplete((r, ex) -> {
    if (ex != null) {
      promise.fail(ex);
      return;
    }
    try {
      // The combined future has already succeeded here, so these get() calls are expected to
      // return immediately rather than block.
      String clusterId = describeClusterResult.clusterId().get();
      org.apache.kafka.common.Node rcontroller = describeClusterResult.controller().get();
      Collection<org.apache.kafka.common.Node> rnodes = describeClusterResult.nodes().get();

      Node controller = Helper.from(rcontroller);
      List<Node> nodes = new ArrayList<>();
      for (org.apache.kafka.common.Node rnode : rnodes) {
        nodes.add(Helper.from(rnode));
      }
      promise.complete(new ClusterDescription(clusterId, controller, nodes));
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the owning thread can still observe the interruption.
      Thread.currentThread().interrupt();
      promise.fail(e);
    } catch (ExecutionException e) {
      promise.fail(e);
    }
  });

  return promise.future();
}
/**
 * Callback-style variant of {@link #listOffsets(Map)}.
 *
 * @param topicPartitionOffsets offset specification per partition
 * @param completionHandler handler registered on the future returned by the future-based variant
 */
public void listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets, Handler<AsyncResult<Map<TopicPartition, ListOffsetsResultInfo>>> completionHandler) {
  Future<Map<TopicPartition, ListOffsetsResultInfo>> offsets = listOffsets(topicPartitionOffsets);
  offsets.onComplete(completionHandler);
}
/**
 * Lists offsets for the given partitions through the Kafka admin client and converts every
 * partition/result pair with {@code Helper.from}.
 *
 * @param topicPartitionOffsets offset specification per partition, converted with
 *        {@code Helper.toTopicPartitionOffsets} before being passed to Kafka
 * @return a future completed with the converted results keyed by converted partition, or
 *         failed with the error reported by the Kafka future
 */
@Override
public Future<Map<TopicPartition, ListOffsetsResultInfo>> listOffsets(Map<TopicPartition, OffsetSpec> topicPartitionOffsets) {
  ContextInternal context = (ContextInternal) vertx.getOrCreateContext();
  Promise<Map<TopicPartition, ListOffsetsResultInfo>> result = context.promise();

  this.adminClient.listOffsets(Helper.toTopicPartitionOffsets(topicPartitionOffsets)).all().whenComplete((kafkaOffsets, failure) -> {
    if (failure != null) {
      result.fail(failure);
      return;
    }
    Map<TopicPartition, ListOffsetsResultInfo> converted = new HashMap<>();
    kafkaOffsets.forEach((kafkaPartition, kafkaInfo) ->
      converted.put(Helper.from(kafkaPartition), Helper.from(kafkaInfo)));
    result.complete(converted);
  });

  return result.future();
}
/**
 * Closes the admin client by delegating to {@link #close(long)} with a timeout of zero.
 *
 * @return the future returned by {@link #close(long)}
 */
@Override
public Future<Void> close() {
  final long noTimeout = 0;
  return close(noTimeout);
}
/**
 * Closes the underlying Kafka admin client via the context's {@code executeBlocking}.
 *
 * <p>A positive {@code timeout} is passed to Kafka as a {@link Duration} in milliseconds; any
 * other value calls the no-argument {@code close()}.
 *
 * @param timeout close timeout in milliseconds; values {@code <= 0} use the no-argument close
 * @return a future completed once the blocking close has finished, or failed if it throws
 */
@Override
public Future<Void> close(long timeout) {
  ContextInternal ctx = (ContextInternal) vertx.getOrCreateContext();
  Promise<Void> promise = ctx.promise();

  // The promise is passed as the result handler so that the returned future is actually
  // resolved. Previously the blocking task's outcome was discarded and callers were never
  // notified.
  ctx.<Void>executeBlocking(prom -> {
    if (timeout > 0) {
      adminClient.close(Duration.ofMillis(timeout));
    } else {
      adminClient.close();
    }
    prom.complete();
  }, promise);

  return promise.future();
}
/**
 * Callback-style variant of {@link #close()}.
 *
 * @param handler handler registered on the future returned by {@link #close()}
 */
@Override
public void close(Handler<AsyncResult<Void>> handler) {
  Future<Void> closed = close();
  closed.onComplete(handler);
}
/**
 * Callback-style variant of {@link #close(long)}.
 *
 * @param timeout close timeout in milliseconds, forwarded unchanged to {@link #close(long)}
 * @param handler handler registered on the future returned by {@link #close(long)}
 */
@Override
public void close(long timeout, Handler<AsyncResult<Void>> handler) {
  Future<Void> closed = close(timeout);
  closed.onComplete(handler);
}
}