背景
TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它提供了高吞吐量、低延迟、可伸缩性和容错性等特性。
本文着重介绍 tRpc-Go-Kafka 客户端的关键参数和实践教程,以及常见问题。
调优实践
tRPC-GO-Kafka 封装开源 Kakfa SDK,利用 tRPC-Go 拦截器等功能,接入 tRPC-Go 生态。因此实践教程参见 Sarama Go: 常见问题
生产者问题
1. 使用 CKafka 生产消息时,出现错误 Message contents does not match its CRC
。
err:type:framework, code:141, msg:kafka client transport SendMessage: kafka server: Message contents does not match its CRC.
插件默认启用了 gzip 压缩,在 target 上加上参数 compression=none
关闭压缩即可。
target: kafka://ip1:port1?compression=none
2. 生产时同一用户需要有序,如何配置?
客户端增加参数 partitioner
,可选 random(默认),roundrobin,hash(按 key 分区)。
target: kafka://ip1:port1?clientid=xxx&partitioner=hash
3. 如何异步生产?
客户端增加参数 async=1
target: kafka://ip1:port1,ip2:port2?clientid=xxx&async=1
4. 如何使用异步生产写数据回调?
需要在代码中重写异步生产写数据的成功/失败的回调函数,例如:
func init() {
kafka.AsyncProducerErrorCallback = func(err error, topic string, key, value []byte, headers []sarama.RecordHeader) {
}
kafka.AsyncProducerSuccCallback = func(topic string, key, value []byte, headers []sarama.RecordHeader) {
}
}
消费者问题
1. 如果消费时 Handle 返回了非 nil 会发生什么?
会休眠 3s 后重新消费,不建议这么做,因为返回错误会导致无限重试消费,失败的应该由业务做重试逻辑。
2. 使用 ckafka 消费消息时,出现错误 client has run out of available brokers to talk to
。
kafka server transport: consume fail:kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
优先检查 brokers 是否可达,然后检查支持的 kafka 客户端版本,尝试在配置文件 address 中加上参数例如 version=0.10.2.0
address: ip1:port1?topics=topic1&group=my-group&version=0.10.2.0
3. 当多个生产者生产时,部分生产者建立连接失败会影响正常的生产者超时?
请更新至 v0.2.18。 低版本插件在创建生产者时先加锁,再建立连接,建立连接结束后释放锁。如果存在一部分异常生产者建立连接耗时很长,就会导致其他正常生产请求在获取生产者的时候加锁失败,最终超时。此行为在 v0.2.18 已经修复。
4. 消费消息时,提示 The provider group protocol type is incompatible with the other members
。
kafka server transport: consume fail:kafka server: The provider group protocol type is incompatible with the other members.
同一消费者组的客户端重分组策略不一样,可修改参数 strategy
,可选:sticky(默认),range,roundrobin。
address: ip1:port1?topics=topic12&group=my-group&strategy=range
5. 如何注入自定义配置(远端配置)?
如果需要代码中指定配置,先在trpc_go.yaml
中配置 fake_address
,然后配合 kafka.RegisterAddrConfig
方法注入,trpc_go.yaml
配置如下:
在服务启动前,注入自定义配置:
func main() {
s := trpc.NewServer()
cfg := kafka.GetDefaultConfig()
cfg.Brokers = []string{"127.0.0.1:9092"}
cfg.Topics = []string{"test_topic"}
kafka.RegisterAddrConfig("fake_address", cfg)
kafka.RegisterKafkaConsumerService(s.Service("fake_address"), &Consumer{})
s.Serve()
}
6. 如何获取底层 sarama 的上下文信息?
通过 kafka
.
GetRawSaramaContext
可以获取底层 sarama ConsumerGroupSession
和 ConsumerGroupClaim
。但是此处暴露这两个接口只是方便用户做监控日志,应该只使用其读方法,调用任何写方法在这里都是未定义行为,可能造成未知结果。
type RawSaramaContext struct {
Session sarama.ConsumerGroupSession
Claim sarama.ConsumerGroupClaim
}
使用实例:
func (Consumer) Handle(ctx context.Context, msg *sarama.ConsumerMessage) error {
if rawContext, ok := kafka.GetRawSaramaContext(ctx); ok {
log.Infof("InitialOffset: %d", rawContext.Claim.InitialOffset())
}
return nil
}
本页内容是否解决了您的问题?