tencent cloud

Feedback

ibrdkafka SDK

Last updated: 2024-07-04 16:00:59

    Overview

    TDMQ for CKafka is a distributed stream processing platform used to build real-time data pipelines and streaming applications. It provides high throughput, low latency, scalability, and fault tolerance.
    This document describes the key parameters, practical tutorials and FAQs of the librdkafka client mentioned above.

    Producer Practice

    Version Selection

    When librdkafka is used, it automatically selects the appropriate protocol version for communication based on the version of the Kafka cluster. Due to the rapid iteration updates of Kafka versions, usually, we recommend the latest version of librdkafka for you to achieve the best compatibility and performance.

    Producer Parameters and Optimization

    Producer Parameters

    The key parameters of librdkafka and their default values are as follows:
    
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // Addresses of Kafka clusters, separated by commas, with the default value being empty.
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // The maximum number of attempts to send a message, including the first attempt and with the default value being 2.
    rd_kafka_conf_set(conf, "message.send.max.retries", "2", NULL, 0);
    
    // Backoff time between retries (in milliseconds), with the default value being 100.
    rd_kafka_conf_set(conf, "retry.backoff.ms", "100", NULL, 0);
    
    // Client request timeout (in milliseconds), with the default value being 5000.
    rd_kafka_conf_set(conf, "request.timeout.ms", "5000", NULL, 0);
    
    // The buffer size for data sent by the client (in bytes), with the default value being 131072.
    rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "131072", NULL, 0);
    
    // Maximum number of messages in the client's send buffer, with the default value being 100000.
    rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", NULL, 0);
    
    // Maximum total size of messages in the client's send buffer (in bytes), with the default value being 1000000.
    rd_kafka_conf_set(conf, "queue.buffering.max.total.bytes", "1000000", NULL, 0);
    
    // Linger time of the client's send buffer (in milliseconds), with the default value being 0.
    rd_kafka_conf_set(conf, "queue.buffering.max.ms", "0", NULL, 0);
    
    // Whether to enable message compression, with the default value being 0 (disabled).
    rd_kafka_conf_set(conf, "compression.codec", "none", NULL, 0);
    
    // Message compression level, with the default value being 0 (auto-select).
    rd_kafka_conf_set(conf, "compression.level", "0", NULL, 0);
    
    // Client ID, with the default value being rdkafka.
    rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);
    
    // Maximum concurrency request count for the producer, i.e., the number of requests without broker response, with the default value being 1000000.
    rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "1000000", NULL, 0);
    
    // Maximum retry count for client connections to the Kafka cluster, with the default value being 3.
    rd_kafka_conf_set(conf, "broker.address.ttl", "3", NULL, 0);
    
    // Interval between retry attempts for client connections to the Kafka cluster (in milliseconds), with the default value being 1000.
    rd_kafka_conf_set(conf, "reconnect.backoff.ms", "1000", NULL, 0);
    
    // Maximum interval between retry attempts for client connections to the Kafka cluster (in milliseconds), with the default value being 10000.
    rd_kafka_conf_set(conf, "reconnect.backoff.max.ms", "10000", NULL, 0);
    
    // Backoff time for client API version (in milliseconds), with the default value being 10000.
    rd_kafka_conf_set(conf, "api.version.request.timeout.ms", "10000", NULL, 0);
    
    // Security protocol, with the default value being plaintext.
    rd_kafka_conf_set(conf, "security.protocol", "plaintext", NULL, 0);
    
    // For other SSL and SASL parameters, see the librdkafka documentation.
    
    // Create a producer instance.
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);

    Parameter Description and Optimization

    acks Parameter Optimization
    The acks parameter controls the confirmation mechanism when the producer sends a message. The default value of this parameter is -1, which means that it returns only after the message is sent to the leader broker, the leader confirms, and the corresponding follower messages are all written. The acks parameter also has the following optional values: 0, 1, and -1. In cross-availability zone scenarios, and for topics with a high number of replicas, the value of the acks parameter can affect the message's reliability and throughput. Therefore:
    In some online business messaging scenarios where the requirement for throughput is not significant, you can set the acks parameter to -1 to ensure that the message is received and acknowledged by all replicas before returning, thus improving the message's reliability.
    In scenarios involving big data, such as log collection, and offline computing, where high throughput (i.e., the volume of data written to Kafka per second) is required, you can set the acks to 1 to increase throughput.
    Buffering Parameter Optimization (Caching)
    By default, for transmitting the same volume of data, a single request's network transmission can effectively reduce related computation and network resources compared to multiple requests, thereby improving the overall write throughput.
    Therefore, you can set this parameter to optimize the client's message sending throughput. For librdkafka, a default batching time of 5 ms is provided to buffer messages. If the messages are small, you can increase the queue.buffering.max.ms duration appropriately.
    Compression Parameter Optimization
    librdkafka supports the following compression parameters: none, gzip, snappy, lz4, and zstd.
    The librdkafka client supports the following compression algorithms:
    none: No compression algorithm.
    gzip: Compress by GZIP algorithm.
    snappy: Compress by Snappy algorithm.
    lz4: Compress by LZ4 algorithm.
    zstd: Compress by ZSTD algorithm.
    To use a compression algorithm in the producer client, set the compression.type parameter when creating the producer. For example, if you want to use the LZ4 compression algorithm, set compression.type to lz4. The CPU compression and CPU decompression occur on the client side, representing a calculation-for-bandwidth optimization. However, the broker's verification behavior for compressed messages, especially for Gzip compression, incurs additional computation costs, resulting in significant server-side computation costs. The increased computation could reduce the broker's message processing capability, leading to lower bandwidth throughput, which may not be worthwhile in some cases. In such cases, we recommend the following method:
    Independently compress message data on the producer side to generate packed data compression: messageCompression, and store the compression method in the message's key:
    {"Compression","CompressionLZ4"}
    On the producer side, send messageCompression as a normal message.
    On the consumer side, read the message key to access the compression method used, and perform independent decompression.

    Creating Producer Instance

    If an application requires higher throughput, it can use an asynchronous producer to increase the speed of message sending. At the same time, batch message sending can be utilized to reduce network overhead and IO consumption. If an application demands higher reliability, a synchronous producer can be used to ensure successful message delivery. Additionally, the ACK acknowledgement mechanism and transaction mechanism can be employed to guarantee message reliability and consistency. For specific parameter optimization, see Producer Parameters and Optimization.
    #include <stdio.h>
    #include <string.h>
    #include <librdkafka/rdkafka.h>
    
    // Producer message sending callback.
    void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
    fprintf(stderr, "Message delivery failed: %s\\n", rd_kafka_err2str(rkmessage->err));
    } else {
    fprintf(stderr, "Message delivered (%zd bytes, partition %"PRId32")\\n",
    rkmessage->len, rkmessage->partition);
    }
    }
    
    int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // Set the addresses of the Kafka cluster.
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // Set ack to 1, indicating that the message is considered successfully sent once the leader replica receives it.
    rd_kafka_conf_set(conf, "acks", "1", NULL, 0);
    
    // Set the producer message sending callback.
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
    
    // Create a producer instance.
    char errstr[512];
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
    fprintf(stderr, "Failed to create producer: %s\\n", errstr);
    return 1;
    }
    
    // Create a topic instance.
    rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "test", NULL);
    if (!topic) {
    fprintf(stderr, "Failed to create topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
    rd_kafka_destroy(producer);
    return 1;
    }
    
    // Send the message.
    const char *message = "Hello, Kafka!";
    if (rd_kafka_produce(
    topic,
    RD_KAFKA_PARTITION_UA,
    RD_KAFKA_MSG_F_COPY,
    (void *)message,
    strlen(message),
    NULL,
    0,
    NULL) == -1) {
    fprintf(stderr, "Failed to produce to topic %s: %s\\n", rd_kafka_topic_name(topic), rd_kafka_err2str(rd_kafka_last_error()));
    }
    
    // Wait for all messages to be sent.
    while (rd_kafka_outq_len(producer) > 0) {
    rd_kafka_poll(producer, 1000);
    }
    
    // Destroy topic instance.
    rd_kafka_topic_destroy(topic);
    
    // Destroy producer instance.
    rd_kafka_destroy(producer);
    
    return 0;
    }

    Consumer Practice

    Consumer Parameters and Optimization

    Consumer Parameters

    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // Set the addresses of the Kafka cluster.
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // Set consumer group ID, with the default value being empty.
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    
    // Set the consumer's automatic commit interval (in milliseconds), with the default value being 5000.
    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);
    
    // Enable the consumer's automatic commit, with the default value being true.
    rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);
    
    // Set the consumer's automatic offset reset policy, with the default value being latest.
    rd_kafka_conf_set(conf, "auto.offset.reset", "latest", NULL, 0);
    
    // Set client ID, with the default value being rdkafka.
    rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);
    
    // Create a consumer instance.
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

    Parameter Description and Optimization

    For automatic offset commit requests, it's recommended not to set auto.commit.interval.ms below 1,000 ms, as too frequent offset requests can cause high broker CPU usage, affecting the read and write operations of other normal services.

    Creating Consumer Instance

    Provide a subscription model for creating consumers, which offers two ways of offset commit: manual submission and automatic submission.

    Auto-Commit Offsets

    Auto-commit offsets: After pulling messages, consumers automatically commit their offsets without manual intervention. The advantage of this method is it's easy to use, but it may lead to duplicate message consumption or loss.
    #include <stdio.h>
    #include <string.h>
    #include <librdkafka/rdkafka.h>
    
    // Consumer message processing callback.
    void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
    fprintf(stderr, "%% Consume error for topic \\"%s\\" [%"PRId32"] "
    "offset %"PRId64": %s\\n",
    rd_kafka_topic_name(rkmessage->rkt),
    rkmessage->partition, rkmessage->offset,
    rd_kafka_message_errstr(rkmessage));
    } else {
    printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\\n",
    rd_kafka_topic_name(rkmessage->rkt),
    rkmessage->partition,
    rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
    }
    
    int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // Set the addresses of the Kafka cluster.
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // Set consumer group ID.
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    
    // Enable the consumer's automatic offset commit, with the default value being true.
    rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);
    
    // Set the consumer's automatic commit interval (in milliseconds), default is 5000.
    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);
    
    // Create Consumer Instance.
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
    fprintf(stderr, "Failed to create consumer: %s\\n", errstr);
    return 1;
    }
    
    // Subscribe to topics.
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
    fprintf(stderr, "Failed to subscribe to topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
    rd_kafka_topic_partition_list_destroy(topics);
    rd_kafka_destroy(consumer);
    return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);
    
    // Consume messages.
    while (1) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
    if (rkmessage) {
    msg_consume(rkmessage, NULL);
    rd_kafka_message_destroy(rkmessage);
    }
    }
    
    // Unsubscribe.
    rd_kafka_unsubscribe(consumer);
    
    // Destroy consumer instance.
    rd_kafka_destroy(consumer);
    
    return 0;
    }

    Manual-Commit Offsets

    Manual-commit offsets: After processing messages, consumers need to manually commit their offsets. The advantage of this method is that it allows for precise control over offset commit, avoiding duplicate message consumption or loss. Not that manual commit can lead to high broker CPU usage, affecting performance. As message volume increases, CPU consumption will be significantly high, affecting other features of the broker. Therefore, it is recommended to commit offsets after a certain number of messages.
    #include <stdio.h>
    #include <string.h>
    #include <librdkafka/rdkafka.h>
    
    // Consumer message processing callback.
    void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
    fprintf(stderr, "%% Consume error for topic \\"%s\\" [%"PRId32"] "
    "offset %"PRId64": %s\\n",
    rd_kafka_topic_name(rkmessage->rkt),
    rkmessage->partition, rkmessage->offset,
    rd_kafka_message_errstr(rkmessage));
    } else {
    printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\\n",
    rd_kafka_topic_name(rkmessage->rkt),
    rkmessage->partition,
    rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
    }
    
    int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // Set the addresses of the Kafka cluster.
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // Set consumer group ID.
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    
    // Disable the consumer's automatic commit.
    rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);
    
    // Create consumer instance.
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
    fprintf(stderr, "Failed to create consumer: %s\\n", errstr);
    return 1;
    }
    
    // Subscribe to topics.
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
    fprintf(stderr, "Failed to subscribe to topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
    rd_kafka_topic_partition_list_destroy(topics);
    rd_kafka_destroy(consumer);
    return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);
    
    // Consume messages and manually commit the offset.
    int message_count = 0;
    while (1) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
    if (rkmessage) {
    msg_consume(rkmessage, NULL);
    
    // Manually commit the offset after every 10 messages.
    if (++message_count % 10 == 0) {
    if (rd_kafka_commit_message(consumer, rkmessage, 0) != RD_KAFKA_RESP_ERR_NO_ERROR) {
    fprintf(stderr, "Failed to commit offset for message: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
    } else {
    printf("Offset %"PRId64" committed\\n", rkmessage->offset);
    }
    }
    
    rd_kafka_message_destroy(rkmessage);
    }
    }
    
    // Unsubscribe.
    rd_kafka_unsubscribe(consumer);
    
    // Destroy consumer instance.
    rd_kafka_destroy(consumer);
    
    return 0;
    }
    
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support