go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
{"topic": ["xxxx"],"sasl": {"username": "yourUserName","password": "yourPassword","instanceId":"instanceId"},"bootstrapServers": ["xxx.ap-changsha-ec.ckafka.tencentcloudmq.com:6000"],"consumerGroupId": "yourConsumerId"}
参数 | 描述 |
topic | Topic 名称,您可以在控制台上 topic管理页面复制。 |
sasl.username | 用户名,在控制台用户管理页面创建用户时设置。 |
sasl.password | 用户密码,在控制台用户管理页面创建用户时设置。 |
sasl.instanceId | 实例 ID,在控制台的实例详情页面的基本信息获取。 |
bootstrapServers | 接入网络,在控制台的实例详情页面接入方式模块的网络列复制。 |
consumerGroupId | 您可以自定义设置,Demo 运行成功后可以在 Consumer Group 页面看到该消费者。 |
package mainimport ("fmt""gokafkademo/config""log""strings""github.com/confluentinc/confluent-kafka-go/kafka")func main() {cfg, err := config.ParseConfig("../config/kafka.json")if err != nil {log.Fatal(err)}p, err := kafka.NewProducer(&kafka.ConfigMap{// 设置接入点,请通过控制台获取对应Topic的接入点。"bootstrap.servers": strings.Join(cfg.Servers, ","),// SASL 验证机制类型默认选用 PLAIN"sasl.mechanism": "PLAIN",// 在本地配置 ACL 策略。"security.protocol": "SASL_PLAINTEXT",// username 是实例 ID + # + 配置的用户名,password 是配置的用户密码。"sasl.username": fmt.Sprintf("%s#%s", cfg.SASL.InstanceId, cfg.SASL.Username),"sasl.password": cfg.SASL.Password,// Kafka producer 的 ack 有 3 种机制,分别说明如下:// -1 或 all:Broker 在 leader 收到数据并同步给所有 ISR 中的 follower 后,才应答给 Producer 继续发送下一条(批)消息。// 这种配置提供了最高的数据可靠性,只要有一个已同步的副本存活就不会有消息丢失。注意:这种配置不能确保所有的副本读写入该数据才返回,// 可以配合 Topic 级别参数 min.insync.replicas 使用。// 0:生产者不等待来自 broker 同步完成的确认,继续发送下一条(批)消息。这种配置生产性能最高,但数据可靠性最低(当服务器故障时可能会有数据丢失,如果 leader 已死但是 producer 不知情,则 broker 收不到消息)// 1: 生产者在 leader 已成功收到的数据并得到确认后再发送下一条(批)消息。这种配置是在生产吞吐和数据可靠性之间的权衡(如果leader已死但是尚未复制,则消息可能丢失)// 用户不显示配置时,默认值为1。用户根据自己的业务情况进行设置"acks": 1,// 请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失"retries": 0,// 发送请求失败时到下一次重试请求之间的时间"retry.backoff.ms": 100,// producer 网络请求的超时时间。"socket.timeout.ms": 6000,// 设置客户端内部重试间隔。"reconnect.backoff.max.ms": 3000,})if err != nil {log.Fatal(err)}defer p.Close()// 产生的消息 传递至报告处理程序go func() {for e := range p.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Delivery failed: %v\\n", ev.TopicPartition)} else {fmt.Printf("Delivered message to %v\\n", ev.TopicPartition)}}}}()// 异步发送消息topic := cfg.Topicfor _, word := range []string{"Confluent-Kafka", "Golang Client Message"} {_ = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value: []byte(word),}, nil)}// 等待消息传递p.Flush(10 * 1000)
go run main.go
Delivered message to test[0]@628Delivered message to test[0]@629
package mainimport ("fmt""gokafkademo/config""log""strings""github.com/confluentinc/confluent-kafka-go/kafka")func main() {cfg, err := config.ParseConfig("../config/kafka.json")if err != nil {log.Fatal(err)}c, err := kafka.NewConsumer(&kafka.ConfigMap{// 设置接入点,请通过控制台获取对应Topic的接入点。"bootstrap.servers": strings.Join(cfg.Servers, ","),// SASL 验证机制类型默认选用 PLAIN"sasl.mechanism": "PLAIN",// 在本地配置 ACL 策略。"security.protocol": "SASL_PLAINTEXT",// username 是实例 ID + # + 配置的用户名,password 是配置的用户密码。"sasl.username": fmt.Sprintf("%s#%s", cfg.SASL.InstanceId, cfg.SASL.Username),"sasl.password": cfg.SASL.Password,// 设置的消息消费组"group.id": cfg.ConsumerGroupId,"auto.offset.reset": "earliest",// 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker// 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间"session.timeout.ms": 10000,})if err != nil {log.Fatal(err)}// 订阅的消息topic 列表err = c.SubscribeTopics([]string{"test", "test-topic"}, nil)if err != nil {log.Fatal(err)}for {msg, err := c.ReadMessage(-1)if err == nil {fmt.Printf("Message on %s: %s\\n", msg.TopicPartition, string(msg.Value))} else {// 客户端将自动尝试恢复所有的 errorfmt.Printf("Consumer error: %v (%v)\\n", err, msg)}}c.Close()}
go run main.go
Message on test[0]@628: Confluent-KafkaMessage on test[0]@629: Golang Client Message
本页内容是否解决了您的问题?