/*
* Copyright 2020 Red Hat Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.vertx.kafka.client.common;
import io.vertx.codegen.annotations.DataObject;
import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.core.json.JsonObject;
import io.vertx.core.tracing.TracingPolicy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Generic KafkaClient options.
/**
* Generic KafkaClient options.
*/
@DataObject(generateConverter = true)
public class KafkaClientOptions {
Default peer address to set in traces tags is null, and will automatically pick up bootstrap server from config
/**
* Default peer address to set in traces tags is null, and will automatically pick up bootstrap server from config
*/
public static final String DEFAULT_TRACE_PEER_ADDRESS = null;
Default tracing policy is 'propagate'
/**
* Default tracing policy is 'propagate'
*/
public static final TracingPolicy DEFAULT_TRACING_POLICY = TracingPolicy.PROPAGATE;
private Map<String, Object> config;
private String tracePeerAddress = DEFAULT_TRACE_PEER_ADDRESS;
private TracingPolicy tracingPolicy = DEFAULT_TRACING_POLICY;
public KafkaClientOptions() {
}
public KafkaClientOptions(JsonObject json) {
this();
KafkaClientOptionsConverter.fromJson(json, this);
}
Create KafkaClientOptions from underlying Kafka config as map
Params: - config – config map to be passed down to underlying Kafka client
Returns: an instance of KafkaClientOptions
/**
* Create KafkaClientOptions from underlying Kafka config as map
* @param config config map to be passed down to underlying Kafka client
* @return an instance of KafkaClientOptions
*/
public static KafkaClientOptions fromMap(Map<String, Object> config, boolean isProducer) {
String tracePeerAddress = (String) config.getOrDefault(isProducer ? ProducerConfig.BOOTSTRAP_SERVERS_CONFIG : ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
return new KafkaClientOptions().setTracePeerAddress(tracePeerAddress);
}
Create KafkaClientOptions from underlying Kafka config as Properties
Params: - config – config properties to be passed down to underlying Kafka client
Returns: an instance of KafkaClientOptions
/**
* Create KafkaClientOptions from underlying Kafka config as Properties
* @param config config properties to be passed down to underlying Kafka client
* @return an instance of KafkaClientOptions
*/
public static KafkaClientOptions fromProperties(Properties config, boolean isProducer) {
String tracePeerAddress = (String) config.getOrDefault(isProducer ? ProducerConfig.BOOTSTRAP_SERVERS_CONFIG : ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
return new KafkaClientOptions().setTracePeerAddress(tracePeerAddress);
}
Returns: the kafka config
/**
* @return the kafka config
*/
public Map<String, Object> getConfig() {
return config;
}
Set the Kafka config.
Params: - config – the config
Returns: a reference to this, so the API can be used fluently
/**
* Set the Kafka config.
*
* @param config the config
* @return a reference to this, so the API can be used fluently
*/
public KafkaClientOptions setConfig(Map<String, Object> config) {
this.config = config;
return this;
}
Set a Kafka config entry.
Params: - key – the config key
- value – the config value
Returns: a reference to this, so the API can be used fluently
/**
* Set a Kafka config entry.
*
* @param key the config key
* @param value the config value
* @return a reference to this, so the API can be used fluently
*/
@GenIgnore
public KafkaClientOptions setConfig(String key, Object value) {
if (config == null) {
config = new HashMap<>();
}
config.put(key, value);
return this;
}
Returns: the kafka tracing policy
/**
* @return the kafka tracing policy
*/
public TracingPolicy getTracingPolicy() {
return tracingPolicy;
}
Set the Kafka tracing policy.
Params: - tracingPolicy – the tracing policy
Returns: a reference to this, so the API can be used fluently
/**
* Set the Kafka tracing policy.
*
* @param tracingPolicy the tracing policy
* @return a reference to this, so the API can be used fluently
*/
public KafkaClientOptions setTracingPolicy(TracingPolicy tracingPolicy) {
this.tracingPolicy = tracingPolicy;
return this;
}
Returns: the Kafka "peer address" to show in trace tags
/**
* @return the Kafka "peer address" to show in trace tags
*/
public String getTracePeerAddress() {
return tracePeerAddress;
}
Set the Kafka address to show in trace tags.
Or leave it unset to automatically pick up bootstrap server from config instead.
Params: - tracePeerAddress – the Kafka "peer address" to show in trace tags
Returns: a reference to this, so the API can be used fluently
/**
* Set the Kafka address to show in trace tags.
* Or leave it unset to automatically pick up bootstrap server from config instead.
*
* @param tracePeerAddress the Kafka "peer address" to show in trace tags
* @return a reference to this, so the API can be used fluently
*/
public KafkaClientOptions setTracePeerAddress(String tracePeerAddress) {
this.tracePeerAddress = tracePeerAddress;
return this;
}
public JsonObject toJson() {
return new JsonObject();
}
}