Kafka Version | Sarama Library Version | Sarama Protocol Version Constants |
0.8.2.x | >= 1.0.0 | sarama.V0_8_2_0 |
0.9.0.x | >= 1.0.0 | sarama.V0_9_0_0 |
0.10.0.x | >= 1.0.0 | sarama.V0_10_0_0 |
0.10.1.x | >= 1.0.0 | sarama.V0_10_1_0 |
0.10.2.x | >= 1.0.0 | sarama.V0_10_2_0 |
0.11.0.x | >= 1.16.0 | sarama.V0_11_0_0 |
1.0.x | >= 1.16.0 | sarama.V1_0_0_0 |
1.1.x | >= 1.19.0 | sarama.V1_1_0_0 |
2.0.x | >= 1.19.0 | sarama.V2_0_0_0 |
2.1.x | >= 1.21.0 | sarama.V2_1_0_0 |
2.2.x | >= 1.23.0 | sarama.V2_2_0_0 |
2.3.x | >= 1.24.0 | sarama.V2_3_0_0 |
2.4.x | >= 1.27.0 | sarama.V2_4_0_0 |
2.5.x | >= 1.28.0 | sarama.V2_5_0_0 |
2.6.x | >= 1.29.0 | sarama.V2_6_0_0 |
2.7.x | >= 1.29.0 | sarama.V2_7_0_0 |
2.8.x or later | Recommended >= 1.42.1 | sarama.V2_8_0_0 through sarama.V3_6_0_0 |
// Pin the client's protocol version to match the actual Kafka broker version.
config := sarama.NewConfig()
config.Version = sarama.V2_7_0_0
config := sarama.NewConfig()

// Package-level size limits (shared by every client in the process).
sarama.MaxRequestSize = 100 * 1024 * 1024  // Maximum request size; default 100 MB. Writing a larger message fails.
sarama.MaxResponseSize = 100 * 1024 * 1024 // Maximum response size; default 100 MB. Fetching a larger message fails.

// Acknowledgement and retry behavior.
config.Producer.RequiredAcks = sarama.WaitForLocal     // Default is sarama.WaitForLocal (acks = 1).
config.Producer.Retry.Max = 3                          // Maximum produce retries; default 3.
config.Producer.Retry.Backoff = 100 * time.Millisecond // Wait between retries; default 100 ms.
config.Producer.Return.Successes = false               // Deliver successes on the Successes channel; default false.
config.Producer.Return.Errors = true                   // Deliver failures on the Errors channel; default true.

// Compression. Fixed: these constants live in the sarama package — the bare
// identifiers CompressionNone / CompressionLevelDefault do not compile.
config.Producer.Compression = sarama.CompressionNone              // Default CompressionNone, i.e. no compression.
config.Producer.CompressionLevel = sarama.CompressionLevelDefault // Effective once a compression algorithm is set.

// Batching. A flush is triggered when ANY of Frequency / Bytes / Messages is hit.
config.Producer.Flush.Frequency = 0   // How long the producer buffers messages; default 0 (send immediately).
config.Producer.Flush.Bytes = 0       // Flush once this many bytes are buffered; default 0 (immediate). Naturally capped by MaxRequestSize (100 MB by default).
config.Producer.Flush.Messages = 0    // Flush once this many messages are buffered (best-effort trigger).
config.Producer.Flush.MaxMessages = 0 // Hard cap on messages per request; default 0 = unlimited, send as soon as messages are available.
// NOTE: when MaxMessages > 0, sarama's Config.Validate requires
// Messages <= MaxMessages (the original text stated the inequality both
// ways; this is the direction sarama enforces).

config.Producer.Timeout = 5 * time.Second // Produce timeout.

// Idempotence / transactions.
config.Producer.Idempotent = false                                 // Whether idempotence is required; default false.
config.Producer.Transaction.Timeout = 1 * time.Minute              // Transaction timeout; default 1 minute.
config.Producer.Transaction.Retry.Max = 50                         // Maximum transaction retries.
config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond // Wait between transaction retries.
config.Net.MaxOpenRequests = 5                                     // In-flight requests per connection; default 5.
config.Producer.Transaction.ID = "test"                            // Transaction ID.
config.ClientID = "your-client-id"                                 // Client ID.
// Batch roughly every 16 ms / 16 KB / 16 messages. A flush fires as soon as
// any one of the Frequency, Bytes, or Messages triggers is reached.
config.Producer.Flush.Frequency = 16 * time.Millisecond // Fixed: Frequency is a time.Duration — the bare "16" in the original meant 16 ns, not 16 ms.
config.Producer.Flush.Bytes = 16 * 1024                 // Flush once 16 KB are buffered (default 0 = immediate; naturally capped by MaxRequestSize, 100 MB by default).
config.Producer.Flush.Messages = 16                     // Flush once 16 messages are buffered.
config.Producer.Flush.MaxMessages = 17                  // Hard cap per request. Fixed: sarama requires Messages <= MaxMessages; the original's Messages=17 / MaxMessages=16 fails Config.Validate.
// Because real messages are not exactly 1024 bytes, consider raising
// Messages/MaxMessages (or using MaxMessages = 0 for "unlimited") so the
// byte/time triggers dominate.
// Transactional producer settings.
config.Producer.Idempotent = true // Idempotence must be enabled in transaction scenarios (requires config.Version >= V0_11_0_0).
// Fixed: sarama's Config.Validate rejects an idempotent producer unless
// RequiredAcks is WaitForAll and Net.MaxOpenRequests is 1 — the original's
// MaxOpenRequests = 5 fails validation.
config.Producer.RequiredAcks = sarama.WaitForAll
config.Net.MaxOpenRequests = 1
config.Producer.Transaction.Timeout = 1 * time.Minute              // Default transaction timeout is 1 minute.
config.Producer.Transaction.Retry.Max = 50                         // Maximum transaction retries.
config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond // Wait between transaction retries.
config.Producer.Transaction.ID = "test"                            // Transaction ID.
// Fixed: the compression constants are declared in the sarama package — the
// bare identifiers CompressionNone / CompressionLevelDefault do not compile.
config.Producer.Compression = sarama.CompressionNone              // Default CompressionNone, i.e. no compression.
config.Producer.CompressionLevel = sarama.CompressionLevelDefault // Compression level; effective once an algorithm is configured.
{"Compression": "CompressionLZ4"}
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

// main sends one message with a synchronous producer and prints the
// partition and offset it was written to.
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForLocal // acks = 1: wait for the leader only.
	config.Producer.Return.Errors = true
	// Fixed: NewSyncProducer rejects any config without Return.Successes =
	// true; the original omitted it and would fail at producer creation.
	config.Producer.Return.Successes = true

	brokers := []string{"localhost:9092"}
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.StringEncoder("Hello, World!"),
	}

	partition, offset, err := producer.SendMessage(msg)
	if err != nil {
		log.Printf("Failed to send message: %v", err)
	} else {
		// Fixed: the original's "\\n" printed a literal backslash-n.
		fmt.Printf("Message sent to partition %d at offset %d\n", partition, offset)
	}
}
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

// main sends one message with an asynchronous producer and waits for its
// delivery report on the Successes/Errors channels.
func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForLocal // acks = 1: wait for the leader only.
	config.Producer.Return.Errors = true
	// Fixed: without Return.Successes = true the Successes channel never
	// receives, so the select below would block forever on a successful
	// send; the original omitted this line.
	config.Producer.Return.Successes = true

	brokers := []string{"localhost:9092"}
	producer, err := sarama.NewAsyncProducer(brokers, config)
	if err != nil {
		log.Fatalf("Failed to create producer: %v", err)
	}
	defer producer.Close()

	msg := &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.StringEncoder("Hello, World!"),
	}

	producer.Input() <- msg

	select {
	case success := <-producer.Successes():
		// Fixed: the original's "\\n" printed a literal backslash-n.
		fmt.Printf("Message sent to partition %d at offset %d\n", success.Partition, success.Offset)
	case err := <-producer.Errors():
		log.Printf("Failed to send message: %v", err)
	}
}
// Match the client's protocol version to the broker (Kafka 2.8.2 here).
config := sarama.NewConfig()
config.Version = sarama.V2_8_2_0
config := sarama.NewConfig()

// Partition assignment during rebalance. Fixed: NewBalanceStrategyRange is a
// constructor and must be called — assigning the bare function value does not
// compile. Range assignment is sarama's default strategy.
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange()

// Offset management.
config.Consumer.Offsets.Initial = sarama.OffsetNewest         // Used when no committed offset exists; default is the newest offset.
config.Consumer.Offsets.AutoCommit.Enable = true              // Auto-commit offsets; enabled by default.
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second // Auto-commit interval; default 1 s.

// Fetch behavior.
config.Consumer.MaxWaitTime = 250 * time.Millisecond // How long to wait when no new messages are available; default 250 ms.
config.Consumer.Fetch.Min = 1                        // Minimum bytes per fetch; the broker waits for at least this much. Default 1; must not be 0 (consumers would spin when idle).
config.Consumer.Fetch.Max = 0                        // Maximum bytes per fetch; default 0 = unlimited.
config.Consumer.Fetch.Default = 1024 * 1024          // Default fetch size; default 1 MB. Should exceed most message sizes, otherwise the broker wastes time checking whether the fetch condition is met.

config.Consumer.Return.Errors = true

// Group/session tuning. (The original assigned Rebalance.Strategy and
// MaxProcessingTime twice; the duplicates are consolidated here.)
config.Consumer.Group.Rebalance.Timeout = 60 * time.Second // Rebalance timeout; default 60 s.
config.Consumer.Group.Session.Timeout = 10 * time.Second   // Session timeout; default 10 s.
config.Consumer.Group.Heartbeat.Interval = 3 * time.Second // Heartbeat interval; default 3 s.
config.Consumer.MaxProcessingTime = 100 * time.Millisecond // Per-message processing timeout; default 100 ms.
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/Shopify/sarama"
)

// main runs a consumer group with auto-committed offsets until interrupted.
func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

	brokers := []string{"localhost:9092"}
	topic := "test-topic"

	client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
	if err != nil {
		log.Fatalf("unable to create kafka consumer group: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Fixed: cancel the context as soon as an interrupt arrives. The
	// original only polled the signal channel between Consume calls, so
	// Ctrl-C never interrupted a blocked Consume.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	go func() {
		<-signals
		cancel()
	}()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume must be called in a loop: it returns whenever a
			// rebalance occurs.
			if err := client.Consume(ctx, []string{topic}, &consumerHandler{}); err != nil {
				log.Printf("consume error: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()
	wg.Wait()
}

// consumerHandler implements sarama.ConsumerGroupHandler.
type consumerHandler struct{}

func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim processes messages from one claimed partition; marked offsets
// are committed by the auto-commit loop.
func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// Fixed: the original's "\\n" printed a literal backslash-n.
		fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n",
			string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
	}
	return nil
}
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
)

// main runs a consumer group with manual offset commits until interrupted.
func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0
	config.Consumer.Offsets.Initial = sarama.OffsetOldest
	config.Consumer.Offsets.AutoCommit.Enable = false // Commit offsets manually in ConsumeClaim.

	brokers := []string{"localhost:9092"}
	topic := "test-topic"

	client, err := sarama.NewConsumerGroup(brokers, "test-group", config)
	if err != nil {
		log.Fatalf("unable to create kafka consumer group: %v", err)
	}
	defer client.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Fixed: cancel the context as soon as an interrupt arrives. The
	// original only polled the signal channel between Consume calls, so
	// Ctrl-C never interrupted a blocked Consume.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	go func() {
		<-signals
		cancel()
	}()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Consume must be called in a loop: it returns whenever a
			// rebalance occurs.
			if err := client.Consume(ctx, []string{topic}, &consumerHandler{}); err != nil {
				log.Printf("consume error: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
		}
	}()
	wg.Wait()
}

// consumerHandler implements sarama.ConsumerGroupHandler.
type consumerHandler struct{}

func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim processes messages from one claimed partition, committing the
// marked offset after every message. NOTE: a synchronous Commit per message
// is expensive; batch commits in high-throughput scenarios.
func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// Fixed: the original's "\\n" printed a literal backslash-n.
		fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n",
			string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)
		sess.MarkMessage(msg, "")
		sess.Commit()
	}
	return nil
}
// Match the client's protocol version to the broker (Kafka 2.1 here).
config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0
Was this page helpful?