package io.vertx.kafka.client.consumer.impl;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaHeader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.TimestampType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class KafkaConsumerRecordImpl<K, V> implements KafkaConsumerRecord<K, V> {
private final ConsumerRecord<K, V> record;
private List<KafkaHeader> ;
public KafkaConsumerRecordImpl(ConsumerRecord<K, V> record) {
this.record = record;
}
@Override
public String topic() {
return this.record.topic();
}
@Override
public int partition() {
return this.record.partition();
}
@Override
public long offset() {
return this.record.offset();
}
@Override
public long timestamp() {
return this.record.timestamp();
}
@Override
public TimestampType timestampType() {
return this.record.timestampType();
}
@Deprecated
@Override
public long checksum() {
return this.record.checksum();
}
@Override
public K key() {
return this.record.key();
}
@Override
public V value() {
return this.record.value();
}
@Override
public ConsumerRecord<K, V> record() {
return this.record;
}
@Override
public List<KafkaHeader> () {
if (headers == null) {
if (record.headers() == null) {
headers = Collections.emptyList();
} else {
headers = new ArrayList<>();
for (Header header : record.headers()) {
headers.add(KafkaHeader.header(header.key(), header.value()));
}
}
}
return headers;
}
@Override
public String toString() {
return "KafkaConsumerRecord{" +
"topic=" + this.record.topic() +
",partition=" + this.record.partition() +
",offset=" + this.record.offset() +
",timestamp=" + this.record.timestamp() +
",key=" + this.record.key() +
",value=" + this.record.value() +
",headers=" + this.record.headers() +
"}";
}
}