tencent cloud

文档反馈

Sarama Go

最后更新时间:2024-07-04 16:00:59

    背景

    TDMQ CKafka 是一个分布式流处理平台,用于构建实时数据管道和流式应用程序。它具备高吞吐量、低延迟、可伸缩性和容错性等特性。
    Sarama:Shopify 开发的一个 Kafka 库,提供了生产者、消费者、分区消费者等功能。该库的性能较好,社区支持也较为活跃。
    Confluent-Kafka-Go:由 Confluent 开发的 Kafka 库,提供了高级 API,易于使用。该库基于 librdkafka C 库,性能非常优秀,但安装和使用略显复杂。
    本文着重介绍上述 Sarama Go 客户端的关键参数、实践教程以及常见问题。

    生产者实践

    版本选择

    在选择 Sarama 客户端版本时,需要确保所选版本与 Kafka broker 版本兼容。Sarama 库支持多个 Kafka 协议版本,可以通过设置 config.Version 来指定使用的协议版本。常见的 Kafka 协议版本与 Sarama 库版本的对应关系如下,目前最新版本请参见 Sarama 版本
    Kafka 版本
    Sarama 库版本
    Sarama 协议版本常量
    0.8.2.x
    >= 1.0.0
    sarama.V0_8_2_0
    0.9.0.x
    >= 1.0.0
    sarama.V0_9_0_0
    0.10.0.x
    >= 1.0.0
    sarama.V0_10_0_0
    0.10.1.x
    >= 1.0.0
    sarama.V0_10_1_0
    0.10.2.x
    >= 1.0.0
    sarama.V0_10_2_0
    0.11.0.x
    >= 1.16.0
    sarama.V0_11_0_0
    1.0.x
    >= 1.16.0
    sarama.V1_0_0_0
    1.1.x
    >= 1.19.0
    sarama.V1_1_0_0
    2.0.x
    >= 1.19.0
    sarama.V2_0_0_0
    2.1.x
    >= 1.21.0
    sarama.V2_1_0_0
    2.2.x
    >= 1.23.0
    sarama.V2_2_0_0
    2.3.x
    >= 1.24.0
    sarama.V2_3_0_0
    2.4.x
    >= 1.27.0
    sarama.V2_4_0_0
    2.5.x
    >= 1.28.0
    sarama.V2_5_0_0
    2.6.x
    >= 1.29.0
    sarama.V2_6_0_0
    2.7.x
    >= 1.29.0
    sarama.V2_7_0_0
    2.8.x以及以上
    建议使用>=1.42.1
    sarama.V2_8_0_0-sarama.V3_6_0_0
    上述列出的 Sarama 库版本是支持对应 Kafka 协议版本的最低版本。为了获得最佳性能和使用新功能,建议使用 Sarama 的最新版本。在使用最新版本时,客户可以通过设置 config.Version 来指定与您的 Kafka broker 兼容的协议版本。设置方式如下,务必先设置版本后使用,否则会有预期外的不兼容问题:
    config := sarama.NewConfig()
    config.Version = sarama.V2_7_0_0 // 根据实际Kafka版本设置协议版本

    生产者参数与调优

    生产者参数

    在使用 Sarama go 客户端写入 kafka 时候,需要配置如下关键参数,相关的参数和默认值如下:
    config := sarama.NewConfig()
    sarama.MaxRequestSize = 100 * 1024 * 1024 //请求最大大小,默认100MB,可以调整,写入大于100MB的消息会直接报错
    sarama.MaxResponseSize = 100 * 1024 * 1024 //响应最大大小,默认100MB,可以调整,获取大于100MB的消息会直接报错
    
    config.Producer.RequiredAcks = sarama.WaitForLocal // 默认值为sarama.WaitForLocal(1)
    
    config.Producer.Retry.Max = 3 // 生产者重试的最大次数,默认为3
    config.Producer.Retry.Backoff = 100 * time.Millisecond // 生产者重试之间的等待时间,默认为100毫秒
    
    config.Producer.Return.Successes = false //是否返回成功的消息,默认为false
    config.Producer.Return.Errors = true // 返回失败的消息,默认值为true
    
    config.Producer.Compression = CompressionNone //对消息是否压缩后发送,默认CompressionNone不压缩
    config.Producer.CompressionLevel = CompressionLevelDefault // 指定压缩等级,在配置了压缩算法后生效
    
    config.Producer.Flush.Frequency = 0 //producer缓存消息的时间, 默认缓存0毫秒
    config.Producer.Flush.Bytes = 0 // 达到多少字节时,触发一次broker请求,默认为0,直接发送,存在天然上限值MaxRequestSize,因此默认最大100MB
    config.Producer.Flush.Messages = 0 // 达到多少条消息时,强制,触发一次broker请求,这个是上限值,MaxMessages < Messages
    config.Producer.Flush.MaxMessages = 0 // 最大缓存多少消息,默认为0,有消息立刻发送,MaxMessages设置大于0时,必须设置 Messages,且需要保证:MaxMessages > Messages
    
    config.Producer.Timeout = 5 * time.Second // 超时时间
    
    config.Producer.Idempotent = false //是否需要幂等,默认false
    config.Producer.Transaction.Timeout = 1 * time.Minute // 事务超时时间默认1分钟
    config.Producer.Transaction.Retry.Max = 50 //事务重试时间
    config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond
    config.Net.MaxOpenRequests = 5 //默认值5,一次发送请求的数量
    config.Producer.Transaction.ID = "test" //事务ID
    
    config.ClientID = "your-client-id" // 客户端ID

    参数说明调优

    关于 RequiredAcks 参数优化

    RequiredAcks 参数用于控制生产者发送消息时的确认机制。该参数的默认值为 WaitForLocal,表示消息发送给 Leader Broker 后,Leader 确认消息写入后即返回。RequiredAcks 参数还有以下可选值:
    NoResponse: 不等待任何确认,直接返回。
    WaitForLocal: 等待 Leader 副本确认写入后返回。
    WaitForAll: 等待 Leader 副本以及相关的 Follower 副本确认写入后返回。
    由上可知,在跨可用区场景,以及副本数较多的 Topic,RequiredAcks 参数的取值会影响消息的可靠性和吞吐量。因此:
    在一些在线业务消息的场景下,吞吐量要求不大,可以将 RequiredAcks 参数设置为 WaitForAll,则可以确保消息被所有副本接收和确认后才返回,从而提高消息的可靠性。
    在日志采集等大数据或者离线计算的场景下,要求高吞吐(即每秒写入 Kafka 的数据量)的情况下,可以将 RequiredAcks 设置为 WaitForLocal,提高吞吐。

    关于 Flush 参数优化(缓存)

    默认情况下,传输同等数据量的情况下,多次请求和一次请求的网络传输,一次请求传输能有效减少相关计算和网络资源,提高整体写入的吞吐量。因此,可以通过这个参数设置优化客户端发送消息的吞吐能力。在高吞吐场景下,可以配合计算和设置: 其中 Bytes 建议设置为16K,对齐 Kafka 标准 Java SDK 定义, 预估一条消息为1K(1024)个字节,因此得出如下 Messages 和 MaxMessages 的写入参数: 其中 Frequency 的计算方式为:预估流量为 16MB,分区数为16个分区,此时单分区每秒写入流量为:16 * 1024 * 1024 / 16 = 1 * 1024 * 1024 = 1MB,单个分区每秒 1MB 的流量。假设按照 16K 一个请求,数据量发送,那么在 1s 内要实现 1MB 的流量传输 1*1024*1024/16/1024 = 64个请求,因此 Frequency <= 15.62ms(1000/64)。 实际上,由于业务流量不是持续生产的,在低峰期,可能出现即时达到 16ms,也缓存不了太多的数据,因此在高吞吐的情况下,可以将条件简化,以 Bytes 为准,Frequency 可以适当调大,例如能接受 500ms 的延时增加,那么就可以设置为 500ms,因为此时如果命中数据量大于等于 Bytes,会按照 Bytes 的条件发送请求。
    
    config.Producer.Flush.Frequency = 16 //producer缓存消息的时间, 默认缓存100毫秒,如果发送的流量较小,这里可以进一步增加延时时间。
    config.Producer.Flush.Bytes = 16*1024 // 达到多少字节时,触发一次broker请求,默认为0,直接发送,存在天然上限值MaxRequestSize,因此默认最大100MB
    config.Producer.Flush.Messages = 17 // 达到多少条消息时,强制,触发一次broker请求,这个是上限值,MaxMessages 需要小于 Messages
    config.Producer.Flush.MaxMessages = 16 // 16条,实际上因为消息大小不严格1024字节,Messages和MaxMessages 建议配置值更大或者直接使用Int的最大值,
    //因为命中Frequency,Bytes,MaxMessages < Messages任何一个条件都会触发flush

    关于事务参数优化

    config.Producer.Idempotent = true //是否需要幂等,在事务场景下需要设置为true config.Producer.Transaction.Timeout = 1 * time.Minute // 事务超时时间默认1分钟 config.Producer.Transaction.Retry.Max = 50 //事务重试时间 config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond config.Net.MaxOpenRequests = 5 //默认值5,一次发送请求的数量 config.Producer.Transaction.ID = "test" //事务ID
    需要强调,事务因为要保障消息的 exactly once 语义,因此会额外付出更多的计算资源,所以 config.Net.MaxOpenRequests 的选取必须小于等于5,Broker 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个 batch 数据,如果客户在事务的基础上还需要保持一定的吞吐,因此可以设置该值为5,同时适当增加事务超时时间,容忍高负载下一些网络抖动带来的时延问题。

    关于压缩参数优化

    Sarama Go 支持如下压缩参数:
    config.Producer.Compression = CompressionNone //对消息是否压缩后发送,默认CompressionNone不压缩
    config.Producer.CompressionLevel = CompressionLevelDefault //指定压缩等级,在配置了压缩算法后生效
    在Sarama Kafka Go客户端中,支持以下几种压缩配置: 1. sarama.CompressionNone:不使用压缩。 2. sarama.CompressionGZIP:使用 GZIP 压缩。 3. sarama.CompressionSnappy:使用 Snappy 压缩。 4. sarama.CompressionLZ4:使用 LZ4 压缩。 5. sarama.CompressionZSTD:使用 ZSTD 压缩。 要在 Sarama Kafka Go 客户端中使用压缩消息,需要在创建生产者时设置 config.Producer.Compression 参数。例如,要使用 LZ4 压缩算法,可以将config.Producer.Compression 设置为 sarama.CompressionLZ4 ,虽然压缩消息的压缩和解压缩,发生客户端,是一种用计算换带宽的优化方式,但是由于Broker 针对压缩消息存在校验行为会付出额外的计算成本,尤其是 gzip 压缩,Broker 对其校验计算成本会比较大,在某种程度上可能会出现得不偿失的情况,反而因为计算的增加导致Broker消息处理能力偏低,导致带宽吞吐更低。在低吞吐或者低规格服务下,不建议使用压缩消息。如果还是需要压缩消息,这种情况建议可以使用如下方式进行使用:
    1. 在 Producer 端对消息数据独立压缩,生成压缩包数据:messageCompression,同时在消息的 key 存储压缩方式:
    {"Compression","CompressionLZ4"}
    2. 在Producer端将messageCompression当成正常消息发送。
    3. 在 Consumer 端读取消息key,获取使用的压缩方式,独立进行解压缩。

    创建生产者实例

    如果应用程序需要更高的吞吐量,则可以使用异步生产者,以提高消息的发送速度。同时,可以使用批量发送消息的方式,以减少网络开销和 IO 消耗。如果应用程序需要更高的可靠性,则可以使用同步生产者,以确保消息发送成功。同时,可以使用 ACK 确认机制和事务机制,以确保消息的可靠性和一致性。具体的参数调优参考生产者参数与调优。

    同步生产者

    在 Sarama Kafka Go 客户端中,有两种类型的生产者:同步生产者和异步生产者。它们的主要区别在于发送消息的方式和处理消息结果的方式。同步生产者:同步生产者在发送消息时会阻塞当前线程,直到消息发送完成并收到服务器的确认。因此,同步生产者的吞吐量较低,但是可以立即知道消息是否发送成功。示例如下:
    package main
    
    import (
    "fmt"
    "log"
    
    "github.com/Shopify/sarama"
    )
    
    func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal
    config.Producer.Return.Errors = true
    
    brokers := []string{"localhost:9092"}
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
    log.Fatalf("Failed to create producer: %v", err)
    }
    defer producer.Close()
    
    msg := &sarama.ProducerMessage{
    Topic: "test",
    Value: sarama.StringEncoder("Hello, World!"),
    }
    
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
    log.Printf("Failed to send message: %v", err)
    } else {
    fmt.Printf("Message sent to partition %d at offset %d\\n", partition, offset)
    }
    }

    异步生产者

    异步生产者:异步生产者在发送消息时不会阻塞当前线程,而是将消息放入一个内部的发送队列,然后立即返回。因此,异步生产者的吞吐量较高,但是需要通过回调函数来处理消息结果。
    package main
    
    import (
    "fmt"
    "log"
    
    "github.com/Shopify/sarama"
    )
    
    func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal
    config.Producer.Return.Errors = true
    
    brokers := []string{"localhost:9092"}
    producer, err := sarama.NewAsyncProducer(brokers, config)
    if err != nil {
    log.Fatalf("Failed to create producer: %v", err)
    }
    defer producer.Close()
    
    msg := &sarama.ProducerMessage{
    Topic: "test",
    Value: sarama.StringEncoder("Hello, World!"),
    }
    
    producer.Input() <- msg
    
    select {
    case success := <-producer.Successes():
    fmt.Printf("Message sent to partition %d at offset %d\\n", success.Partition, success.Offset)
    case err := <-producer.Errors():
    log.Printf("Failed to send message: %v", err)
    }
    }

    消费者实践

    版本选择

    在选择 Sarama 客户端版本时,需要确保所选版本与 Kafka broker 版本兼容。Sarama 库支持多个 Kafka 协议版本,可以通过设置 config.Version 来指定使用的协议版本。
    config := sarama.NewConfig() config.Version = sarama.V2_8_2_0

    消费者参数与调优

    
    config := sarama.NewConfig()
    config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange //消费者分配分区的默认方式
    config.Consumer.Offsets.Initial = sarama.OffsetNewest //在没有提交位点情况下,使用最新的位点还是最老的位点,默认是最新的消息位点
    config.Consumer.Offsets.AutoCommit.Enable = true //是否支持自动提交位点,默认支持
    config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second //自动提交位点时间间隔,默认1s
    config.Consumer.MaxWaitTime = 250 * time.Millisecond //在没有最新消费消息时候,客户端等待的时间,默认250ms
    config.Consumer.MaxProcessingTime = 100 * time.Millisecond
    config.Consumer.Fetch.Min = 1 //消费请求中获取的最小消息字节数,Broker将等待至少这么多字节的消息然后返回。默认值为1,不能设置0,因为0会导致在没有消息可用时消费者空转。
    config.Consumer.Fetch.Max = 0 //消费请求最大的字节数。默认为0,表示不限制
    config.Consumer.Fetch.Default = 1024 * 1024 //消费请求的默认消息字节数(默认为1MB),需要大于实例的大部分消息,否则Broker会花费大量时间计算消费数据是否达到这个值的条件
    config.Consumer.Return.Errors = true
    
    config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange // 设置消费者组在进行rebalance时所使用的策略为NewBalanceStrategyRange,默认NewBalanceStrategyRange
    config.Consumer.Group.Rebalance.Timeout = 60 * time.Second // 设置rebalance操作的超时时间,默认60s
    config.Consumer.Group.Session.Timeout = 10 * time.Second // 设置消费者组会话的超时时间为,默认为10s
    config.Consumer.Group.Heartbeat.Interval = 3 * time.Second // 心跳超时时间,默认为3s
    config.Consumer.MaxProcessingTime = 100 * time.Millisecond //消息处理的超时时间,默认100ms,

    参数说明与调优

    一般消费主要是rebalance时间频繁和消费线程阻塞问题,参考以下说明参数优化:
    config.Consumer.Group.Session.Timeout:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s 即可。
    config.Consumer.Group.Heartbeat.Interval:默认3s,设置该值 需要小于Consumer.Group.Session.Timeout/3。
    config.Consumer.Group.Rebalance.Timeout:默认60s,如果分区数和消费者较多,建议适当调大该值。
    config.Consumer.MaxProcessingTime:该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。
    注意:
    根据需求调大 MaxProcessingTime 时间。
    针对处理时间大于 MaxProcessingTime 请求处理时间进行监控,采样打印超时时间。

    创建消费者实例

    Sarama 提供订阅的模型创建消费者,其中在提交位点方面,提供手动提交位点和自动提交位点两种方式。

    自动提交位点

    自动提交位点:消费者在拉取消息后会自动提交位点,无需手动操作。这种方式的优点是简单易用,但是可能会导致消息重复消费或丢失。
    package main
    
    import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"
    
    "github.com/Shopify/sarama"
    )
    
    func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Consumer.Offsets.AutoCommit.Enable = true
    config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
    
    brokers := []string{"localhost:9092"}
    topic := "test-topic"
    
    client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
    if err != nil {
    log.Fatalf("unable to create kafka consumer group: %v", err)
    }
    defer client.Close()
    
    ctx, cancel := context.WithCancel(context.Background())
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    
    var wg sync.WaitGroup
    wg.Add(1)
    
    go func() {
    defer wg.Done()
    
    for {
    err := client.Consume(ctx, []string{topic}, &consumerHandler{})
    if err != nil {
    log.Printf("consume error: %v", err)
    }
    
    select {
    case <-signals:
    cancel()
    return
    default:
    }
    }
    }()
    
    wg.Wait()
    }
    
    type consumerHandler struct{}
    
    func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
    return nil
    }
    
    func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
    }
    
    func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
    fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
    sess.MarkMessage(msg, "")
    }
    return nil
    }

    手动提交位点

    手动提交位点:消费者在处理完消息后需要手动提交位点。这种方式的优点是可以精确控制位点的提交,避免消息重复消费或丢失。但是需要注意,手动提交位点如果太频繁会导致 Broker CPU 很高,影响性能,随着消息量增加,CPU 消费会很高,影响正常 Broker 的其他功能,因此建议间隔一定消息提交位点。
    package main
    
    import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    
    "github.com/Shopify/sarama"
    )
    
    func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Consumer.Offsets.AutoCommit.Enable = false
    
    brokers := []string{"localhost:9092"}
    topic := "test-topic"
    
    client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
    if err != nil {
    log.Fatalf("unable to create kafka consumer group: %v", err)
    }
    defer client.Close()
    
    ctx, cancel := context.WithCancel(context.Background())
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    
    var wg sync.WaitGroup
    wg.Add(1)
    
    go func() {
    defer wg.Done()
    
    for {
    err := client.Consume(ctx, []string{topic}, &consumerHandler{})
    if err != nil {
    log.Printf("consume error: %v", err)
    }
    
    select {
    case <-signals:
    cancel()
    return
    default:
    }
    }
    }()
    
    wg.Wait()
    }
    
    type consumerHandler struct{}
    
    func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
    return nil
    }
    
    func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
    }
    
    func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
    fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
    sess.MarkMessage(msg, "")
    sess.Commit()
    }
    return nil
    }

    Sarama Go 生产消费常见问题

    1. 配置了手动提交位点,但是位点在控制台查询消费组时候没有原因。
    无论配置了手动提交位点还是自动提交位点,都需要先进行标记,sess.MarkMessage(msg, ""),表示该消息已经被消费完,然后才能提交位点。
    2. Sarama Go 作为消费者的一些问题,Sarama Go 版本客户端存在以下已知问题:
    2.1 当Topic新增分区时,Sarama Go客户端无法感知并消费新增分区,需要客户端重启后,才能消费到新增分区。
    2.2 当Sarama Go客户端同时订阅两个以上的Topic时,有可能会导致部分分区无法正常消费消息。
    2.3 当Sarama Go客户端的消费位点重置策略设置为Oldest(earliest)时,如果客户端宕机或服务端版本升级,由于Sarama Go客户端自行实现OutOfRange机制,有可能会导致客户端从最小位点开始重新消费所有消息。
    2.4 对于该问题:Confluent Go客户端的Demo地址,请访问 kafka-confluent-go-demo
    3. 出现报错:Failed to produce message to topic。
    原因可能为版本没有对齐,此时客户先确定kafka Broker的版本,然后指定版本:
    config := sarama.NewConfig() config.Version = sarama.V2_1_0_0
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持