go get github.com/apache/rocketmq-client-go/v2
syncSendMessage.go
file.// Service access address (Note: Add “http://” or “https://” before the access address; otherwise, it cannot be resolved)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// Authorize the role namevar secretKey = "admin"// Authorize the role tokenvar accessKey = "eyJrZXlJZC...."// Full namespace namevar nameSpace = "MQ_INST_rocketmqem4xxxx"// Producer group namevar groupName = "group1"// Create a message producerp, _ := rocketmq.NewProducer(// Set the service addressproducer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set ACL permissionsproducer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set the producer groupproducer.WithGroupName(groupName),// Set the namespace nameproducer.WithNamespace(nameSpace),// Set the number of retries upon sending failuresproducer.WithRetry(2),)// Start the producererr := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}
// Topic namevar topicName = "topic1"// Producer group namevar groupName = "group1"// Create a message producerp, _ := rocketmq.NewProducer(// Set the service addressproducer.WithNsResolver(primitive.NewPassthroughResolver([]string{"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"})),// Set ACL permissionsproducer.WithCredentials(primitive.Credentials{SecretKey: "admin",AccessKey: "eyJrZXlJZC......",}),// Set the producer groupproducer.WithGroupName(groupName),// Set the namespace nameproducer.WithNamespace("rocketmq-xxx|namespace_go"),// Set the number of retries upon sending failuresproducer.WithRetry(2),)// Start the producererr := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}for i := 0; i < 1; i++ {msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))// Set delay level// The relationship between the delay level and the delay time:// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h;// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// If you want to use the delay level, then set the following method:msg.WithDelayTimeLevel(3)// If you want to use any delayed message, then set the following method without setting `WithDelayTimeLevel`. The unit is milliseconds. The following shows that a delayed message is delivered after 10 seconds.delayMills := int64(10 * 1000)msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().Unix()+delayMills, 10))// Send the messageres, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}}// Release resourceserr = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
Parameter | Description |
secretKey | |
accessKey | |
nameSpace | Namespace name, which can be copied on the Namespace page in the console. |
serverAddress | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved. |
groupName | Producer group name, which can be copied under the Group tab in the console. |
// Topic namevar topicName = "topic1"// Configure message contentmsg := &primitive.Message{Topic: topicName, // Set the topic nameBody: []byte("Hello RocketMQ Go Client! This is a new message."),}// Set tagsmsg.WithTag("TAG")// Set keysmsg.WithKeys([]string{"yourKey"})// Send the messageres, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}
Parameter | Description |
topicName | Topic name, which can be copied under the Topic tab on the cluster details page in the console. |
TAG | Message tag identifier |
yourKey | Message business key |
// Disable the producererr = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
// Service access address (Note: Add “http://” or “https://” before the access address; otherwise, it cannot be resolved)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// Authorize the role namevar secretKey = "admin"// Authorize the role tokenvar accessKey = "eyJrZXlJZC...."// Full namespace namevar nameSpace = "rocketmq-xxx|namespace_go"// Producer group namevar groupName = "group11"// Create a consumerc, err := rocketmq.NewPushConsumer(// Set the consumer groupconsumer.WithGroupName(groupName),// Set the service addressconsumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set ACL permissionsconsumer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set the namespace nameconsumer.WithNamespace(nameSpace),// Set consumption from the start offsetconsumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),// Set the consumption mode (cluster consumption by default)consumer.WithConsumerModel(consumer.Clustering),//For broadcasting consumption, set the instance name to the system name of the application. If the instance name is not set, the pid will be used, which will cause a restart for repeated consumptionconsumer.WithInstance("xxxx"),)if err != nil {fmt.Println("init consumer2 error: " + err.Error())os.Exit(0)}
Parameter | Description |
secretKey | |
accessKey | |
nameSpace | The full namespace name can be copied under the Topic tab on the Cluster page in the console, which is in the format of cluster ID + | + namespace. |
serverAddress | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. Note: Add http:// or https:// before the access address; otherwise, it cannot be resolved. |
groupName | Producer group name, which can be copied under the Group tab in the console. |
// Topic namevar topicName = "topic1"// Set the tag of messages that are subscribed toselector := consumer.MessageSelector{Type: consumer.TAG,Expression: "TagA || TagC",}// Set the delay level of consumption retry. A total of 18 levels can be set. Below is the relationship between each delay level and the delay time.// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2hdelayLevel := 1err = c.Subscribe(topicName, selector, func(ctx context.Context,msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {fmt.Printf("subscribe callback len: %d \\n", len(msgs))// Set the delay level for the next consumptionconcurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLaterfor _, msg := range msgs {// Simulate a successful consumption after three retriesif msg.ReconsumeTimes > 3 {fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)return consumer.ConsumeSuccess, nil} else {fmt.Printf("subscribe callback: %v \\n", msg)}}// Simulate a consumption failure. Retry is required.return consumer.ConsumeRetryLater, nil})if err != nil {fmt.Println(err.Error())}
Parameter | Description |
topicName | Topic name, which can be copied on the Topic page in the console. |
Expression | Message tag identifier |
delayLevel | A parameter used to set the delay level of consumption retry. A total of 18 delay levels are supported. |
// Start consumptionerr = c.Start()if err != nil {fmt.Println(err.Error())os.Exit(-1)}time.Sleep(time.Hour)// Release resourceserr = c.Shutdown()if err != nil {fmt.Printf("shundown Consumer error: %s", err.Error())}
Was this page helpful?