gokafkademo
in the downloaded demo to the Linux server.gokafkademo
directory, and run the following command to add the dependency library.go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
kafka.json
.{"topic": ["test"],"bootstrapServers": ["xx.xx.xx.xx:xxxx"],"consumerGroupId": "yourConsumerId"}
Parameter | Description |
topic | Topic name, which can be copied in Topic Management on the instance details page in the console. |
bootstrapServers | Accessed network, which can be copied from the Network column in the Access Mode section in Basic Info on the instance details page in the console. |
consumerGroupId | You can customize it. After the demo runs successfully, you can see the consumer in Consumer Group on the instance details page. |
package mainimport ("fmt""gokafkademo/config""log""strings""github.com/confluentinc/confluent-kafka-go/kafka")func main() {cfg, err := config.ParseConfig("../config/kafka.json")if err != nil {log.Fatal(err)}p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": strings.Join(cfg.Servers, ","),"acks": 1,"retries": 0,"retry.backoff.ms": 100,"socket.timeout.ms": 6000,"reconnect.backoff.max.ms": 3000,})if err != nil {log.Fatal(err)}defer p.Close()go func() {for e := range p.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Delivery failed: %v\\n", ev.TopicPartition)} else {fmt.Printf("Delivered message to %v\\n",ev.TopicPartition)}}}}()topic := cfg.Topic[0]for _, word := range []string{"Confluent-Kafka", "Golang Client Message"} {_ = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value: []byte(word),}, nil)}p.Flush(10 * 1000)}
go run main.go
Delivered message to test[0]@628Delivered message to test[0]@629
package mainimport ("fmt""gokafkademo/config""log""strings""github.com/confluentinc/confluent-kafka-go/kafka")func main() {cfg, err := config.ParseConfig("../config/kafka.json")if err != nil {log.Fatal(err)}c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": strings.Join(cfg.Servers, ","),"group.id": cfg.ConsumerGroupId,"auto.offset.reset": "earliest","session.timeout.ms": 10000,})if err != nil {log.Fatal(err)}err = c.SubscribeTopics(cfg.Topic, nil)if err != nil {log.Fatal(err)}for {msg, err := c.ReadMessage(-1)if err == nil {fmt.Printf("Message on %s: %s\\n", msg.TopicPartition, string(msg.Value))} else {fmt.Printf("Consumer error: %v (%v)\\n", err, msg)}}c.Close()}
go run main.go
Message on test[0]@628: Confluent-KafkaMessage on test[0]@629: Golang Client Message
문제 해결에 도움이 되었나요?