tencent cloud

文档反馈

tRpc Go SDK

最后更新时间:2024-09-10 16:08:59

    背景

    TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
    本文着重介绍 tRpc-Go-Kafka 客户端的关键参数和实践教程,以及常见问题。

    调优实践

    tRPC-GO-Kafka 封装开源 Kakfa SDK,利用 tRPC-Go 拦截器等功能,接入 tRPC-Go 生态。因此实践教程参见 Sarama Go

    常见问题

    生产者问题

    1. 使用 CKafka 生产消息时,出现错误 Message contents does not match its CRC
    err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
    插件默认启用了 gzip 压缩,在 target 上加上参数 compression=none 关闭压缩即可。
    target: kafka://ip1:port1?compression=none
    2. 生产时同一用户需要有序,如何配置?
    客户端增加参数 partitioner,可选 random(默认),roundrobin,hash(按 key 分区)。
    target: kafka://ip1:port1?clientid=xxx&partitioner=hash
    3. 如何异步生产?
    客户端增加参数 async=1
    target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
    4. 如何使用异步生产写数据回调?
    需要在代码中重写异步生产写数据的成功/失败的回调函数,例如:
    func init() {
    // 重写默认的异步生产写数据错误回调
    kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
    // do something if async producer occurred error.
    }
    
    // 重写默认的异步生产写数据成功回调
    kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
    // do something if async producer succeed.
    }
    }

    消费者问题

    1. 如果消费时 Handle 返回了非 nil 会发生什么?
    会休眠 3s 后重新消费,不建议这么做,因为返回错误会导致无限重试消费,失败的应该由业务做重试逻辑。
    2. 使用 ckafka 消费消息时,出现错误 client has run out of available brokers to talk to
    kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
    优先检查 brokers 是否可达,然后检查支持的 kafka 客户端版本,尝试在配置文件 address 中加上参数例如 version=0.10.2.0
    address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
    3. 当多个生产者生产时,部分生产者建立连接失败会影响正常的生产者超时?
    请更新至 v0.2.18。 低版本插件在创建生产者时先加锁,再建立连接,建立连接结束后释放锁。如果存在一部分异常生产者建立连接耗时很长,就会导致其他正常生产请求在获取生产者的时候加锁失败,最终超时。此行为在 v0.2.18 已经修复。
    4. 消费消息时,提示 The provider group protocol type is incompatible with the other members
    kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
    同一消费者组的客户端重分组策略不一样,可修改参数 strategy,可选:sticky(默认),range,roundrobin。
    address: ip1:port1?topics=topic12&group=my-group&strategy=range
    5. 如何注入自定义配置(远端配置)?
    如果需要代码中指定配置,先在trpc_go.yaml中配置 fake_address,然后配合 kafka.RegisterAddrConfig 方法注入,trpc_go.yaml配置如下:
    address: fake_address
    在服务启动前,注入自定义配置:
    func main() {
    s := trpc.NewServer()
    // 使用自定义 addr,需在启动 server 前注入
    cfg := kafka.GetDefaultConfig()
    cfg.Brokers = []string{"127.0.0.1:9092"}
    cfg.Topics = []string{"test_topic"}
    kafka.RegisterAddrConfig("fake_address", cfg)
    kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})
    
    s.Serve()
    }
    6. 如何获取底层 sarama 的上下文信息?
    通过 kafka.GetRawSaramaContext 可以获取底层 sarama ConsumerGroupSessionConsumerGroupClaim。但是此处暴露这两个接口只是方便用户做监控日志,应该只使用其读方法,调用任何写方法在这里都是未定义行为,可能造成未知结果。
    // RawSaramaContext 存放 sarama ConsumerGroupSession 和 ConsumerGroupClaim
    // 导出此结构体是为了方便用户实现监控,提供的内容仅用于读,调用任何写方法属于未定义行为
    type RawSaramaContext struct {
    Session sarama.ConsumerGroupSession
    Claim sarama.ConsumerGroupClaim
    }
    使用实例:
    func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
    if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
    log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
    }
    // ...
    return nil
    }
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持