Overview
TDMQ for CKafka is a distributed stream processing platform used to build real-time data pipelines and streaming applications. It offers high throughput, low latency, scalability, and fault tolerance.
Kafka Clients: These are Kafka's built-in clients, implemented in Java. They serve as clients for Kafka's standard production and consumption protocols. This document describes the key parameters, practical tutorials, and FAQs about the aforementioned Java clients.
Producer Practice
Version Selection
The compatibility between Kafka clients and clusters is important. Generally, newer client versions are compatible with older clusters, but the reverse is not necessarily true. The broker version of a CKafka instance is fixed once the instance is deployed, so choose a client version that matches the broker version.
In the Java ecosystem, Spring Kafka is widely used. The correspondence between Spring Kafka versions and Kafka broker versions can be found on the official Spring website under Version Correspondence.
Producer Parameters
When writing to Kafka using the Kafka Client, you need to configure the following key parameters. The parameters and their default values are:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerExample {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
props.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 540000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "");
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 300000);
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, "");
// partitioner.class is unset by default, so Kafka's built-in default partitioning strategy is used;
// set it explicitly only if you need a different strategy, e.g.:
// props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.kafka.clients.producer.RoundRobinPartitioner");
props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, 32768);
props.put(ProducerConfig.SEND_BUFFER_CONFIG, 131072);
props.put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000);
props.put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, 50);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.RETRIES_CONFIG, 2147483647);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
// transactional.id is unset (null) by default; Properties rejects null values, so set it only when transactions are needed, e.g.:
// props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
props.put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, "default");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
String key = "key-" + i;
String value = "value-" + i;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("Message sent successfully:key=" + key + ", value=" + value);
} else {
System.err.println("Message sending failed:" + exception.getMessage());
}
}
});
}
producer.close();
}
}
Parameter Description and Optimization
acks Parameter Optimization
The acks parameter is used to control the confirmation mechanism when the producer sends messages. Its default value is 1, which means that after the message is sent to the leader broker, it returns upon the leader's confirmation of the message being written. The acks parameter also has the following optional values:
0: Do not wait for any confirmation, return directly.
1: Wait for the leader replica to confirm the write before returning.
-1 or all: Wait for the leader replica and all in-sync follower replicas to confirm the write before returning.
In cross-availability-zone scenarios, and for topics with more replicas, the choice of the acks parameter affects message reliability and throughput. Therefore (a configuration sketch follows this list):
In some online business message scenarios, where throughput requirements are not high, you can set the acks parameter to -1 to ensure that the message is received and confirmed by all replicas before returning. This improves message reliability but sacrifices write throughput and performance, and the latency will increase.
In scenarios involving big data, such as log collection, or offline computing, where high throughput (i.e., the volume of data written to Kafka per second) is required, you can set the acks to 1 to improve throughput.
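For reference, a minimal sketch of the two acks choices described above; the class name and the two Properties objects are illustrative, and only the Kafka configuration keys and values come from this document:
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class AcksConfigSketch {
public static void main(String[] args) {
// Reliability-first (e.g., online business messages): wait for all in-sync replicas.
Properties reliableProps = new Properties();
reliableProps.put(ProducerConfig.ACKS_CONFIG, "-1"); // equivalent to "all": higher reliability, higher latency
// Throughput-first (e.g., log collection, offline computing): leader confirmation only.
Properties throughputProps = new Properties();
throughputProps.put(ProducerConfig.ACKS_CONFIG, "1");
}
}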
Batch Parameter Optimization
For the same total volume of data, sending it in fewer, larger requests consumes less computation and network resources than sending it in many small requests, thereby improving overall write throughput.
Therefore, the batching parameters can be tuned to improve the client's sending throughput. In high-throughput scenarios, set them according to your expected traffic (a configuration sketch follows this list):
batch.size: Default is 16 K.
linger.ms: Default is 0. You can appropriately increase the delay, such as setting it to 100 ms, to aggregate more messages for batch sending.
buffer.memory: Default is 32 MB. For high-traffic producers, you can set it larger if there is sufficient heap memory, such as setting it to 256 MB.
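A minimal sketch of the batch tuning described above; the values are examples for a high-traffic producer, not prescriptions:
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class BatchTuningSketch {
public static void main(String[] args) {
Properties props = new Properties();
// Larger batches: fewer, bigger requests for the same traffic.
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64 KB per-partition batch (example value)
// Wait up to 100 ms to aggregate more messages before sending.
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
// Larger send buffer for high-traffic producers, if heap memory allows.
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 256 * 1024 * 1024); // 256 MB
}
}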
Transaction Parameter Optimization
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// transactional.id is unset by default; set a unique, stable ID to enable transactions
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);
Note that transactions incur additional computing resources, because they guarantee exactly-once semantics.
For transaction scenarios, increase the transaction timeout appropriately to tolerate jitter caused by write latency in high-throughput scenarios, as shown in the sketch below.
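A minimal transactional producer sketch, assuming a placeholder topic test-topic and an application-chosen transactional.id; the timeout value is an example:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaTransactionalProducerSketch {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // required for transactions
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-producer"); // must be unique and stable for this producer
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 120000); // larger timeout tolerates write-latency jitter
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("test-topic", "tx-key", "tx-value"));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Unrecoverable for this producer instance; it is closed in finally
} catch (KafkaException e) {
// Recoverable error: abort the transaction (and optionally retry)
producer.abortTransaction();
} finally {
producer.close();
}
}
}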
Compression Parameter Optimization
The Kafka Java Client supports the following Compression Parameters:
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
Currently, it supports the following Compression configurations:
none: No compression algorithm.
gzip: Compress by GZIP algorithm.
snappy: Compress by Snappy algorithm.
lz4: Compress by LZ4 algorithm.
zstd: Compress by ZSTD algorithm.
To use compressed messages in Kafka Java Client, you need to set the compression.type parameter when creating a producer. For example, to use the LZ4 compression algorithm, you can set compression.type to lz4.
Kafka message compression is an optimization that trades CPU for bandwidth. Although compression and decompression happen on the client side, the broker still verifies compressed messages, which adds computation cost: the extra work raises broker CPU usage, reduces the capacity left for other requests, and degrades overall performance. This is especially true for gzip, whose server-side verification cost can be very high; in some cases the cost outweighs the benefit. In such cases, we recommend the following method to avoid broker verification (an illustrative sketch follows the steps):
1. Compress the message data independently on the producer side to produce a compressed payload (referred to here as messageCompression), and record the compression method used in the message key.
2. On the producer side, send messageCompression as a normal message, i.e., with compression.type left as none.
3. On the consumer side, read the message key, determine the compression method used, and decompress the payload independently.
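A hedged illustration of this approach; the GZIP helper, the key convention, and the topic name below are examples rather than a fixed API:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.zip.GZIPOutputStream;
public class ClientSideCompressionSketch {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
// compression.type stays "none": the payload is already compressed by the application.
byte[] compressed = gzip("raw message body".getBytes(StandardCharsets.UTF_8));
// Store the compression method in the key so consumers know how to decompress.
ProducerRecord<String, byte[]> record = new ProducerRecord<>("test-topic", "gzip", compressed);
try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
producer.send(record);
}
}
private static byte[] gzip(byte[] data) throws Exception {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
gz.write(data);
}
return bos.toByteArray();
}
}
On the consumer side, read record.key() to choose the matching decompression (for example, a GZIPInputStream for the gzip key).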
Creating Producer Instance
If your application needs higher throughput, you can use asynchronous sending to increase the speed of message sending. At the same time, batch message sending can be utilized to reduce network overhead and IO consumption. If the application requires higher reliability, synchronous sending can ensure message delivery success. Meanwhile, ACK confirmation mechanism and transaction mechanism can be used to ensure the reliability and consistency of messages. For specific parameter optimization, see producer parameters and optimization.
Synchronous Sending
An example of synchronous sending in the Kafka Java Client:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaProducerSyncExample {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = "sync-key-" + i;
String value = "sync-value-" + i;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("Synchronous sending succeeded: key=" + key + ", value=" + value + ", partition=" + metadata.partition() + ", offset=" + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
System.err.println("Synchronous sending failed:" + e.getMessage());
}
}
producer.close();
}
}
Asynchronous Sending
Asynchronous sending: When messages are sent asynchronously, the current thread won't be blocked, and the producer throughput is higher. However, message results need to be handled through a callback function. Example:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerAsyncExample {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {
String key = "async-key-" + i;
String value = "async-value-" + i;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println("Asynchronous sending succeeded:key=" + key + ", value=" + value);
} else {
System.err.println("Asynchronous sending failed:" + exception.getMessage());
}
}
});
}
producer.close();
}
}
Consumer Practice
Consumer Parameters
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-client-id");
properties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
Parameter Optimization
1. When using Kafka consumers, we can optimize performance by adjusting some parameters. Here are some common parameter optimization methods:
fetch.min.bytes: If you don't know the minimum message size, we recommend leaving this parameter at 1. If you do know it, you can set this value to the minimum message size.
max.poll.records: This parameter can be adjusted based on the processing capacity of the application. If your application can handle more records, you can set this value to a larger number to reduce the frequency of poll operations.
auto.commit.interval.ms: This parameter can be adjusted according to the needs of your application. Generally, for scenarios with automatic offset commits, it is recommended to set it to the default value of 5,000 ms. Note that excessively frequent offset commits can affect performance and additionally consume broker's computational resources.
client.id: You can set a unique ID for each consumer to distinguish between different consumers in monitoring and logs.
The above are some common parameter optimization methods, but the optimal settings may vary based on the features and requirements of your application. When optimizing parameters, always conduct performance testing to ensure the results match your expectations.
2. For issues of frequent rebalancing and consumption-thread blocking, see the following parameter optimization instructions (a configuration sketch follows this list):
session.timeout.ms: For versions before v0.10.2, increase this parameter value appropriately to make it greater than the time it takes to consume a batch of data and not exceed 30 s. The recommended value is 25 s. For v0.10.2 and later versions, use the default value of 10 s.
max.poll.records: Decrease this value so that it is significantly less than <the number of messages consumed per second per thread> x <the number of consumption threads> x <max.poll.interval.ms, expressed in seconds>.
max.poll.interval.ms: This value should be greater than <max.poll.records> / (<the number of messages consumed per second per thread> x <the number of consumption threads>), converted to milliseconds.
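As an illustration only, assuming a single consumption thread that processes about 100 messages per second, the recommendations above might translate into settings like these:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;
public class RebalanceTuningSketch {
public static void main(String[] args) {
Properties props = new Properties();
// Session timeout: about 25 s is recommended for clients before v0.10.2; keep the default 10 s afterwards.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 25000);
// With ~100 msg/s per thread, a batch of 500 records takes about 5 s to process,
// which is well within the 300 s max.poll.interval.ms below.
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
}
}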
Creating Consumer Instance
The Kafka Java Client provides a subscription model for creating consumers and offers two ways to commit offsets: automatically and manually.
Auto-Commit Offsets
Auto-commit offsets: Consumers automatically commit their offsets after pulling messages, eliminating the need for manual operation. This method's advantage is its simplicity and ease of use, but it may lead to duplicate message consumption or loss. Note that the auto-commit interval, auto.commit.interval.ms, should not be set too short; otherwise, it may lead to relatively high broker CPU utilization, affecting the processing of other requests.
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerAutoCommitExample {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC = "test-topic";
private static final String GROUP_ID = "test-group";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consume message: topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
Manual-Commit Offsets
Manual-commit offsets: After processing messages, consumers manually commit their offsets. The advantage of this method is precise control over when offsets are committed, which helps avoid duplicate consumption or message loss. Note, however, that overly frequent commits increase broker CPU usage and affect performance; as message volume grows, this cost becomes significant and can affect other broker functions. Therefore, it is recommended to commit offsets in batches, for example after every fixed number of messages, as in the example below.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerManualCommitExample {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("test-topic"));
int count = 0;
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
count++;
if (count % 10 == 0) {
consumer.commitSync();
System.out.println("Committed offsets.");
}
}
}
} finally {
consumer.close();
}
}
}
Assign Consumption
The Kafka Java Client's assign consumption mode allows consumers to specify the partitions to consume directly, rather than having partitions assigned automatically through topic subscription. This mode is suitable for scenarios that require manual control over the consumed partitions, such as implementing a specific load-balancing policy or skipping certain partitions. The general process is to specify the consumer's partitions manually with the assign method, set the starting offset with the seek method, and then run the consumption logic. For example:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerAssignAndSeekApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = "my-topic";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
long startPosition0 = 10L;
long startPosition1 = 20L;
consumer.seek(partition0, startPosition0);
consumer.seek(partition1, startPosition1);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
Production and Consumption FAQs of Kafka Java Client
Kafka Java Producer is unable to send messages successfully
First, check whether the IP address and port of the Kafka cluster are reachable from the client. If not, resolve the connectivity issue first.
Next, verify that the access point is configured correctly and that the client version matches the broker version, then send messages with the demo code above, following the best practices in this document. A connectivity-check sketch is shown below.
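As a quick connectivity check, a hedged sketch using the Kafka AdminClient (the bootstrap address is a placeholder) can confirm that the access point is reachable and that the client can talk to the broker:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import java.util.Properties;
public class ConnectivityCheck {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
try (AdminClient admin = AdminClient.create(props)) {
// Listing topics succeeds only if the access point is reachable and the protocol versions are compatible.
System.out.println("Cluster reachable, topics: " + admin.listTopics().names().get());
}
}
}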