rd_kafka_conf_t *conf = rd_kafka_conf_new();// Kafka集群的地址,多个地址用逗号分隔,默认为空rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);// 发送消息的最大尝试次数,包括第一次尝试,默认为2rd_kafka_conf_set(conf, "message.send.max.retries", "2", NULL, 0);// 重试之间的回退时间(以毫秒为单位),默认为100rd_kafka_conf_set(conf, "retry.backoff.ms", "100", NULL, 0);// 客户端请求超时时间(以毫秒为单位),默认为5000rd_kafka_conf_set(conf, "request.timeout.ms", "5000", NULL, 0);// 客户端发送缓冲区大小(以字节为单位),默认为131072rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "131072", NULL, 0);// 客户端发送缓冲区中消息的最大数量,默认为100000rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", NULL, 0);// 客户端发送缓冲区中消息的最大总大小(以字节为单位),默认为1000000rd_kafka_conf_set(conf, "queue.buffering.max.total.bytes", "1000000", NULL, 0);// 客户端发送缓冲区的linger时间(以毫秒为单位),默认为0rd_kafka_conf_set(conf, "queue.buffering.max.ms", "0", NULL, 0);// 是否启用消息压缩,默认为0(不启用)rd_kafka_conf_set(conf, "compression.codec", "none", NULL, 0);// 消息压缩级别,默认为0(自动选择)rd_kafka_conf_set(conf, "compression.level", "0", NULL, 0);// 客户端的ID,默认为rdkafkard_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);// 生产者的最大并发请求数,即未收到broker响应的请求数,默认为1000000rd_kafka_conf_set(conf, "max.in.flight.requests.per.connection", "1000000", NULL, 0);// 客户端与Kafka集群的连接最大重试次数,默认为3次rd_kafka_conf_set(conf, "broker.address.ttl", "3", NULL, 0);// 客户端与Kafka集群的连接重试间隔(以毫秒为单位),默认为1000rd_kafka_conf_set(conf, "reconnect.backoff.ms", "1000", NULL, 0);// 客户端与Kafka集群的连接重试最大间隔(以毫秒为单位),默认为10000rd_kafka_conf_set(conf, "reconnect.backoff.max.ms", "10000", NULL, 0);// 客户端API版本的回退时间(以毫秒为单位),默认为10000rd_kafka_conf_set(conf, "api.version.request.timeout.ms", "10000", NULL, 0);// 安全协议,默认为plaintextrd_kafka_conf_set(conf, "security.protocol", "plaintext", NULL, 0);// 其他SSL和SASL相关参数,请参考librdkafka官方文档// 创建生产者实例rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, NULL, 0);
{"Compression","CompressionLZ4"}
#include <stdio.h>#include <string.h>#include <librdkafka/rdkafka.h>// 生产者消息发送回调void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque) {if (rkmessage->err) {fprintf(stderr, "Message delivery failed: %s\\n", rd_kafka_err2str(rkmessage->err));} else {fprintf(stderr, "Message delivered (%zd bytes, partition %"PRId32")\\n",rkmessage->len, rkmessage->partition);}}int main() {rd_kafka_conf_t *conf = rd_kafka_conf_new();// 设置Kafka集群的地址rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);// 设置ack等于1,表示leader副本收到消息后即认为发送成功rd_kafka_conf_set(conf, "acks", "1", NULL, 0);// 设置生产者消息发送回调rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);// 创建生产者实例char errstr[512];rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if (!producer) {fprintf(stderr, "Failed to create producer: %s\\n", errstr);return 1;}// 创建主题实例rd_kafka_topic_t *topic = rd_kafka_topic_new(producer, "test", NULL);if (!topic) {fprintf(stderr, "Failed to create topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));rd_kafka_destroy(producer);return 1;}// 发送消息const char *message = "Hello, Kafka!";if (rd_kafka_produce(topic,RD_KAFKA_PARTITION_UA,RD_KAFKA_MSG_F_COPY,(void *)message,strlen(message),NULL,0,NULL) == -1) {fprintf(stderr, "Failed to produce to topic %s: %s\\n", rd_kafka_topic_name(topic), rd_kafka_err2str(rd_kafka_last_error()));}// 等待所有消息发送完成while (rd_kafka_outq_len(producer) > 0) {rd_kafka_poll(producer, 1000);}// 销毁主题实例rd_kafka_topic_destroy(topic);// 销毁生产者实例rd_kafka_destroy(producer);return 0;}
rd_kafka_conf_t *conf = rd_kafka_conf_new();// 设置Kafka集群的地址rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);// 设置消费组ID,默认为空rd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);// 设置消费者的自动提交间隔(以毫秒为单位),默认为5000rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);// 设置消费者的自动提交开关,默认为truerd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);// 设置消费者的自动偏移量重置策略,默认为latestrd_kafka_conf_set(conf, "auto.offset.reset", "latest", NULL, 0);// 设置客户端的ID,默认为rdkafkard_kafka_conf_set(conf, "client.id", "rdkafka", NULL, 0);// 创建消费者实例char errstr[512];rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
#include <stdio.h>#include <string.h>#include <librdkafka/rdkafka.h>// 消费者消息处理回调void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {if (rkmessage->err) {fprintf(stderr, "%% Consume error for topic \\"%s\\" [%"PRId32"] ""offset %"PRId64": %s\\n",rd_kafka_topic_name(rkmessage->rkt),rkmessage->partition, rkmessage->offset,rd_kafka_message_errstr(rkmessage));} else {printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\\n",rd_kafka_topic_name(rkmessage->rkt),rkmessage->partition,rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);}}int main() {rd_kafka_conf_t *conf = rd_kafka_conf_new();// 设置Kafka集群的地址rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);// 设置消费组IDrd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);// 设置消费者的自动提交开关,默认为truerd_kafka_conf_set(conf, "enable.auto.commit", "true", NULL, 0);// 设置消费者的自动提交间隔(以毫秒为单位),默认为5000rd_kafka_conf_set(conf, "auto.commit.interval.ms", "5000", NULL, 0);// 创建消费者实例char errstr[512];rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));if (!consumer) {fprintf(stderr, "Failed to create consumer: %s\\n", errstr);return 1;}// 订阅主题rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {fprintf(stderr, "Failed to subscribe to topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));rd_kafka_topic_partition_list_destroy(topics);rd_kafka_destroy(consumer);return 1;}rd_kafka_topic_partition_list_destroy(topics);// 消费消息while (1) {rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);if (rkmessage) {msg_consume(rkmessage, NULL);rd_kafka_message_destroy(rkmessage);}}// 取消订阅rd_kafka_unsubscribe(consumer);// 销毁消费者实例rd_kafka_destroy(consumer);return 0;}
#include <stdio.h>#include <string.h>#include <librdkafka/rdkafka.h>// 消费者消息处理回调void msg_consume(rd_kafka_message_t *rkmessage, void *opaque) {if (rkmessage->err) {fprintf(stderr, "%% Consume error for topic \\"%s\\" [%"PRId32"] ""offset %"PRId64": %s\\n",rd_kafka_topic_name(rkmessage->rkt),rkmessage->partition, rkmessage->offset,rd_kafka_message_errstr(rkmessage));} else {printf("%% Message received on topic %s [%"PRId32"] at offset %"PRId64": %.*s\\n",rd_kafka_topic_name(rkmessage->rkt),rkmessage->partition,rkmessage->offset, (int)rkmessage->len, (const char *)rkmessage->payload);}}int main() {rd_kafka_conf_t *conf = rd_kafka_conf_new();// 设置Kafka集群的地址rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092", NULL, 0);// 设置消费组IDrd_kafka_conf_set(conf, "group.id", "mygroup", NULL, 0);// 关闭消费者的自动提交开关rd_kafka_conf_set(conf, "enable.auto.commit", "false", NULL, 0);// 创建消费者实例char errstr[512];rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));if (!consumer) {fprintf(stderr, "Failed to create consumer: %s\\n", errstr);return 1;}// 订阅主题rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);rd_kafka_topic_partition_list_add(topics, "test", RD_KAFKA_PARTITION_UA);if (rd_kafka_subscribe(consumer, topics) != RD_KAFKA_RESP_ERR_NO_ERROR) {fprintf(stderr, "Failed to subscribe to topic: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));rd_kafka_topic_partition_list_destroy(topics);rd_kafka_destroy(consumer);return 1;}rd_kafka_topic_partition_list_destroy(topics);// 消费消息并手动提交位点int message_count = 0;while (1) {rd_kafka_message_t *rkmessage = rd_kafka_consumer_poll(consumer, 1000);if (rkmessage) {msg_consume(rkmessage, NULL);// 每隔10条消息手动提交位点if (++message_count % 10 == 0) {if (rd_kafka_commit_message(consumer, rkmessage, 0) != RD_KAFKA_RESP_ERR_NO_ERROR) {fprintf(stderr, "Failed to commit offset for message: %s\\n", rd_kafka_err2str(rd_kafka_last_error()));} else {printf("Offset %"PRId64" committed\\n", rkmessage->offset);}}rd_kafka_message_destroy(rkmessage);}}// 取消订阅rd_kafka_unsubscribe(consumer);// 销毁消费者实例rd_kafka_destroy(consumer);return 0;}
本页内容是否解决了您的问题?