package io.vertx.kafka.client.common.tracing;
import io.vertx.core.Context;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.kafka.client.common.KafkaClientOptions;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Utils;
/**
 * Tracing helper for Kafka producers: it opens a span through the configured
 * {@link VertxTracer} when a record is about to be sent and closes that span once
 * the outcome of the send is known.
 *
 * <p>Instances are obtained through {@link #create(VertxTracer, KafkaClientOptions)},
 * which returns {@code null} when tracing is disabled.
 *
 * @param <S> the span type handled by the underlying tracer
 */
public class ProducerTracer<S> {
  private final VertxTracer<Void, S> tracer;
  /** Peer address exactly as configured (trace peer address or bootstrap servers). */
  private final String address;
  /** Result of {@code Utils.getHost(address)}. */
  private final String hostname;
  /** String form of {@code Utils.getPort(address)}, or {@code null} when that call returns {@code null}. */
  private final String port;
  private final TracingPolicy policy;

  /**
   * Creates a tracer for the given client options.
   *
   * <p>The peer address is taken from {@code opts.getTracePeerAddress()}; when that is
   * {@code null} it falls back to the {@code bootstrap.servers} entry of the client
   * config, and finally to the empty string.
   *
   * @param tracer the Vert.x tracer to delegate to, may be {@code null}
   * @param opts   the Kafka client options carrying the tracing policy and peer address
   * @param <S>    the span type handled by {@code tracer}
   * @return a new tracer, or {@code null} when {@code tracer} is {@code null} or the
   *         effective policy is {@link TracingPolicy#IGNORE}
   */
  public static <S> ProducerTracer<S> create(VertxTracer<?, ?> tracer, KafkaClientOptions opts) {
    // An unset policy defaults to PROPAGATE.
    TracingPolicy policy = opts.getTracingPolicy() != null ? opts.getTracingPolicy() : TracingPolicy.PROPAGATE;
    if (policy == TracingPolicy.IGNORE || tracer == null) {
      return null;
    }
    String address = opts.getTracePeerAddress();
    if (address == null) {
      // Read the value as Object: a blind (String) cast throws ClassCastException when the
      // config holds a non-String value, and an explicit null mapping would otherwise be
      // handed to Utils.getHost(null) in the constructor.
      Object servers = opts.getConfig() != null
        ? opts.getConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
        : null;
      address = servers != null ? servers.toString() : "";
    }
    // The tracer SPI is handed over without usable type arguments; the span type is
    // opaque to this class and only ever passed back to the same tracer.
    @SuppressWarnings("unchecked")
    VertxTracer<Void, S> typedTracer = (VertxTracer<Void, S>) tracer;
    return new ProducerTracer<>(typedTracer, policy, address);
  }

  private ProducerTracer(VertxTracer<Void, S> tracer, TracingPolicy policy, String bootstrapServer) {
    this.tracer = tracer;
    this.address = bootstrapServer;
    this.hostname = Utils.getHost(bootstrapServer);
    Integer parsedPort = Utils.getPort(bootstrapServer);
    this.port = parsedPort == null ? null : parsedPort.toString();
    this.policy = policy;
  }

  /**
   * Starts a {@code kafka_send} span for the given record. Each header the tracer emits
   * is added to the record's headers.
   *
   * @param context the Vert.x context the send is performed on
   * @param record  the record about to be sent; its headers are mutated
   * @return a handle used to finish or fail the span
   */
  public StartedSpan prepareSendMessage(Context context, ProducerRecord<?, ?> record) {
    TraceContext tc = new TraceContext("producer", address, hostname, port, record.topic());
    S span = tracer.sendRequest(
      context,
      SpanKind.MESSAGING,
      policy,
      tc,
      "kafka_send",
      // Fixed charset: the no-arg getBytes() uses the platform default, which would make
      // the header bytes depend on the JVM the producer happens to run on.
      (k, v) -> record.headers().add(k, v.getBytes(StandardCharsets.UTF_8)),
      TraceTags.TAG_EXTRACTOR);
    return new StartedSpan(span);
  }

  /**
   * Handle on a span opened by {@link #prepareSendMessage(Context, ProducerRecord)}.
   * It is an inner class because it reports back through the enclosing tracer.
   */
  public class StartedSpan {
    private final S span;

    private StartedSpan(S span) {
      this.span = span;
    }

    /**
     * Reports successful completion of the send to the tracer.
     *
     * @param context the Vert.x context the send was performed on
     */
    public void finish(Context context) {
      tracer.receiveResponse(context, null, span, null, TagExtractor.empty());
    }

    /**
     * Reports a failed send to the tracer.
     *
     * @param context the Vert.x context the send was performed on
     * @param failure the cause of the failure
     */
    public void fail(Context context, Throwable failure) {
      tracer.receiveResponse(context, null, span, failure, TagExtractor.empty());
    }
  }
}