tencent cloud

Feedback

Sarama Go

Last updated: 2024-07-04 16:00:59

    Overview

    TDMQ for CKafka is a distributed stream processing platform designed for building real-time data pipelines and streaming applications. It boasts high throughput, low latency, scalability, and fault tolerance.
    Sarama: A Kafka library developed by Shopify, offering features such as producers, consumers, and partition consumers. The library performs well, and has an active community support.
    Confluent-Kafka-Go: A Kafka library developed by Confluent, featuring high-level APIs that are easy to use. Based on the librdkafka C library, its performance is excellent, though it can be somewhat complex to install and use.
    This document mainly describes key parameters, practical tutorials, and FAQs of the aforementioned Sarama Go client.

    Producer Practices

    Version Selection

    When the Sarama client version is selected, it is necessary to ensure that the selected version is compatible with the Kafka broker version. The Sarama library supports multiple Kafka protocol versions. Specify the protocol version to use by setting config.Version. The common correspondence between Kafka protocol versions and Sarama library versions is as follows. For the latest version, see Sarama Version.
    Kafka Version
    Sarama Library Version
    Sarama Protocol Version Constants
    0.8.2.x
    >= 1.0.0
    sarama.V0_8_2_0
    0.9.0.x
    >= 1.0.0
    sarama.V0_9_0_0
    0.10.0.x
    >= 1.0.0
    sarama.V0_10_0_0
    0.10.1.x
    >= 1.0.0
    sarama.V0_10_1_0
    0.10.2.x
    >= 1.0.0
    sarama.V0_10_2_0
    0.11.0.x
    >= 1.16.0
    sarama.V0_11_0_0
    1.0.x
    >= 1.16.0
    sarama.V1_0_0_0
    1.1.x
    >= 1.19.0
    sarama.V1_1_0_0
    2.0.x
    >= 1.19.0
    sarama.V2_0_0_0
    2.1.x
    >= 1.21.0
    sarama.V2_1_0_0
    2.2.x
    >= 1.23.0
    sarama.V2_2_0_0
    2.3.x
    >= 1.24.0
    sarama.V2_3_0_0
    2.4.x
    >= 1.27.0
    sarama.V2_4_0_0
    2.5.x
    >= 1.28.0
    sarama.V2_5_0_0
    2.6.x
    >= 1.29.0
    sarama.V2_6_0_0
    2.7.x
    >= 1.29.0
    sarama.V2_7_0_0
    2.8.x or later
    Recommended >=1.42.1
    sarama.V2_8_0_0-sarama.V3_6_0_0
    The Sarama library versions listed above represent the minimum versions that support the corresponding Kafka protocol versions. For optimal performance and new features, it is recommended to use the latest version of Sarama. When using the latest version, clients can specify the protocol version compatible with your Kafka broker by setting config.Version. The setting method is as follows, be sure to set the version before use, otherwise, there will be unexpected incompatibility issues:
    config := sarama.NewConfig()
    config.Version = sarama.V2_7_0_0 // Set the protocol version according to the actual Kafka version.

    Producer Parameters and Optimization

    Producer Parameters

    When using the Sarama Go client to write to Kafka, you need to configure the following key parameters. Parameters and their default values are as follows:
    
    config := sarama.NewConfig()
    sarama.MaxRequestSize = 100 * 1024 * 1024 // Maximum request size, with the default value being 100 MB, adjustable. Writing a message larger than 100 MB will result in an error.
    sarama.MaxResponseSize = 100 * 1024 * 1024 // Maximum response size, with the default value being 100 MB, adjustable. Accessing a message larger than 100 MB will result in an error.
    
    config.Producer.RequiredAcks = sarama.WaitForLocal // The default value is sarama.WaitForLocal(1).
    
    config.Producer.Retry.Max = 3 // Maximum retry count for producers, with the default value being 3.
    config.Producer.Retry.Backoff = 100 * time.Millisecond // Wait time between producer retries, with the default value being 100 milliseconds.
    
    config.Producer.Return.Successes = false // Return successful messages, with the default value being false.
    config.Producer.Return.Errors = true // Return failed messages, with the default value being true.
    
    config.Producer.Compression = CompressionNone // Compress messages before sending, with the default value being CompressionNone, i.e., no compression.
    config.Producer.CompressionLevel = CompressionLevelDefault // Specify compression level, effective after configuration of compression algorithm.
    
    config.Producer.Flush.Frequency = 0 //Time for caching messages by the producer, with the default value being 0 milliseconds.
    config.Producer.Flush.Bytes = 0 // Triggers a broker request when reaching a certain byte size, with the default value being 0. The request is sent immediately, with a natural max limit of MaxRequestSize and default maximum size of 100 MB.
    config.Producer.Flush.Messages = 0 // Forces a broker request upon reaching a certain number of messages. The number is an upper limit, MaxMessages < Messages.
    config.Producer.Flush.MaxMessages = 0 // Messages can be cached at maximum, the default value is 0. Messages are sent immediately if available. When MaxMessages is set to greater than 0, Messages must be set, and it is required that: MaxMessages > Messages.
    
    config.Producer.Timeout = 5 * time.Second // Timeout duration.
    
    config.Producer.Idempotent = false // Whether idempotence is required, with the default value being false.
    config.Producer.Transaction.Timeout = 1 * time.Minute // Transaction timeout duration, with the default value being 1 minute.
    config.Producer.Transaction.Retry.Max = 50 // Maximum transaction retry duration.
    config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond
    config.Net.MaxOpenRequests = 5 // Default value is 5, the number of requests sent at one time.
    config.Producer.Transaction.ID = "test" // Transaction ID.
    
    config.ClientID = "your-client-id" // Client ID.

    Parameter Description and Optimization

    RequiredAcks Parameter Optimization

    The RequiredAcks parameter is used to control the acknowledgement mechanism when the producer sends messages. The default value is WaitForLocal, which means that once the message is sent to the Leader Broker and the Leader confirms the message has been written, it is returned immediately. The RequiredAcks parameter also has the following optional values:
    NoResponse: Without waiting for any confirmation, return directly.
    WaitForLocal: Wait for the Leader replica to confirm the write, then return.
    WaitForAll: Wait for the Leader replica and the relevant Follower replicas to confirm the write, then return.
    From the above, it's clear that in scenarios involving cross-availability zones, as well as in Topic with a higher number of replicas, the choice of RequiredAcks parameter value can affect the reliability and throughput of messages. Therefore:
    In scenarios involving online business messages, where throughput demands are not high, you can set the RequiredAcks parameter to WaitForAll to ensure that messages are received and confirmed by all replicas before returning and to increase reliability.
    In scenarios of big data, like log collection, or offline computing, where a high throughput (i.e., data written to Kafka per second) is required, you can set the RequiredAcks to WaitForLocal to enhance throughput.

    Flush Parameter Optimization (Caching)

    By default, when the same amount of data is transmitted, compared to multiple requests, a single request can effectively reduce computations and network resources, thus improving the overall write throughput. Therefore, you can optimize the client's message-sending throughput by setting this parameter. In high-throughput scenarios, set parameters in conjunction with computation: Bytes are recommended to be set at 16 K, aligned with Kafka's standard Java SDK definition, and estimated single message size is 1 K (1024) bytes, hence the following parameters for Messages and MaxMessages should be set: The calculation method for Frequency is as follows: estimated traffic is 16 MB, with 16 partitions, then per partition write traffic per second would be: 16 x 1024 x 1024 / 16 = 1 x 1024 x 1024 = 1 MB, i.e. 1 MB of traffic per second per partition. Assuming a request size is 16 K for data transmission, to achieve 1 MB of traffic transmission in 1 s, it requires 1x1024x1024/16/1024 = 64 requests, thus Frequency <= 15.62 ms(1000/64). In reality, since the business traffic is not continuously produced, during off-peak hours, it might still hit 16ms but not cache much data. Therefore, in high-throughput situations, the conditions can be simplified, based on Bytes, and Frequency can be appropriately increased. For example, if a delay increase of 500 ms is acceptable, then it can be set to 500 ms, because at this time, if the data volume hits greater than or equal to Bytes, requests will be sent based on the Bytes condition.
    
    config.Producer.Flush.Frequency = 16 // Time for the producer to cache messages, with the default value being 100 milliseconds. If the traffic is small, this duration can be further increased.
    config.Producer.Flush.Bytes = 16*1024 // Triggers a broker request upon reaching a certain byte size, with the default value being 0. The request is sent immediately and subject to a natural maximum limit of MaxRequestSize, therefore the default maximum size is 100 MB.
    config.Producer.Flush.Messages = 17 // Forces a broker request upon reaching a certain number of messages. This is an upper limit. MaxMessages should be less than Messages.
    config.Producer.Flush.MaxMessages = 16 // Set to 16 messages. In fact, as the message size is not strictly 1024 bytes, it is recommended to configure Messages and MaxMessages to larger values or directly use the maximum Int value.
    // Because hitting any of the Frequency, Bytes, or MaxMessages < Messages conditions will trigger a flush.

    Transaction Parameter Optimization

    config.Producer.Idempotent = true // Whether idempotence is required. It should be set to true in transaction scenarios. config.Producer.Transaction.Timeout = 1 * time.Minute // Default transaction timeout duration is 1 minute config.Producer.Transaction.Retry.Max = 50 // Transaction retry duration. config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond config.Net.MaxOpenRequests = 5 //Number of requests sent at once, with the default value being 5. config.Producer.Transaction.ID = "test" // Transaction ID.
    It is important to note that ensuring the exactly once semantics for transactions requires additional computational resources. Therefore, the selection of config.Net.MaxOpenRequests must be less than or equal to 5. The Broker's ProducerStateManager instances will cache the most recent 5 batch data sent by each PID on each Topic-Partition. If the customer wants to maintain a certain level of throughput in addition to transactions, set this value to 5 and appropriately increase the transaction timeout period to accommodate delays caused by network jitter under high load.

    Compression Parameter Optimization

    Sarama Go supports the following compression parameters:
    config.Producer.Compression = CompressionNone // Compress messages before sending, with the default value being CompressionNone, i.e., no compression.
    config.Producer.CompressionLevel = CompressionLevelDefault // Specify the compression level, effective after configuring the compression algorithm.
    In the Sarama Kafka Go client, the following compression configurations are supported: 1. sarama.CompressionNone: Do not compress. 2. sarama.CompressionGZIP: Compress by GZIP. 3. sarama.CompressionSnappy: Compress by Snappy. 4. sarama.CompressionLZ4: Compress by LZ4. 5. sarama.CompressionZSTD: Compress by ZSTD. To use message compression in the Sarama Kafka Go client, you should set the config.Producer.Compression parameter when creating a producer. For example, to use the LZ4 compression algorithm, set config.Producer.Compression to sarama.CompressionLZ4. As an optimization method that exchanges computation for bandwidth, message compression and decompression occurs on the client side. However, due to additional computation cost of the Broker's behavior in verifying compressed messages, especially with GZIP compression, the verification computation cost for the Broker can be significant, which is not worthwhile in some cases. Due to increased computation, message processing capabilities of the Broker may be lower, resulting in lower bandwidth throughput. Under circumstances of low throughput or low specification services, it is not recommended to use message compression. If compression is necessary, the following method is recommended:
    1. Independently compress message data on the Producer side to generate packed data compression: messageCompression, and store the compression method in the message's key:
    {"Compression","CompressionLZ4"}
    2. On the Producer side, send messageCompression as a normal message.
    3. On the Consumer side, read the message key to access the compression method used, and decompress independently.

    Creating Producer Instance

    If an application requires higher throughput, use an asynchronous producer to increase the speed of message sending. At the same time, send batch messages to reduce network overhead and IO consumption. If an application demands higher reliability, a synchronous producer can be used to ensure successful message delivery. Additionally, employ the ACK acknowledgement mechanism and transaction mechanism to guarantee message reliability and consistency. For specific parameter optimization, see Producer Parameters and Optimization.

    Synchronous Producer

    In the Sarama Kafka Go client, there are two types of producers: synchronous and asynchronous. Their main differences lie in the method of sending messages and processing message results. Synchronous Producer: The synchronous producer blocks the current thread when sending messages, until the message has been sent and acknowledged by the server. As a result, the throughput of a synchronous producer is lower, but it allows immediate knowledge of whether the message was successfully sent. Example:
    package main
    
    import (
    "fmt"
    "log"
    
    "github.com/Shopify/sarama"
    )
    
    func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal
    config.Producer.Return.Errors = true
    
    brokers := []string{"localhost:9092"}
    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
    log.Fatalf("Failed to create producer: %v", err)
    }
    defer producer.Close()
    
    msg := &sarama.ProducerMessage{
    Topic: "test",
    Value: sarama.StringEncoder("Hello, World!"),
    }
    
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
    log.Printf("Failed to send message: %v", err)
    } else {
    fmt.Printf("Message sent to partition %d at offset %d\\n", partition, offset)
    }
    }

    Asynchronous Producer

    Asynchronous Producer: The asynchronous producer does not block the current thread when sending messages. Instead, it places the message into an internal sending queue and then returns immediately. Therefore, the throughput of an asynchronous producer is higher, but it requires a callback function to process the message results.
    package main
    
    import (
    "fmt"
    "log"
    
    "github.com/Shopify/sarama"
    )
    
    func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForLocal
    config.Producer.Return.Errors = true
    
    brokers := []string{"localhost:9092"}
    producer, err := sarama.NewAsyncProducer(brokers, config)
    if err != nil {
    log.Fatalf("Failed to create producer: %v", err)
    }
    defer producer.Close()
    
    msg := &sarama.ProducerMessage{
    Topic: "test",
    Value: sarama.StringEncoder("Hello, World!"),
    }
    
    producer.Input() <- msg
    
    select {
    case success := <-producer.Successes():
    fmt.Printf("Message sent to partition %d at offset %d\\n", success.Partition, success.Offset)
    case err := <-producer.Errors():
    log.Printf("Failed to send message: %v", err)
    }
    }

    Consumer Practice

    Version Selection

    When a Sarama client version is selected, it is necessary to ensure that the chosen version is compatible with the Kafka broker version. The Sarama library supports multiple Kafka protocol versions, which can be specified by setting config.Version.
    config := sarama.NewConfig() config.Version = sarama.V2_8_2_0

    Consumer Parameters and Optimization

    
    config := sarama.NewConfig()
    config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange // The default method for consumer partition assignment.
    config.Consumer.Offsets.Initial = sarama.OffsetNewest // Without a committed offset, use the newest or oldest offset, with the default value being the newest message offset.
    config.Consumer.Offsets.AutoCommit.Enable = true // Whether auto-commit for offsets is supported, with the default value being support.
    config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // Auto-commit interval for offsets, with the default value being 1 s.
    config.Consumer.MaxWaitTime = 250 * time.Millisecond // Client's waiting time when there are no new consumption messages, with the default value being 250 ms.
    config.Consumer.MaxProcessingTime = 100 * time.Millisecond
    config.Consumer.Fetch.Min = 1 // Minimum byte size of messages accessed in consumption requests. The Broker will wait for at least this many bytes of messages before returning. Default value is 1, and cannot be set to 0, because 0 would cause consumers to idle when no messages are available.
    config.Consumer.Fetch.Max = 0 // Maximum byte size for consumption requests. Default is 0, indicating unlimited.
    config.Consumer.Fetch.Default = 1024 * 1024 // Default byte size of messages for consumption requests, with the default value being 1 MB. It should be larger than most of the instance messages, otherwise, the Broker will spend a lot of time determining whether the consumption data meets this condition.
    config.Consumer.Return.Errors = true
    
    config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange // Set the strategy used by consumer groups during rebalance to NewBalanceStrategyRange, with the default value being NewBalanceStrategyRange.
    config.Consumer.Group.Rebalance.Timeout = 60 * time.Second // Set the timeout duration for rebalance operations, with the default value being 60 s.
    config.Consumer.Group.Session.Timeout = 10 * time.Second // Set the timeout duration for consumer group sessions, with the default value being 10 s.
    config.Consumer.Group.Heartbeat.Interval = 3 * time.Second // Heartbeat timeout duration, with the default value being 3 s.
    config.Consumer.MaxProcessingTime = 100 * time.Millisecond // Timeout duration for message processing, with the default value being 100 ms.

    Parameter Description and Optimization

    General consumption issues mainly involve frequent rebalance times and consumption thread blocking. See the following for parameter optimization:
    config.Consumer.Group.Session.Timeout: For versions earlier than v0.10.2, increase this parameter to an appropriate value, making it greater than the time taken to consume a batch of data and not exceed 30 s. It is recommended to set it to 25 s. For v0.10.2 and later versions, use the default value of 10 s.
    config.Consumer.Group.Heartbeat.Interval: Default is 3 s. Set this value and make sure it is less than Consumer.Group.Session.Timeout/3.
    config.Consumer.Group.Rebalance.Timeout: Default is 60 s. If the number of partitions and consumers is large, it is recommended to appropriately increase this value.
    config.Consumer.MaxProcessingTime: This value should be greater than the <max.poll.records> / (<number of records consumed per second per thread> x <number of consumer threads>).
    Note:
    Increase the MaxProcessingTime according to your needs.
    Monitor for processing times that exceed the MaxProcessingTime, and log the instances of timeouts.

    Creating Consumer Instance

    Sarama offers a subscription model to create consumers. It provides two ways to commit offsets: manually and automatically.

    Auto-Commit Offsets

    Auto-commit offsets: After consumers pull messages, they automatically commit their offsets without manual intervention. The advantage of this method is it is easy to use, but it may lead to duplicate message consumption or loss.
    package main
    
    import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"
    
    "github.com/Shopify/sarama"
    )
    
    func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Consumer.Offsets.AutoCommit.Enable = true
    config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second
    
    brokers := []string{"localhost:9092"}
    topic := "test-topic"
    
    client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
    if err != nil {
    log.Fatalf("unable to create kafka consumer group: %v", err)
    }
    defer client.Close()
    
    ctx, cancel := context.WithCancel(context.Background())
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    
    var wg sync.WaitGroup
    wg.Add(1)
    
    go func() {
    defer wg.Done()
    
    for {
    err := client.Consume(ctx, []string{topic}, &consumerHandler{})
    if err != nil {
    log.Printf("consume error: %v", err)
    }
    
    select {
    case <-signals:
    cancel()
    return
    default:
    }
    }
    }()
    
    wg.Wait()
    }
    
    type consumerHandler struct{}
    
    func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
    return nil
    }
    
    func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
    }
    
    func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
    fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
    sess.MarkMessage(msg, "")
    }
    return nil
    }

    Manual-Commit Offsets

    Manually committing offsets: After processing messages, consumers need to manually commit their offsets. The advantage of this method is that it allows for precise control over offset committing, avoiding duplicate message consumption or loss. However, it should be noted that manual committing can lead to high Broker CPU usage, affecting performance. As message volume increases, CPU consumption will be significantly high, affecting other features of the Broker. Therefore, it is recommended to commit offsets after a certain number of messages.
    package main
    
    import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    
    "github.com/Shopify/sarama"
    )
    
    func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0
    config.Consumer.Offsets.Initial = sarama.OffsetOldest
    config.Consumer.Offsets.AutoCommit.Enable = false
    
    brokers := []string{"localhost:9092"}
    topic := "test-topic"
    
    client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
    if err != nil {
    log.Fatalf("unable to create kafka consumer group: %v", err)
    }
    defer client.Close()
    
    ctx, cancel := context.WithCancel(context.Background())
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    
    var wg sync.WaitGroup
    wg.Add(1)
    
    go func() {
    defer wg.Done()
    
    for {
    err := client.Consume(ctx, []string{topic}, &consumerHandler{})
    if err != nil {
    log.Printf("consume error: %v", err)
    }
    
    select {
    case <-signals:
    cancel()
    return
    default:
    }
    }
    }()
    
    wg.Wait()
    }
    
    type consumerHandler struct{}
    
    func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {
    return nil
    }
    
    func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
    }
    
    func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
    fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
    sess.MarkMessage(msg, "")
    sess.Commit()
    }
    return nil
    }

    Production and Consumption FAQs with Sarama Go

    1. Configured manually-commit offset, but the offset doesn't show up in the console when querying the consumption group.
    Whether you've configured manual-commit or auto-commit offset, you first need to mark the message, sess.MarkMessage(msg, ""), indicating that the message has been fully consumed, before committing the offset.
    2. Some issues with Sarama Go as a consumer exist and the Sarama Go version client has the following known issues:
    2.1 When a Topic adds a partition, the Sarama Go client cannot detect and consume the new partition. The client must be restarted to consume the newly added partition.
    2.2 When the Sarama Go client subscribes to more than two Topics at the same time, it may lead to some partitions being unable to consume messages normally.
    2.3 When the resetting strategy of Sarama Go client's consumption offset is set to Oldest(earliest), if the client experiences downtime or there is a server-side version upgrade, due to the OutOfRange mechanism implemented by the Sarama Go client itself, it may cause the client to start re-consuming all messages from the smallest offset.
    2.4 Regarding this issue, for the Confluent Go client's demo address, see kafka-confluent-go-demo.
    3. Error message: Failed to produce message to topic.
    The issue may be caused by misaligned versions. In this case, the client should first check the version of the Kafka Broker, and then specify the version:
    config := sarama.NewConfig() config.Version = sarama.V2_1_0_0
    
    
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support