/*
 * Copyright 2016 Red Hat Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.vertx.kafka.client.producer.impl;

import io.vertx.core.buffer.Buffer;
import io.vertx.kafka.client.producer.KafkaHeader;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

Vert.x Kafka producer record implementation
/** * Vert.x Kafka producer record implementation */
public class KafkaProducerRecordImpl<K, V> implements KafkaProducerRecord<K, V> { private final String topic; private final K key; private final V value; private final Long timestamp; private final Integer partition; private final List<KafkaHeader> headers = new ArrayList<>();
Constructor
Params:
  • topic – the topic this record is being sent to
  • key – the key (or null if no key is specified)
  • value – the value
  • timestamp – the timestamp of this record
  • partition – the partition to which the record will be sent (or null if no partition was specified)
/** * Constructor * * @param topic the topic this record is being sent to * @param key the key (or null if no key is specified) * @param value the value * @param timestamp the timestamp of this record * @param partition the partition to which the record will be sent (or null if no partition was specified) */
public KafkaProducerRecordImpl(String topic, K key, V value, Long timestamp, Integer partition) { this.topic = topic; this.key = key; this.value = value; this.timestamp = timestamp; this.partition = partition; }
Constructor
Params:
  • topic – the topic this record is being sent to
  • key – the key (or null if no key is specified)
  • value – the value
  • partition – the partition to which the record will be sent (or null if no partition was specified)
/** * Constructor * * @param topic the topic this record is being sent to * @param key the key (or null if no key is specified) * @param value the value * @param partition the partition to which the record will be sent (or null if no partition was specified) */
public KafkaProducerRecordImpl(String topic, K key, V value, Integer partition) { this.topic = topic; this.key = key; this.value = value; this.timestamp = null; this.partition = partition; }
Constructor
Params:
  • topic – the topic this record is being sent to
  • key – the key (or null if no key is specified)
  • value – the value
/** * Constructor * * @param topic the topic this record is being sent to * @param key the key (or null if no key is specified) * @param value the value */
public KafkaProducerRecordImpl(String topic, K key, V value) { this.topic = topic; this.key = key; this.value = value; this.timestamp = null; this.partition = null; }
Constructor
Params:
  • topic – the topic this record is being sent to
  • value – the value
/** * Constructor * * @param topic the topic this record is being sent to * @param value the value */
public KafkaProducerRecordImpl(String topic, V value) { this.topic = topic; this.key = null; this.value = value; this.timestamp = null; this.partition = null; } @Override public String topic() { return topic; } @Override public K key() { return key; } @Override public Long timestamp() { return timestamp; } @Override public V value() { return value; } @Override public Integer partition() { return partition; } @Override public KafkaProducerRecord<K, V> addHeader(String key, Buffer value) { return addHeader(new KafkaHeaderImpl(key, value)); } @Override public KafkaProducerRecord<K, V> addHeader(String key, String value) { return addHeader(new KafkaHeaderImpl(key, value)); } @Override public KafkaProducerRecord<K, V> addHeader(KafkaHeader header) { headers.add(header); return this; } @Override public KafkaProducerRecord<K, V> addHeaders(List<KafkaHeader> headers) { this.headers.addAll(headers); return this; } @Override public ProducerRecord<K, V> record() { if (headers.isEmpty()) { return new ProducerRecord<>(topic, partition, timestamp, key, value); } else { return new ProducerRecord<>( topic, partition, timestamp, key, value, headers.stream() .map(header -> new RecordHeader(header.key(), header.value().getBytes())) .collect(Collectors.toList())); } } @Override public List<KafkaHeader> headers() { return headers; } @Override public String toString() { return "KafkaProducerRecord{" + "topic=" + this.topic + ",partition=" + this.partition + ",timestamp=" + this.timestamp + ",key=" + this.key + ",value=" + this.value + ",headers=" + this.headers + "}"; } }