This document describes how to access CKafka to send/receive messages with the SDK for Go in a VPC.
Upload the gokafkademo directory in the downloaded demo to the Linux server.
Go to the gokafkademo directory, and run the following command to add the dependency library:
go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
Modify the configuration file kafka.json.
{
"topic": [
"test"
],
"bootstrapServers": [
"xx.xx.xx.xx:xxxx"
],
"consumerGroupId": "yourConsumerId"
}
Parameter | Description |
---|---|
topic | Topic name, which can be copied in Topic Management on the instance details page in the console. |
bootstrapServers | Accessed network, which can be copied from the Network column in the Access Mode section in Basic Info on the instance details page in the console. |
consumerGroupId | Consumer group ID, which you can customize. After the demo runs successfully, you can see the consumer in Consumer Group on the instance details page. |
package main
import (
"fmt"
"gokafkademo/config"
"log"
"strings"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// main loads the demo configuration, creates a Kafka producer, and sends two
// sample messages asynchronously to the first configured topic. A background
// goroutine prints the delivery report of every message.
func main() {
	cfg, err := config.ParseConfig("../config/kafka.json")
	if err != nil {
		log.Fatal(err)
	}
	// cfg.Topic[0] is indexed below; fail with a clear message instead of an
	// index-out-of-range panic when the topic list is empty.
	if len(cfg.Topic) == 0 {
		log.Fatal("no topic configured in ../config/kafka.json")
	}

	p, err := kafka.NewProducer(&kafka.ConfigMap{
		// Set the access point of the topic, which can be obtained in the console
		"bootstrap.servers": strings.Join(cfg.Servers, ","),
		// Acknowledgement level requested from the broker. You can customize
		// this according to your business requirements.
		"acks": 1,
		// Number of retries upon request error. 0 disables retries; it is
		// recommended that you set the parameter to a value greater than 0 to
		// guarantee that messages are not lost to the greatest extent possible.
		"retries": 0,
		// Retry interval upon request failure
		"retry.backoff.ms": 100,
		// Timeout duration of a producer network request
		"socket.timeout.ms": 6000,
		// Upper bound of the backoff between reconnection attempts of the client
		"reconnect.backoff.max.ms": 3000,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	// Print the delivery report of every produced message.
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Send messages in async mode
	topic := cfg.Topic[0]
	for _, word := range []string{"Confluent-Kafka", "Golang Client Message"} {
		err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
		// Produce only enqueues the message; an error here means it was never
		// queued, so no delivery report will follow for it.
		if err != nil {
			fmt.Printf("Produce failed: %v\n", err)
		}
	}

	// Wait up to 10 seconds for message delivery and report anything that is
	// still outstanding when the timeout expires.
	if remaining := p.Flush(10 * 1000); remaining > 0 {
		fmt.Printf("%d message(s) were not delivered before the flush timeout\n", remaining)
	}
}
Compile and run the program to send messages.
go run main.go
Delivered message to test[0]@628
Delivered message to test[0]@629
Write the message consumption program.
package main
import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"gokafkademo/config"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)
// main loads the demo configuration, creates a Kafka consumer that joins the
// configured consumer group, subscribes to the configured topics, and prints
// every received message until the process receives SIGINT or SIGTERM. The
// consumer is then closed before the program exits.
func main() {
	cfg, err := config.ParseConfig("../config/kafka.json")
	if err != nil {
		log.Fatal(err)
	}

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		// Set the access point of the topic, which can be obtained in the console
		"bootstrap.servers": strings.Join(cfg.Servers, ","),
		// The set message consumer group
		"group.id":          cfg.ConsumerGroupId,
		"auto.offset.reset": "earliest",
		// Consumer timeout period when the Kafka consumer grouping mechanism is used. If the broker does not receive the heartbeat of the consumer within this period, the consumer will be considered to have failed and the broker will initiate rebalance.
		// Currently, this value must be configured in the broker between 6000 (value of group.min.session.timeout.ms) and 300000 (value of group.max.session.timeout.ms).
		"session.timeout.ms": 10000,
	})
	if err != nil {
		log.Fatal(err)
	}

	// List of subscribed message topics
	if err := c.SubscribeTopics(cfg.Topic, nil); err != nil {
		// log.Fatal exits immediately, so close the consumer explicitly first.
		c.Close()
		log.Fatal(err)
	}

	// Stop consuming on Ctrl+C or a termination request so that the Close
	// call below is actually reached.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

	for run := true; run; {
		select {
		case sig := <-sigCh:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			// Poll with a short timeout instead of blocking forever, so the
			// signal channel is checked regularly.
			msg, err := c.ReadMessage(100 * time.Millisecond)
			if err == nil {
				fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
				continue
			}
			// A timeout only means no message arrived during this poll.
			if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
				continue
			}
			// The client will automatically try to recover all errors
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	if err := c.Close(); err != nil {
		log.Printf("closing consumer: %v", err)
	}
}
Compile and run the program to consume messages.
go run main.go
Message on test[0]@628: Confluent-Kafka
Message on test[0]@629: Golang Client Message
Was this page helpful?