Kafka 版本 | Sarama 库版本 | Sarama 协议版本常量 |
0.8.2.x | >= 1.0.0 | sarama.V0_8_2_0 |
0.9.0.x | >= 1.0.0 | sarama.V0_9_0_0 |
0.10.0.x | >= 1.0.0 | sarama.V0_10_0_0 |
0.10.1.x | >= 1.0.0 | sarama.V0_10_1_0 |
0.10.2.x | >= 1.0.0 | sarama.V0_10_2_0 |
0.11.0.x | >= 1.16.0 | sarama.V0_11_0_0 |
1.0.x | >= 1.16.0 | sarama.V1_0_0_0 |
1.1.x | >= 1.19.0 | sarama.V1_1_0_0 |
2.0.x | >= 1.19.0 | sarama.V2_0_0_0 |
2.1.x | >= 1.21.0 | sarama.V2_1_0_0 |
2.2.x | >= 1.23.0 | sarama.V2_2_0_0 |
2.3.x | >= 1.24.0 | sarama.V2_3_0_0 |
2.4.x | >= 1.27.0 | sarama.V2_4_0_0 |
2.5.x | >= 1.28.0 | sarama.V2_5_0_0 |
2.6.x | >= 1.29.0 | sarama.V2_6_0_0 |
2.7.x | >= 1.29.0 | sarama.V2_7_0_0 |
2.8.x以及以上 | 建议使用>=1.42.1 | sarama.V2_8_0_0-sarama.V3_6_0_0 |
config := sarama.NewConfig()config.Version = sarama.V2_7_0_0 // 根据实际Kafka版本设置协议版本
config := sarama.NewConfig()sarama.MaxRequestSize = 100 * 1024 * 1024 //请求最大大小,默认100MB,可以调整,写入大于100MB的消息会直接报错sarama.MaxResponseSize = 100 * 1024 * 1024 //响应最大大小,默认100MB,可以调整,获取大于100MB的消息会直接报错config.Producer.RequiredAcks = sarama.WaitForLocal // 默认值为sarama.WaitForLocal(1)config.Producer.Retry.Max = 3 // 生产者重试的最大次数,默认为3config.Producer.Retry.Backoff = 100 * time.Millisecond // 生产者重试之间的等待时间,默认为100毫秒config.Producer.Return.Successes = false //是否返回成功的消息,默认为falseconfig.Producer.Return.Errors = true // 返回失败的消息,默认值为trueconfig.Producer.Compression = CompressionNone //对消息是否压缩后发送,默认CompressionNone不压缩config.Producer.CompressionLevel = CompressionLevelDefault // 指定压缩等级,在配置了压缩算法后生效config.Producer.Flush.Frequency = 0 //producer缓存消息的时间, 默认缓存0毫秒config.Producer.Flush.Bytes = 0 // 达到多少字节时,触发一次broker请求,默认为0,直接发送,存在天然上限值MaxRequestSize,因此默认最大100MBconfig.Producer.Flush.Messages = 0 // 达到多少条消息时,强制,触发一次broker请求,这个是上限值,MaxMessages < Messagesconfig.Producer.Flush.MaxMessages = 0 // 最大缓存多少消息,默认为0,有消息立刻发送,MaxMessages设置大于0时,必须设置 Messages,且需要保证:MaxMessages > Messagesconfig.Producer.Timeout = 5 * time.Second // 超时时间config.Producer.Idempotent = false //是否需要幂等,默认falseconfig.Producer.Transaction.Timeout = 1 * time.Minute // 事务超时时间默认1分钟config.Producer.Transaction.Retry.Max = 50 //事务重试时间config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecondconfig.Net.MaxOpenRequests = 5 //默认值5,一次发送请求的数量config.Producer.Transaction.ID = "test" //事务IDconfig.ClientID = "your-client-id" // 客户端ID
config.Producer.Flush.Frequency = 16 //producer缓存消息的时间, 默认缓存100毫秒,如果发送的流量较小,这里可以进一步增加延时时间。config.Producer.Flush.Bytes = 16*1024 // 达到多少字节时,触发一次broker请求,默认为0,直接发送,存在天然上限值MaxRequestSize,因此默认最大100MBconfig.Producer.Flush.Messages = 17 // 达到多少条消息时,强制,触发一次broker请求,这个是上限值,MaxMessages 需要小于 Messagesconfig.Producer.Flush.MaxMessages = 16 // 16条,实际上因为消息大小不严格1024字节,Messages和MaxMessages 建议配置值更大或者直接使用Int的最大值,//因为命中Frequency,Bytes,MaxMessages < Messages任何一个条件都会触发flush
config.Producer.Idempotent = true //是否需要幂等,在事务场景下需要设置为true config.Producer.Transaction.Timeout = 1 * time.Minute // 事务超时时间默认1分钟 config.Producer.Transaction.Retry.Max = 50 //事务重试时间 config.Producer.Transaction.Retry.Backoff = 100 * time.Millisecond config.Net.MaxOpenRequests = 5 //默认值5,一次发送请求的数量 config.Producer.Transaction.ID = "test" //事务ID
config.Producer.Compression = CompressionNone //对消息是否压缩后发送,默认CompressionNone不压缩config.Producer.CompressionLevel = CompressionLevelDefault //指定压缩等级,在配置了压缩算法后生效
{"Compression","CompressionLZ4"}
package mainimport ("fmt""log""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForLocalconfig.Producer.Return.Errors = truebrokers := []string{"localhost:9092"}producer, err := sarama.NewSyncProducer(brokers, config)if err != nil {log.Fatalf("Failed to create producer: %v", err)}defer producer.Close()msg := &sarama.ProducerMessage{Topic: "test",Value: sarama.StringEncoder("Hello, World!"),}partition, offset, err := producer.SendMessage(msg)if err != nil {log.Printf("Failed to send message: %v", err)} else {fmt.Printf("Message sent to partition %d at offset %d\\n", partition, offset)}}
package mainimport ("fmt""log""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForLocalconfig.Producer.Return.Errors = truebrokers := []string{"localhost:9092"}producer, err := sarama.NewAsyncProducer(brokers, config)if err != nil {log.Fatalf("Failed to create producer: %v", err)}defer producer.Close()msg := &sarama.ProducerMessage{Topic: "test",Value: sarama.StringEncoder("Hello, World!"),}producer.Input() <- msgselect {case success := <-producer.Successes():fmt.Printf("Message sent to partition %d at offset %d\\n", success.Partition, success.Offset)case err := <-producer.Errors():log.Printf("Failed to send message: %v", err)}}
config := sarama.NewConfig() config.Version = sarama.V2_8_2_0
config := sarama.NewConfig()config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange //消费者分配分区的默认方式config.Consumer.Offsets.Initial = sarama.OffsetNewest //在没有提交位点情况下,使用最新的位点还是最老的位点,默认是最新的消息位点config.Consumer.Offsets.AutoCommit.Enable = true //是否支持自动提交位点,默认支持config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second //自动提交位点时间间隔,默认1sconfig.Consumer.MaxWaitTime = 250 * time.Millisecond //在没有最新消费消息时候,客户端等待的时间,默认250msconfig.Consumer.MaxProcessingTime = 100 * time.Millisecondconfig.Consumer.Fetch.Min = 1 //消费请求中获取的最小消息字节数,Broker将等待至少这么多字节的消息然后返回。默认值为1,不能设置0,因为0会导致在没有消息可用时消费者空转。config.Consumer.Fetch.Max = 0 //消费请求最大的字节数。默认为0,表示不限制config.Consumer.Fetch.Default = 1024 * 1024 //消费请求的默认消息字节数(默认为1MB),需要大于实例的大部分消息,否则Broker会花费大量时间计算消费数据是否达到这个值的条件config.Consumer.Return.Errors = trueconfig.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange // 设置消费者组在进行rebalance时所使用的策略为NewBalanceStrategyRange,默认NewBalanceStrategyRangeconfig.Consumer.Group.Rebalance.Timeout = 60 * time.Second // 设置rebalance操作的超时时间,默认60sconfig.Consumer.Group.Session.Timeout = 10 * time.Second // 设置消费者组会话的超时时间为,默认为10sconfig.Consumer.Group.Heartbeat.Interval = 3 * time.Second // 心跳超时时间,默认为3sconfig.Consumer.MaxProcessingTime = 100 * time.Millisecond //消息处理的超时时间,默认100ms,
package mainimport ("context""fmt""log""os""os/signal""sync""time""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Version = sarama.V2_1_0_0config.Consumer.Offsets.Initial = sarama.OffsetOldestconfig.Consumer.Offsets.AutoCommit.Enable = trueconfig.Consumer.Offsets.AutoCommit.Interval = 1 * time.Secondbrokers := []string{"localhost:9092"}topic := "test-topic"client, err := sarama.NewConsumerGroup(brokers, "test-group", config)if err != nil {log.Fatalf("unable to create kafka consumer group: %v", err)}defer client.Close()ctx, cancel := context.WithCancel(context.Background())signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for {err := client.Consume(ctx, []string{topic}, &consumerHandler{})if err != nil {log.Printf("consume error: %v", err)}select {case <-signals:cancel()returndefault:}}}()wg.Wait()}type consumerHandler struct{}func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {return nil}func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {return nil}func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)sess.MarkMessage(msg, "")}return nil}
package mainimport ("context""fmt""log""os""os/signal""sync""github.com/Shopify/sarama")func main() {config := sarama.NewConfig()config.Version = sarama.V2_1_0_0config.Consumer.Offsets.Initial = sarama.OffsetOldestconfig.Consumer.Offsets.AutoCommit.Enable = falsebrokers := []string{"localhost:9092"}topic := "test-topic"client, err := sarama.NewConsumerGroup(brokers, "test-group", config)if err != nil {log.Fatalf("unable to create kafka consumer group: %v", err)}defer client.Close()ctx, cancel := context.WithCancel(context.Background())signals := make(chan os.Signal, 1)signal.Notify(signals, os.Interrupt)var wg sync.WaitGroupwg.Add(1)go func() {defer wg.Done()for {err := client.Consume(ctx, []string{topic}, &consumerHandler{})if err != nil {log.Printf("consume error: %v", err)}select {case <-signals:cancel()returndefault:}}}()wg.Wait()}type consumerHandler struct{}func (h *consumerHandler) Setup(sarama.ConsumerGroupSession) error {return nil}func (h *consumerHandler) Cleanup(sarama.ConsumerGroupSession) error {return nil}func (h *consumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("Received message: key=%s, value=%s, partition=%d, offset=%d\\n", string(msg.Key), string(msg.Value), msg.Partition, msg.Offset)sess.MarkMessage(msg, "")sess.Commit()}return nil}
config := sarama.NewConfig() config.Version = sarama.V2_1_0_0
本页内容是否解决了您的问题?