Overview
TDMQ for CKafka is a distributed stream processing platform used to build real-time data pipelines and streaming applications. It provides high throughput, low latency, scalability, and fault tolerance.
This document describes the key parameters of the librdkafka client, along with practical tutorials and FAQs.
Producer Practice
Version Selection
When librdkafka is used, it automatically negotiates an appropriate protocol version with the Kafka cluster. Because Kafka versions iterate rapidly, we generally recommend using the latest version of librdkafka to achieve the best compatibility and performance.
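To confirm which librdkafka version an application is actually linked against, a minimal check is to print it at startup; rd_kafka_version_str() and rd_kafka_version() are part of the public librdkafka API:
#include <stdio.h>
#include <librdkafka/rdkafka.h>
int main(void) {
    /* Print the librdkafka version linked at runtime, as a string and a hex integer. */
    printf("librdkafka version: %s (0x%08x)\n",
           rd_kafka_version_str(), rd_kafka_version());
    return 0;
}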
Producer Parameters and Optimization
Producer Parameters
The key parameters of librdkafka and commonly used settings are as follows (exact defaults vary by librdkafka version):
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);            /* Initial broker address list */
rd_kafka_conf_set(conf, "message.send.max.retries", "2", NULL, 0);                  /* Retries for a failed send */
rd_kafka_conf_set(conf, "retry.backoff.ms", "100", NULL, 0);                        /* Backoff before retrying a send */
rd_kafka_conf_set(conf, "request.timeout.ms", "5000", NULL, 0);                     /* Timeout waiting for produce acknowledgement */
rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "131072", NULL, 0);           /* Max total size (KB) of the local send queue */
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", NULL, 0);         /* Max number of messages in the local send queue */
rd_kafka_conf_set(conf, "queue.buffering.max.ms", "0", NULL, 0);                    /* Time to buffer messages before sending a batch */
rd_kafka_conf_set(conf, "compression.codec", "none", NULL, 0);                      /* Compression algorithm: none, gzip, snappy, lz4, zstd */
rd_kafka_conf_set(conf, "compression.level", "0", NULL, 0);                         /* Compression level (algorithm-dependent) */
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);                           /* Client identifier */
rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "1000000", NULL, 0); /* Max unacknowledged requests per connection */
rd_kafka_conf_set(conf, "broker.address.ttl", "3", NULL, 0);                        /* TTL (ms) of cached broker address resolutions */
rd_kafka_conf_set(conf, "reconnect.backoff.ms", "1000", NULL, 0);                   /* Initial reconnect backoff */
rd_kafka_conf_set(conf, "reconnect.backoff.max.ms", "10000", NULL, 0);              /* Max reconnect backoff */
rd_kafka_conf_set(conf, "api.version.request.timeout.ms", "10000", NULL, 0);        /* Timeout for broker API version requests */
rd_kafka_conf_set(conf, "security.protocol", "plaintext", NULL, 0);                 /* Protocol used to communicate with brokers */
rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
Parameter Description and Optimization
acks Parameter Optimization
The acks parameter controls the acknowledgement mechanism for messages sent by the producer. Its optional values are 0, 1, and -1; the default is -1, which means the send returns only after the leader broker has written and confirmed the message and all corresponding followers have also written it. In cross-availability-zone scenarios, and for topics with a high number of replicas, the value of acks affects both message reliability and throughput. Therefore:
In online business messaging scenarios where throughput demands are not high, set acks to -1 so that the message is received and acknowledged by all replicas before the call returns, improving reliability.
In big data scenarios such as log collection and offline computing, where high throughput (i.e., the volume of data written to Kafka per second) is required, set acks to 1 to increase throughput (see the sketch below).
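As a minimal sketch, reusing a conf object created as in the parameter listing above (set only one of the two, depending on your scenario):
/* Reliability first: wait until all in-sync replicas have acknowledged the write. */
rd_kafka_conf_set(conf, "acks", "-1", NULL, 0);
/* Throughput first: wait only for the leader's acknowledgement. */
rd_kafka_conf_set(conf, "acks", "1", NULL, 0);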
Buffering Parameter Optimization (Caching)
For the same volume of data, a single network request consumes less computation and network resources than many small requests, thereby improving overall write throughput.
You can therefore tune the buffering parameters to optimize the client's send throughput. librdkafka provides a default batching time of 5 ms to buffer messages; if your messages are small, you can increase the queue.buffering.max.ms duration appropriately, as in the sketch below.
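For example, the following sketch trades a little latency for larger batches; the 50 ms figure is illustrative and should be tuned against your latency budget:
/* Buffer messages locally for up to 50 ms so they can be sent in larger batches. */
rd_kafka_conf_set(conf, "queue.buffering.max.ms", "50", NULL, 0);
/* Bound memory use while batching: caps on queued message count and total size (KB). */
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", NULL, 0);
rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "131072", NULL, 0);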
Compression Parameter Optimization
The librdkafka client supports the following compression algorithms:
none: No compression algorithm.
gzip: Compress by GZIP algorithm.
snappy: Compress by Snappy algorithm.
lz4: Compress by LZ4 algorithm.
zstd: Compress by ZSTD algorithm.
To use a compression algorithm in the producer client, set the compression.codec parameter (compression.type is an accepted alias) when creating the producer; for example, to use LZ4 compression, set compression.codec to lz4. Compression and decompression both occur on the client side, trading CPU for bandwidth. However, the broker verifies compressed messages, and for GZIP in particular this verification incurs significant additional server-side computation. The added load can reduce the broker's message processing capability and lower overall throughput, which may not be worthwhile in some cases. In such cases, we recommend the following method (sketched after the list):
Independently compress the message data on the producer side to generate the compressed payload messageCompression, and store the compression method in the message's key:
{"Compression","CompressionLZ4"}
On the producer side, send messageCompression as a normal message.
On the consumer side, read the message key to access the compression method used, and perform independent decompression.
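A minimal sketch of the producer side of this pattern, assuming a hypothetical compress_lz4() helper (any compression library can stand in for it) and a topic handle created as in the producer example below:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <librdkafka/rdkafka.h>
/* Hypothetical helper: returns a malloc'd LZ4-compressed copy of `in`;
 * *out_len receives the compressed size. */
void *compress_lz4(const void *in, size_t in_len, size_t *out_len);
void send_precompressed(rd_kafka_topic_t *topic, const void *payload, size_t len) {
    size_t clen;
    void *compressed = compress_lz4(payload, len, &clen);
    /* Store the compression method in the message key, mirroring the
     * {"Compression","CompressionLZ4"} convention above; the consumer reads
     * the key and decompresses accordingly. RD_KAFKA_MSG_F_FREE hands buffer
     * ownership to librdkafka on success. */
    const char *key = "CompressionLZ4";
    if (rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE,
                         compressed, clen, key, strlen(key), NULL) == -1) {
        fprintf(stderr, "Produce failed: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        free(compressed);  /* On failure the payload is still owned by the caller. */
    }
}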
Creating Producer Instance
If an application requires higher throughput, it can use an asynchronous producer to increase send speed and batch messages to reduce network overhead and I/O consumption. If an application demands higher reliability, it can use a synchronous producer to ensure successful delivery, and additionally employ the acks acknowledgement mechanism and the transaction mechanism to guarantee message reliability and consistency. For specific parameter optimization, see Producer Parameters and Optimization.
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

/* Delivery report callback: invoked once per message with the final send result. */
void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
    } else {
        fprintf(stderr, "Message delivered (%zu bytes, partition %"PRId32")\n",
                rkmessage->len, rkmessage->partition);
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    rd_kafka_conf_set(conf, "acks", "1", NULL, 0);
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    char errstr[512];
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        return 1;
    }

    rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "test", NULL);
    if (!topic) {
        fprintf(stderr, "Failed to create topic: %s\n", rd_kafka_err2str(rd_kafka_last_error()));
        rd_kafka_destroy(producer);
        return 1;
    }

    /* Asynchronous send; RD_KAFKA_MSG_F_COPY makes librdkafka copy the payload. */
    const char *message = "Hello, Kafka!";
    if (rd_kafka_produce(
            topic,
            RD_KAFKA_PARTITION_UA,   /* Let the partitioner pick the partition. */
            RD_KAFKA_MSG_F_COPY,
            (void *)message,
            strlen(message),
            NULL,                    /* No key. */
            0,
            NULL) == -1) {
        fprintf(stderr, "Failed to produce to topic %s: %s\n",
                rd_kafka_topic_name(topic), rd_kafka_err2str(rd_kafka_last_error()));
    }

    /* Serve delivery reports until the local send queue is drained. */
    while (rd_kafka_outq_len(producer) > 0) {
        rd_kafka_poll(producer, 1000);
    }

    rd_kafka_topic_destroy(topic);
    rd_kafka_destroy(producer);
    return 0;
}
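Assuming librdkafka and its development headers are installed in a standard location, an example like this can typically be built with: gcc producer.c -o producer -lrdkafka (the file name producer.c is illustrative).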
Consumer Practice
Consumer Parameters and Optimization
Consumer Parameters
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);  /* Initial broker address list */
rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);                  /* Consumer group ID */
rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);      /* Interval between automatic offset commits */
rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);           /* Enable automatic offset commits */
rd_kafka_conf_set(conf, "auto.offset.reset", "latest", NULL, 0);          /* Where to start when no committed offset exists */
rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);                 /* Client identifier */
char errstr[512];
rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
Parameter Description and Optimization
For automatic offset commits, we recommend not setting auto.commit.interval.ms below 1,000 ms: overly frequent offset commit requests cause high broker CPU usage and can affect the read and write operations of other normal services.
Creating Consumer Instance
The subscription model is provided for creating consumers, and it supports two ways of committing offsets: automatic commit and manual commit.
Auto-Commit Offsets
Auto-commit offsets: after pulling messages, the consumer automatically commits its offsets without manual intervention. This method is easy to use, but it can lead to duplicate consumption or message loss.
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

/* Print either the consume error or the received message. */
void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
                "offset %"PRId64": %s\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition, rkmessage->offset,
                rd_kafka_message_errstr(rkmessage));
    } else {
        printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition,
               rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);
    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);

    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
    }
    /* Redirect the main event queue to the consumer queue so that
       rd_kafka_consumer_poll() serves all callbacks. */
    rd_kafka_poll_set_consumer(consumer);

    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer, topics);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(consumer);
        return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);

    while (1) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
        if (rkmessage) {
            msg_consume(rkmessage, NULL);
            rd_kafka_message_destroy(rkmessage);
        }
    }

    /* Unreachable in this example, but the idiomatic teardown is: */
    rd_kafka_consumer_close(consumer);
    rd_kafka_destroy(consumer);
    return 0;
}
Manual-Commit Offsets
Manual-commit offsets: after processing messages, the consumer manually commits its offsets. The advantage is precise control over when offsets are committed, avoiding duplicate consumption or message loss. Note that overly frequent commit requests cause high broker CPU usage; as message volume increases, the CPU cost becomes significant and can affect other features of the broker. We therefore recommend committing offsets only after every certain number of messages (every 10 in the example below).
#include <stdio.h>
#include <string.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

/* Print either the consume error or the received message. */
void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
        fprintf(stderr, "%% Consume error for topic \"%s\" [%"PRId32"] "
                "offset %"PRId64": %s\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition, rkmessage->offset,
                rd_kafka_message_errstr(rkmessage));
    } else {
        printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\n",
               rd_kafka_topic_name(rkmessage->rkt),
               rkmessage->partition,
               rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
}

int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);  /* Offsets are committed manually. */

    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
    }
    /* Redirect the main event queue to the consumer queue so that
       rd_kafka_consumer_poll() serves all callbacks. */
    rd_kafka_poll_set_consumer(consumer);

    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    rd_kafka_resp_err_t err = rd_kafka_subscribe(consumer, topics);
    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
        fprintf(stderr, "Failed to subscribe to topic: %s\n", rd_kafka_err2str(err));
        rd_kafka_topic_partition_list_destroy(topics);
        rd_kafka_destroy(consumer);
        return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);

    int message_count = 0;
    while (1) {
        rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
        if (rkmessage) {
            msg_consume(rkmessage, NULL);
            /* Commit synchronously every 10 messages to limit commit request load. */
            if (++message_count % 10 == 0) {
                rd_kafka_resp_err_t commit_err = rd_kafka_commit_message(consumer, rkmessage, 0 /* sync */);
                if (commit_err != RD_KAFKA_RESP_ERR_NO_ERROR) {
                    fprintf(stderr, "Failed to commit offset for message: %s\n", rd_kafka_err2str(commit_err));
                } else {
                    printf("Offset %"PRId64" committed\n", rkmessage->offset);
                }
            }
            rd_kafka_message_destroy(rkmessage);
        }
    }

    /* Unreachable in this example, but the idiomatic teardown is: */
    rd_kafka_consumer_close(consumer);
    rd_kafka_destroy(consumer);
    return 0;
}