tencent cloud

All product documents
TDMQ for RocketMQ
Use of Go SDK
Last updated: 2024-01-17 16:56:46
Use of Go SDK
Last updated: 2024-01-17 16:56:46

Overview

This document describes how to use an open-source SDK to send and receive messages with the Golang SDK serving as example, for you to better understand the complete process of message sending and receiving.

Prerequisites

Directions:

1. Execute the following command in the client environment to download the relevant RocketMQ client dependencies.
go get github.com/apache/rocketmq-client-go/v2
2. Create a producer in the corresponding method. If you need to send standard messages, modify the corresponding parameters in the syncSendMessage.go file.
Currently, delayed messages support arbitrary precision delay, unaffected by the delay level.
General message
Delayed Messages
// Service access address (Note: http:// or https:// must be appended before the access address. Otherwise, it cannot be parsed.)
var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080"
// Authorize the role name
var secretKey = "admin"
// Authorize the key for the role
var accessKey = "eyJrZXlJZC...."
// Producer group name
var groupName = "group1"
// Create a message producer
p, _ := rocketmq.NewProducer(
// Set the service address
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
// Set ACL permissions
producer.WithCredentials(primitive.Credentials{
SecretKey: secretKey,
AccessKey: accessKey,
}),
// Set the producer group
producer.WithGroupName(groupName),

// Set the number of retries upon sending failures
producer.WithRetry(2),
)
// Start the producer
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}
// Topic name
var topicName = "topic1"
// Producer group name
var groupName = "group1"
// Create a message producer
p, _ := rocketmq.NewProducer(
// Set the service address
producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080"})),
// Set ACL permissions
producer.WithCredentials(primitive.Credentials{
SecretKey: "admin",
AccessKey: "eyJrZXlJZC......",
}),
// Set the producer group
producer.WithGroupName(groupName),

// Set the number of retries upon sending failures
producer.WithRetry(2),
)
// Start the producer
err := p.Start()
if err != nil {
fmt.Printf("start producer error: %s", err.Error())
os.Exit(1)
}

for i := 0; i < 1; i++ {
msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))
// Specify the delay level
// Relationship between level and time:
// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h;
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// If you want to use the delay level, set the following method

msg.WithDelayTimeLevel(3)
// If you want to use arbitrary delay messages, set the following method and leave WithDelayTimeLevel unconfigured. The unit is in specific milliseconds. The following settings indicate that the delivery takes place after 10 seconds.
delayMills := int64(10 * 1000)
msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().Unix()+delayMills, 10))
// Send the message

res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
}

// Release resources
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
3. Message sending is the same as above (taking the synchronous sending as an example).
// Topic name
var topicName = "topic1"
// Construct message content
msg := &primitive.Message{
Topic: topicName, // Set the topic name
Body: []byte("Hello RocketMQ Go Client! This is a new message."),
}
// Set the tag
msg.WithTag("TAG")
// Set the key
msg.WithKeys([]string{"yourKey"})
// Send the message
res, err := p.SendSync(context.Background(), msg)

if err != nil {
fmt.Printf("send message error: %s\n", err)
} else {
fmt.Printf("send message success: result=%s\n", res.String())
}
Parameter
Description
topicName
Topic name, which can be copied under the Topic tab on the Cluster page on the console.
TAG
Message tag identifier.
yourKey
Business message key.
Release the resources.
// Shut down the producer
err = p.Shutdown()
if err != nil {
fmt.Printf("shutdown producer error: %s", err.Error())
}
Note:
For more information on asynchronous sending and one-way sending, see the demo or RocketMQ-Client-Go Examples.
4. Create a consumer.
// Service access address (Note: http:// or https:// must be appended before the access address. Otherwise, it cannot be parsed.)
var serverAddress = "http://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080"
// Authorize the role name
var secretKey = "admin"
// Authorize the key for the role
var accessKey = "eyJrZXlJZC...."
// Producer group name
var groupName = "group11"
// Create a consumer
c, err := rocketmq.NewPushConsumer(
// Set the consumer group
consumer.WithGroupName(groupName),
// Set the service address
consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
// Set ACL permissions
consumer.WithCredentials(primitive.Credentials{
SecretKey: secretKey,
AccessKey: accessKey,
}),
// Set consumption from the start offset
consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
// Set the consumption mode (cluster mode by default)
consumer.WithConsumerModel(consumer.Clustering),

// For broadcasting consumption, set the instance name to the system name of the application. If it is not set, the PID will be used, which can cause duplicate consumption upon restart.
consumer.WithInstance("xxxx"),
)
if err != nil {
fmt.Println("init consumer2 error: " + err.Error())
os.Exit(0)
}
5. Consume the message.
// Topic name
var topicName = "topic1"
// Set the tag of messages that are subscribed to
selector := consumer.MessageSelector{
Type: consumer.TAG,
Expression: "TagA || TagC",
}
// Define the delay level for retrying consumption. There are 18 delay levels in total. The following is the relationship between delay levels and delay time.
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
delayLevel := 1
err = c.Subscribe(topicName, selector, func(ctx context.Context,
msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
fmt.Printf("subscribe callback len: %d \n", len(msgs))
// Set the delay level for the next consumption
concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLater

for _, msg := range msgs {
// Simulate a successful consumption after three retries
if msg.ReconsumeTimes > 3 {
fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)
return consumer.ConsumeSuccess, nil
} else {
fmt.Printf("subscribe callback: %v \n", msg)
}
}
// Simulate a consumption failure and respond with a retry
return consumer.ConsumeRetryLater, nil
})
if err != nil {
fmt.Println(err.Error())
}
Parameter
Description
topicName
The name of the topic, copied from the Topic page on the console.
Expression
Message tag identifier.
delayLevel
Configure the delay level for re-consumption. A total of 18 delay levels are supported.
6. Consume the message (The consumer must consume the message after subscription).
// Initiate consumption
err = c.Start()
if err != nil {
fmt.Println(err.Error())
os.Exit(-1)
}
time.Sleep(time.Hour)
// Release resources
err = c.Shutdown()
if err != nil {
fmt.Printf("shundown Consumer error: %s", err.Error())
}
7. Check consumption details. After the message is sent, you will receive a message ID (messageID). Developers can query the recently sent messages on the Message Query page, as shown in the following figure. Information such as details and traces for specific messages is also available. For details, see Message Query section.


Note:
This document briefly describes sending and receiving messages using the Golang client. For more operations, see the demo or the RocketMQ-Client-Go Examples.


Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback

Contact Us

Contact our sales team or business advisors to help your business.

Technical Support

Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

7x24 Phone Support
Hong Kong, China
+852 800 906 020 (Toll Free)
United States
+1 844 606 0804 (Toll Free)
United Kingdom
+44 808 196 4551 (Toll Free)
Canada
+1 888 605 7930 (Toll Free)
Australia
+61 1300 986 386 (Toll Free)
EdgeOne hotline
+852 300 80699
More local hotlines coming soon