tencent cloud

All product documents
TDMQ for CKafka
VPC Access
Last updated: 2024-12-18 15:07:24
VPC Access
Last updated: 2024-12-18 15:07:24

Overview

This document describes how to access CKafka to send/receive messages with the SDK for Go in a VPC.

Prerequisites

Directions

Step 1. Prepare configurations

1. Upload the gokafkademo in the downloaded demo to the Linux server.
2. Log in to the Linux server, enter the gokafkademo directory, and run the following command to add the dependency library.
go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
3. Modify the configuration file kafka.json.
{
"topic": [
"test"
],
"bootstrapServers": [
"xx.xx.xx.xx:xxxx"
],
"consumerGroupId": "yourConsumerId"
}

Parameter
Description
topic
Topic name, which can be copied in Topic Management on the instance details page in the console.

img


bootstrapServers
Accessed network, which can be copied from the Network column in the Access Mode section in Basic Info on the instance details page in the console.

img


consumerGroupId
You can customize it. After the demo runs successfully, you can see the consumer in Consumer Group on the instance details page.

Step 2. Send messages

1. Write a message production program.
package main
import (
"fmt"
"gokafkademo/config"
"log"
"strings"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
cfg, err := config.ParseConfig("../config/kafka.json")
if err != nil {
log.Fatal(err)
}
p, err := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": strings.Join(cfg.Servers, ","),
"acks": 1,
"retries": 0,
"retry.backoff.ms": 100,
"socket.timeout.ms": 6000,
"reconnect.backoff.max.ms": 3000,
})
if err != nil {
log.Fatal(err)
}
defer p.Close()
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n",ev.TopicPartition)
}
}
}
}()
topic := cfg.Topic[0]
for _, word := range []string{"Confluent-Kafka", "Golang Client Message"} {
_ = p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(word),
}, nil)
}
p.Flush(10 * 1000)
}
2. Compile and run the program to send messages.
go run main.go
3. View the execution result. Below is a sample:
Delivered message to test[0]@628
Delivered message to test[0]@629
4. On the Topic Management tab page on the instance details page in the CKafka console, select the topic, and click More > Message Query to view the messages just sent.




Step 3. Consume messages

1. Write the message consumption program.
package main
import (
"fmt"
"gokafkademo/config"
"log"
"strings"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
cfg, err := config.ParseConfig("../config/kafka.json")
if err != nil {
log.Fatal(err)
}
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": strings.Join(cfg.Servers, ","),
"group.id": cfg.ConsumerGroupId,
"auto.offset.reset": "earliest",
"session.timeout.ms": 10000,
})
if err != nil {
log.Fatal(err)
}

err = c.SubscribeTopics(cfg.Topic, nil)
if err != nil {
log.Fatal(err)
}
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
} else {
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
c.Close()
}

2. Compile and run the program to consume messages.
go run main.go
3. View the execution result. Below is a sample:
Message on test[0]@628: Confluent-Kafka
Message on test[0]@629: Golang Client Message
4. On the Consumer Group tab page in the CKafka console, select the consumer group name, enter the topic name, and click View Details to view the consumption details.



Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback

Contact Us

Contact our sales team or business advisors to help your business.

Technical Support

Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

7x24 Phone Support
Hong Kong, China
+852 800 906 020 (Toll Free)
United States
+1 844 606 0804 (Toll Free)
United Kingdom
+44 808 196 4551 (Toll Free)
Canada
+1 888 605 7930 (Toll Free)
Australia
+61 1300 986 386 (Toll Free)
EdgeOne hotline
+852 300 80699
More local hotlines coming soon