tencent cloud

文档反馈

ibrdkafka SDK

最后更新时间:2024-07-04 16:00:59

    背景

    TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
    本文着重介绍上述 librdkafka 客户端的关键参数和实践教程,以及常见问题。

    生产者实践

    版本选择

    在使用 librdkafka 时,librdkafka 会自动根据 Kafka 集群的版本选择适当的协议版本进行通信,由于 kafka 的版本迭代更新较快,通常情况下,使用最新的 librdkafka 版本可以获得更好的兼容性和性能。

    生产者参数与调优

    生产者参数

    librdkafka 主要涉及如下关键参数,相关的参数和默认值如下:
    
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // Kafka集群的地址,多个地址用逗号分隔,默认为空
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // 发送消息的最大尝试次数,包括第一次尝试,默认为2
    rd_kafka_conf_set(conf, "message.send.max.retries", "2", NULL, 0);
    
    // 重试之间的回退时间(以毫秒为单位),默认为100
    rd_kafka_conf_set(conf, "retry.backoff.ms", "100", NULL, 0);
    
    // 客户端请求超时时间(以毫秒为单位),默认为5000
    rd_kafka_conf_set(conf, "request.timeout.ms", "5000", NULL, 0);
    
    // 客户端发送缓冲区大小(以字节为单位),默认为131072
    rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "131072", NULL, 0);
    
    // 客户端发送缓冲区中消息的最大数量,默认为100000
    rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", NULL, 0);
    
    // 客户端发送缓冲区中消息的最大总大小(以字节为单位),默认为1000000
    rd_kafka_conf_set(conf, "queue.buffering.max.total.bytes", "1000000", NULL, 0);
    
    // 客户端发送缓冲区的linger时间(以毫秒为单位),默认为0
    rd_kafka_conf_set(conf, "queue.buffering.max.ms", "0", NULL, 0);
    
    // 是否启用消息压缩,默认为0(不启用)
    rd_kafka_conf_set(conf, "compression.codec", "none", NULL, 0);
    
    // 消息压缩级别,默认为0(自动选择)
    rd_kafka_conf_set(conf, "compression.level", "0", NULL, 0);
    
    // 客户端的ID,默认为rdkafka
    rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);
    
    // 生产者的最大并发请求数,即未收到broker响应的请求数,默认为1000000
    rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "1000000", NULL, 0);
    
    // 客户端与Kafka集群的连接最大重试次数,默认为3次
    rd_kafka_conf_set(conf, "broker.address.ttl", "3", NULL, 0);
    
    // 客户端与Kafka集群的连接重试间隔(以毫秒为单位),默认为1000
    rd_kafka_conf_set(conf, "reconnect.backoff.ms", "1000", NULL, 0);
    
    // 客户端与Kafka集群的连接重试最大间隔(以毫秒为单位),默认为10000
    rd_kafka_conf_set(conf, "reconnect.backoff.max.ms", "10000", NULL, 0);
    
    // 客户端API版本的回退时间(以毫秒为单位),默认为10000
    rd_kafka_conf_set(conf, "api.version.request.timeout.ms", "10000", NULL, 0);
    
    // 安全协议,默认为plaintext
    rd_kafka_conf_set(conf, "security.protocol", "plaintext", NULL, 0);
    
    // 其他SSL和SASL相关参数,请参考librdkafka官方文档
    
    // 创建生产者实例
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);

    参数说明调优

    关于 acks 参数优化
    acks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为-1,表示消息发送给 Leader Broker 后,Leader 确认以及相应的 Follower 消息都写入完成后才返回。acks 参数还有以下可选值:0,1,-1。在跨可用区场景,以及副本数较多的 Topic,acks 参数的取值会影响消息的可靠性和吞吐量。因此:
    在一些在线业务消息的场景下,吞吐量要求不大,可以将 acks 参数设置为-1,则可以确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性。
    在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 acks 设置为1,提高吞吐。
    关于 buffering 参数优化(缓存)
    默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。
    因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。对 librdkafka,默认提供5ms的攒批时间积攒消息。如果消息较小,可以适当增加queue.buffering.max.ms 的时间。
    关于压缩参数优化
    librdkafka 支持如下压缩参数:none, gzip, snappy, lz4, zstd。
    在 librdkafka客户端中,支持以下几种压缩算法:
    none:不使用压缩算法。
    gzip:使用 GZIP 压缩算法。
    snappy:使用 Snappy 压缩算法。
    lz4:使用 LZ4 压缩算法。
    zstd:使用 ZSTD 压缩算法。
    要在 Producer 客户端中使用压缩算法,需要在创建生产者时设置 compression.type 参数。例如,要使用 LZ4 压缩算法,可以将 compression.type 设置为 lz4,虽然压缩算法的 CPU 压缩和 CPU解压缩,发生客户端,是一种用计算换带宽的优化方式,但是由于 Broker 针对压缩消息存在校验行为会付出额外的计算成本,尤其是 Gzip 压缩,服务端的压缩计算成本会比较大,在某种程度上可能会出现得不偿失的情况,反而因为计算的增加导致 Broker 消息处理能力偏低,导致带宽吞吐更低。这种情况建议可以使用如下方式进行使用:
    在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 存储压缩方式:
    {"Compression","CompressionLZ4"}
    在 Producer 端将 messageCompression 当成正常消息发送。
    在 Consumer 端读取消息 key,获取使用的压缩方式,独立进行解压缩。

    创建生产者实例

    如果应用程序需要更高的吞吐量,则可以使用异步生产者,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。如果应用程序需要更高的可靠性,则可以使用同步生产者,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。
    #include <stdio.h>
    #include <string.h>
    #include <librdkafka/rdkafka.h>
    
    // 生产者消息发送回调
    void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
    fprintf(stderr, "Message delivery failed: %s\\n", rd_kafka_err2str(rkmessage->err));
    } else {
    fprintf(stderr, "Message delivered (%zd bytes, partition %"PRId32")\\n",
    rkmessage->len, rkmessage->partition);
    }
    }
    
    int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // 设置Kafka集群的地址
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // 设置ack等于1,表示leader副本收到消息后即认为发送成功
    rd_kafka_conf_set(conf, "acks", "1", NULL, 0);
    
    // 设置生产者消息发送回调
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
    
    // 创建生产者实例
    char errstr[512];
    rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
    fprintf(stderr, "Failed to create producer: %s\\n", errstr);
    return 1;
    }
    
    // 创建主题实例
    rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "test", NULL);
    if (!topic) {
    fprintf(stderr, "Failed to create topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
    rd_kafka_destroy(producer);
    return 1;
    }
    
    // 发送消息
    const char *message = "Hello, Kafka!";
    if (rd_kafka_produce(
    topic,
    RD_KAFKA_PARTITION_UA,
    RD_KAFKA_MSG_F_COPY,
    (void *)message,
    strlen(message),
    NULL,
    0,
    NULL) == -1) {
    fprintf(stderr, "Failed to produce to topic %s: %s\\n", rd_kafka_topic_name(topic), rd_kafka_err2str(rd_kafka_last_error()));
    }
    
    // 等待所有消息发送完成
    while (rd_kafka_outq_len(producer) > 0) {
    rd_kafka_poll(producer, 1000);
    }
    
    // 销毁主题实例
    rd_kafka_topic_destroy(topic);
    
    // 销毁生产者实例
    rd_kafka_destroy(producer);
    
    return 0;
    }

    消费者实践

    消费者参数与调优

    消费者参数

    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // 设置Kafka集群的地址
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // 设置消费组ID,默认为空
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    
    // 设置消费者的自动提交间隔(以毫秒为单位),默认为5000
    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);
    
    // 设置消费者的自动提交开关,默认为true
    rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);
    
    // 设置消费者的自动偏移量重置策略,默认为latest
    rd_kafka_conf_set(conf, "auto.offset.reset", "latest", NULL, 0);
    
    // 设置客户端的ID,默认为rdkafka
    rd_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);
    
    // 创建消费者实例
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

    参数说明与调优

    针对自动提交位点请求,建议 auto.commit.interval.ms 时间不要低于1000ms,因为频率过高的位点请求会导致 Broker CPU 很高,影响其他正常服务的读写。

    创建消费者实例

    提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

    自动提交位点

    自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。
    #include <stdio.h>
    #include <string.h>
    #include <librdkafka/rdkafka.h>
    
    // 消费者消息处理回调
    void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
    fprintf(stderr, "%% Consume error for topic \\"%s\\" [%"PRId32"] "
    "offset %"PRId64": %s\\n",
    rd_kafka_topic_name(rkmessage->rkt),
    rkmessage->partition, rkmessage->offset,
    rd_kafka_message_errstr(rkmessage));
    } else {
    printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\\n",
    rd_kafka_topic_name(rkmessage->rkt),
    rkmessage->partition,
    rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
    }
    
    int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // 设置Kafka集群的地址
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // 设置消费组ID
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    
    // 设置消费者的自动提交开关,默认为true
    rd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);
    
    // 设置消费者的自动提交间隔(以毫秒为单位),默认为5000
    rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);
    
    // 创建消费者实例
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
    fprintf(stderr, "Failed to create consumer: %s\\n", errstr);
    return 1;
    }
    
    // 订阅主题
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
    fprintf(stderr, "Failed to subscribe to topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
    rd_kafka_topic_partition_list_destroy(topics);
    rd_kafka_destroy(consumer);
    return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);
    
    // 消费消息
    while (1) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
    if (rkmessage) {
    msg_consume(rkmessage, NULL);
    rd_kafka_message_destroy(rkmessage);
    }
    }
    
    // 取消订阅
    rd_kafka_unsubscribe(consumer);
    
    // 销毁消费者实例
    rd_kafka_destroy(consumer);
    
    return 0;
    }

    手动提交位点

    手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。
    #include <stdio.h>
    #include <string.h>
    #include <librdkafka/rdkafka.h>
    
    // 消费者消息处理回调
    void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {
    if (rkmessage->err) {
    fprintf(stderr, "%% Consume error for topic \\"%s\\" [%"PRId32"] "
    "offset %"PRId64": %s\\n",
    rd_kafka_topic_name(rkmessage->rkt),
    rkmessage->partition, rkmessage->offset,
    rd_kafka_message_errstr(rkmessage));
    } else {
    printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\\n",
    rd_kafka_topic_name(rkmessage->rkt),
    rkmessage->partition,
    rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);
    }
    }
    
    int main() {
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    
    // 设置Kafka集群的地址
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);
    
    // 设置消费组ID
    rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);
    
    // 关闭消费者的自动提交开关
    rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);
    
    // 创建消费者实例
    char errstr[512];
    rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
    if (!consumer) {
    fprintf(stderr, "Failed to create consumer: %s\\n", errstr);
    return 1;
    }
    
    // 订阅主题
    rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);
    if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {
    fprintf(stderr, "Failed to subscribe to topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
    rd_kafka_topic_partition_list_destroy(topics);
    rd_kafka_destroy(consumer);
    return 1;
    }
    rd_kafka_topic_partition_list_destroy(topics);
    
    // 消费消息并手动提交位点
    int message_count = 0;
    while (1) {
    rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);
    if (rkmessage) {
    msg_consume(rkmessage, NULL);
    
    // 每隔10条消息手动提交位点
    if (++message_count % 10 == 0) {
    if (rd_kafka_commit_message(consumer, rkmessage, 0) != RD_KAFKA_RESP_ERR_NO_ERROR) {
    fprintf(stderr, "Failed to commit offset for message: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));
    } else {
    printf("Offset %"PRId64" committed\\n", rkmessage->offset);
    }
    }
    
    rd_kafka_message_destroy(rkmessage);
    }
    }
    
    // 取消订阅
    rd_kafka_unsubscribe(consumer);
    
    // 销毁消费者实例
    rd_kafka_destroy(consumer);
    
    return 0;
    }
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持