tencent cloud

피드백

VPC Access

마지막 업데이트 시간:2024-12-18 15:07:24

    Overview

    This document describes how to access CKafka to send/receive messages with the SDK for Go in a VPC.

    Prerequisites

    Directions

    Step 1. Prepare configurations

    1. Upload the gokafkademo in the downloaded demo to the Linux server.
    2. Log in to the Linux server, enter the gokafkademo directory, and run the following command to add the dependency library.
    go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
    3. Modify the configuration file kafka.json.
    {
    "topic": [
    "test"
    ],
    "bootstrapServers": [
    "xx.xx.xx.xx:xxxx"
    ],
    "consumerGroupId": "yourConsumerId"
    }
    
    Parameter
    Description
    topic
    Topic name, which can be copied in Topic Management on the instance details page in the console.
    
    img
    
    
    bootstrapServers
    Accessed network, which can be copied from the Network column in the Access Mode section in Basic Info on the instance details page in the console.
    
    img
    
    
    consumerGroupId
    You can customize it. After the demo runs successfully, you can see the consumer in Consumer Group on the instance details page.

    Step 2. Send messages

    1. Write a message production program.
    package main
    import (
    "fmt"
    "gokafkademo/config"
    "log"
    "strings"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    func main() {
    cfg, err := config.ParseConfig("../config/kafka.json")
    if err != nil {
    log.Fatal(err)
    }
    p, err := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": strings.Join(cfg.Servers, ","),
    "acks": 1,
    "retries": 0,
    "retry.backoff.ms": 100,
    "socket.timeout.ms": 6000,
    "reconnect.backoff.max.ms": 3000,
    })
    if err != nil {
    log.Fatal(err)
    }
    defer p.Close()
    go func() {
    for e := range p.Events() {
    switch ev := e.(type) {
    case *kafka.Message:
    if ev.TopicPartition.Error != nil {
    fmt.Printf("Delivery failed: %v\\n", ev.TopicPartition)
    } else {
    fmt.Printf("Delivered message to %v\\n",ev.TopicPartition)
    }
    }
    }
    }()
    topic := cfg.Topic[0]
    for _, word := range []string{"Confluent-Kafka", "Golang Client Message"} {
    _ = p.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value: []byte(word),
    }, nil)
    }
    p.Flush(10 * 1000)
    }
    2. Compile and run the program to send messages.
    go run main.go
    3. View the execution result. Below is a sample:
    Delivered message to test[0]@628
    Delivered message to test[0]@629
    4. On the Topic Management tab page on the instance details page in the CKafka console, select the topic, and click More > Message Query to view the messages just sent.
    
    
    

    Step 3. Consume messages

    1. Write the message consumption program.
    package main
    import (
    "fmt"
    "gokafkademo/config"
    "log"
    "strings"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    )
    func main() {
    cfg, err := config.ParseConfig("../config/kafka.json")
    if err != nil {
    log.Fatal(err)
    }
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": strings.Join(cfg.Servers, ","),
    "group.id": cfg.ConsumerGroupId,
    "auto.offset.reset": "earliest",
    "session.timeout.ms": 10000,
    })
    if err != nil {
    log.Fatal(err)
    }
    
    err = c.SubscribeTopics(cfg.Topic, nil)
    if err != nil {
    log.Fatal(err)
    }
    for {
    msg, err := c.ReadMessage(-1)
    if err == nil {
    fmt.Printf("Message on %s: %s\\n", msg.TopicPartition, string(msg.Value))
    } else {
    fmt.Printf("Consumer error: %v (%v)\\n", err, msg)
    }
    }
    c.Close()
    }
    
    2. Compile and run the program to consume messages.
    go run main.go
    3. View the execution result. Below is a sample:
    Message on test[0]@628: Confluent-Kafka
    Message on test[0]@629: Golang Client Message
    4. On the Consumer Group tab page in the CKafka console, select the consumer group name, enter the topic name, and click View Details to view the consumption details.
    
    
    
    문의하기

    고객의 업무에 전용 서비스를 제공해드립니다.

    기술 지원

    더 많은 도움이 필요하시면, 티켓을 통해 연락 바랍니다. 티켓 서비스는 연중무휴 24시간 제공됩니다.

    연중무휴 24시간 전화 지원