SDK for Go
Last updated: 2023-09-12 17:53:17

Overview

This document uses the SDK for Go as an example to describe how to send and receive messages with an open-source SDK, helping you better understand the message sending and receiving processes.

Prerequisites

You have created the required resources as instructed in Resource Creation and Preparation.
You have installed Go.

Directions

1. Run the following command in the client environment to install the RocketMQ client dependencies.
go get github.com/apache/rocketmq-client-go/v2
2. Create a producer. To send general messages, modify the corresponding parameters in the syncSendMessage.go file.
Delayed messages currently support arbitrary delay times and are not restricted to the predefined delay levels.
The first snippet below creates a producer for general messages; the second creates a producer and sends a delayed message.
// Service access address (Note: add "http://" or "https://" before the access address; otherwise, it cannot be resolved)
var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
// Role name used for authorization
var secretKey = "admin"
// Role token used for authorization
var accessKey = "eyJrZXlJZC...."
// Full namespace name
var nameSpace = "MQ_INST_rocketmqem4xxxx"
// Producer group name
var groupName = "group1"
// Create a message producer
p, err := rocketmq.NewProducer(
	// Set the service address
	producer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
	// Set ACL permissions
	producer.WithCredentials(primitive.Credentials{
		SecretKey: secretKey,
		AccessKey: accessKey,
	}),
	// Set the producer group
	producer.WithGroupName(groupName),
	// Set the namespace name
	producer.WithNamespace(nameSpace),
	// Set the number of retries upon sending failures
	producer.WithRetry(2),
)
if err != nil {
	fmt.Printf("create producer error: %s", err.Error())
	os.Exit(1)
}
// Start the producer
err = p.Start()
if err != nil {
	fmt.Printf("start producer error: %s", err.Error())
	os.Exit(1)
}

// Topic name
var topicName = "topic1"
// Producer group name
var groupName = "group1"
// Create a message producer
p, err := rocketmq.NewProducer(
	// Set the service address
	producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"})),
	// Set ACL permissions
	producer.WithCredentials(primitive.Credentials{
		SecretKey: "admin",
		AccessKey: "eyJrZXlJZC......",
	}),
	// Set the producer group
	producer.WithGroupName(groupName),
	// Set the namespace name
	producer.WithNamespace("rocketmq-xxx|namespace_go"),
	// Set the number of retries upon sending failures
	producer.WithRetry(2),
)
if err != nil {
	fmt.Printf("create producer error: %s", err.Error())
	os.Exit(1)
}
// Start the producer
err = p.Start()
if err != nil {
	fmt.Printf("start producer error: %s", err.Error())
	os.Exit(1)
}
for i := 0; i < 1; i++ {
	msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))
	// Option 1: use a predefined delay level.
	// The relationship between the delay level and the delay time:
	// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
	// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
	// To use a delay level, uncomment the following line and remove option 2:
	// msg.WithDelayTimeLevel(3)

	// Option 2: use an arbitrary delay. Do not set `WithDelayTimeLevel` in this case.
	// The property value is a timestamp in milliseconds. The following delivers the message after 10 seconds.
	delayMills := int64(10 * 1000)
	// The current time must also be in milliseconds so that both operands use the same unit.
	nowMills := time.Now().UnixNano() / int64(time.Millisecond)
	msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(nowMills+delayMills, 10))
	// Send the message
	res, err := p.SendSync(context.Background(), msg)
	if err != nil {
		fmt.Printf("send message error: %s\n", err)
	} else {
		fmt.Printf("send message success: result=%s\n", res.String())
	}
}

// Release resources
err = p.Shutdown()
if err != nil {
	fmt.Printf("shutdown producer error: %s", err.Error())
}
Parameter: Description
secretKey: Role name, which can be copied on the Role Management page.
accessKey: Role token, which can be copied in the Token column on the Role Management page.
nameSpace: Namespace name, which can be copied on the Namespace page in the console.
serverAddress: Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved.
groupName: Producer group name, which can be copied under the Group tab in the console.
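The arbitrary-delay option in the delayed message snippet sets the `__STARTDELIVERTIME` property to a delivery time in milliseconds. The following is a minimal sketch of that calculation, assuming the property expects an absolute Unix timestamp in milliseconds, as the comment in the snippet suggests. The helper name `startDeliverTime` is hypothetical and is not part of the RocketMQ client.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// startDeliverTime is a hypothetical helper. It returns the value for the
// "__STARTDELIVERTIME" property: the current time plus the delay, both in
// milliseconds, formatted as a decimal string.
func startDeliverTime(nowMillis, delayMillis int64) string {
	return strconv.FormatInt(nowMillis+delayMillis, 10)
}

func main() {
	// Current time in milliseconds since the Unix epoch.
	nowMillis := time.Now().UnixNano() / int64(time.Millisecond)
	// Deliver the message 10 seconds from now.
	delayMillis := int64(10 * 1000)
	fmt.Println("__STARTDELIVERTIME =", startDeliverTime(nowMillis, delayMillis))
}
```

Keeping both operands in milliseconds matters here: mixing a timestamp in seconds with a delay in milliseconds would produce a value far in the past.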
3. Send messages. Sync sending is used as an example here, with the producer created in the previous step.
// Topic name
var topicName = "topic1"
// Configure message content
msg := &primitive.Message{
	Topic: topicName, // Set the topic name
	Body:  []byte("Hello RocketMQ Go Client! This is a new message."),
}
// Set tags
msg.WithTag("TAG")
// Set keys
msg.WithKeys([]string{"yourKey"})
// Send the message
res, err := p.SendSync(context.Background(), msg)
if err != nil {
	fmt.Printf("send message error: %s\n", err)
} else {
	fmt.Printf("send message success: result=%s\n", res.String())
}
Parameter: Description
topicName: Topic name, which can be copied under the Topic tab on the cluster details page in the console.
TAG: Message tag identifier.
yourKey: Message business key.
Release resources after sending is complete.
// Shut down the producer
err = p.Shutdown()
if err != nil {
	fmt.Printf("shutdown producer error: %s", err.Error())
}
Note
For more information on async sending and one-way sending, see Demo or RocketMQ-Client-Go Example.
4. Create a consumer.
// Service access address (Note: add "http://" or "https://" before the access address; otherwise, it cannot be resolved)
var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"
// Role name used for authorization
var secretKey = "admin"
// Role token used for authorization
var accessKey = "eyJrZXlJZC...."
// Full namespace name
var nameSpace = "rocketmq-xxx|namespace_go"
// Consumer group name
var groupName = "group11"
// Create a consumer
c, err := rocketmq.NewPushConsumer(
	// Set the consumer group
	consumer.WithGroupName(groupName),
	// Set the service address
	consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),
	// Set ACL permissions
	consumer.WithCredentials(primitive.Credentials{
		SecretKey: secretKey,
		AccessKey: accessKey,
	}),
	// Set the namespace name
	consumer.WithNamespace(nameSpace),
	// Set consumption from the start offset
	consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
	// Set the consumption mode (cluster consumption by default)
	consumer.WithConsumerModel(consumer.Clustering),
	// For broadcasting consumption, set the instance name to the system name of the application.
	// If the instance name is not set, the pid is used, which causes repeated consumption after a restart.
	consumer.WithInstance("xxxx"),
)
if err != nil {
	fmt.Println("init consumer error: " + err.Error())
	os.Exit(1)
}
Parameter: Description
secretKey: Role name, which can be copied on the Role Management page.
accessKey: Role token, which can be copied in the Token column on the Role Management page.
nameSpace: Full namespace name, in the format of cluster ID + | + namespace, which can be copied under the Topic tab on the Cluster page in the console.
serverAddress: Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved.
groupName: Consumer group name, which can be copied under the Group tab in the console.
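Because the access address cannot be resolved without the http:// or https:// prefix, it can help to check the configured value before creating a producer or consumer. The following is a minimal sketch of such a check using only the Go standard library. The helper name `checkServerAddress` is hypothetical and is not part of the RocketMQ client.

```go
package main

import (
	"fmt"
	"strings"
)

// checkServerAddress is a hypothetical helper. It returns an error when the
// access address does not start with "http://" or "https://".
func checkServerAddress(addr string) error {
	if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
		return nil
	}
	return fmt.Errorf("access address %q must start with http:// or https://", addr)
}

func main() {
	addrs := []string{
		"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876",
		"rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876",
	}
	for _, addr := range addrs {
		if err := checkServerAddress(addr); err != nil {
			fmt.Println("invalid:", err)
		} else {
			fmt.Println("ok:", addr)
		}
	}
}
```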
5. Subscribe to the topic and define the consumption callback.
// Topic name
var topicName = "topic1"
// Set the tags of the messages to subscribe to
selector := consumer.MessageSelector{
	Type:       consumer.TAG,
	Expression: "TagA || TagC",
}
// Set the delay level of consumption retry. A total of 18 levels can be set. Below is the relationship between each delay level and the delay time.
// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h
delayLevel := 1
err = c.Subscribe(topicName, selector, func(ctx context.Context,
	msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	fmt.Printf("subscribe callback len: %d \n", len(msgs))
	// Set the delay level for the next consumption
	concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
	concurrentCtx.DelayLevelWhenNextConsume = delayLevel // takes effect only when consumer.ConsumeRetryLater is returned
	for _, msg := range msgs {
		// Simulate a successful consumption once the message has been retried more than three times
		if msg.ReconsumeTimes > 3 {
			fmt.Printf("msg ReconsumeTimes > 3. msg: %v\n", msg)
			return consumer.ConsumeSuccess, nil
		} else {
			fmt.Printf("subscribe callback: %v \n", msg)
		}
	}
	// Simulate a consumption failure. Retry is required.
	return consumer.ConsumeRetryLater, nil
})
if err != nil {
	fmt.Println(err.Error())
}
Parameter: Description
topicName: Topic name, which can be copied on the Topic page in the console.
Expression: Message tag identifier.
delayLevel: A parameter used to set the delay level of consumption retry. A total of 18 delay levels are supported.
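The 18 delay levels map to fixed delay times, as listed in the comments of the snippet above. The following is a minimal sketch that turns that list into a lookup, which can be handy when choosing a value for delayLevel. The names `delayLevels` and `delayForLevel` are hypothetical and are not part of the RocketMQ client; the durations are copied from the list in this document.

```go
package main

import (
	"fmt"
	"time"
)

// delayLevels holds the delay time for levels 1 through 18, in order.
var delayLevels = []time.Duration{
	1 * time.Second, 5 * time.Second, 10 * time.Second, 30 * time.Second,
	1 * time.Minute, 2 * time.Minute, 3 * time.Minute, 4 * time.Minute,
	5 * time.Minute, 6 * time.Minute, 7 * time.Minute, 8 * time.Minute,
	9 * time.Minute, 10 * time.Minute, 20 * time.Minute, 30 * time.Minute,
	1 * time.Hour, 2 * time.Hour,
}

// delayForLevel is a hypothetical helper. It returns the delay time for a
// level between 1 and 18, or an error for any other level.
func delayForLevel(level int) (time.Duration, error) {
	if level < 1 || level > len(delayLevels) {
		return 0, fmt.Errorf("delay level %d is out of range 1..%d", level, len(delayLevels))
	}
	return delayLevels[level-1], nil
}

func main() {
	for _, level := range []int{1, 3, 18} {
		d, err := delayForLevel(level)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("level %d -> %s\n", level, d)
	}
}
```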
6. Consume messages. The consumer can consume messages only after it has subscribed to the topic.
// Start consumption
err = c.Start()
if err != nil {
	fmt.Println(err.Error())
	os.Exit(1)
}
// Keep the process alive so that the consumer can receive messages
time.Sleep(time.Hour)
// Release resources
err = c.Shutdown()
if err != nil {
	fmt.Printf("shutdown consumer error: %s", err.Error())
}
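Once the consumer is running, the callback receives only messages whose tag satisfies the subscription expression, such as "TagA || TagC". The following sketch illustrates those semantics with the standard library only: `||` separates alternative tags, and `*` (or an empty expression) matches every tag. It is an illustration of the matching rule, not the client's or the broker's actual implementation, and the helper name `matchesTagExpression` is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// matchesTagExpression is a hypothetical helper. It reports whether tag
// satisfies a tag expression in which "||" separates alternatives and
// "*" or an empty expression matches every tag.
func matchesTagExpression(expr, tag string) bool {
	expr = strings.TrimSpace(expr)
	if expr == "" || expr == "*" {
		return true
	}
	for _, candidate := range strings.Split(expr, "||") {
		if strings.TrimSpace(candidate) == tag {
			return true
		}
	}
	return false
}

func main() {
	expr := "TagA || TagC"
	for _, tag := range []string{"TagA", "TagB", "TagC"} {
		fmt.Printf("%s matches %q: %v\n", tag, expr, matchesTagExpression(expr, tag))
	}
}
```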
7. View consumption details. Log in to the TDMQ console, go to the Cluster > Group page, and view the list of clients connected to the group. Click View Details in the Operation column to view consumer details.


Note
Above is a brief introduction to how to send and receive messages with the Go client. For more information, see Demo or Rocketmq-Client-Go Example.
