go get github.com/apache/rocketmq-client-go/v2
syncSendMessage.go
file.// Service access address (Note: http:// or https:// must be appended before the access address. Otherwise, it cannot be parsed.)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080"// Authorize the role namevar secretKey = "admin"// Authorize the key for the rolevar accessKey = "eyJrZXlJZC...."// Producer group namevar groupName = "group1"// Create a message producerp, _ := rocketmq.NewProducer(// Set the service addressproducer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set ACL permissionsproducer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set the producer groupproducer.WithGroupName(groupName),// Set the number of retries upon sending failuresproducer.WithRetry(2),)// Start the producererr := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}
// Topic namevar topicName = "topic1"// Producer group namevar groupName = "group1"// Create a message producerp, _ := rocketmq.NewProducer(// Set the service addressproducer.WithNsResolver(primitive.NewPassthroughResolver([]string{"http://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080"})),// Set ACL permissionsproducer.WithCredentials(primitive.Credentials{SecretKey: "admin",AccessKey: "eyJrZXlJZC......",}),// Set the producer groupproducer.WithGroupName(groupName),// Set the number of retries upon sending failuresproducer.WithRetry(2),)// Start the producererr := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}for i := 0; i < 1; i++ {msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))// Specify the delay level// Relationship between level and time:// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h;// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// If you want to use the delay level, set the following methodmsg.WithDelayTimeLevel(3)// If you want to use arbitrary delay messages, set the following method and leave WithDelayTimeLevel unconfigured. The unit is in specific milliseconds. The following settings indicate that the delivery takes place after 10 seconds.delayMills := int64(10 * 1000)msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().Unix()+delayMills, 10))// Send the messageres, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}}// Release resourceserr = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
// Topic namevar topicName = "topic1"// Construct message contentmsg := &primitive.Message{Topic: topicName, // Set the topic nameBody: []byte("Hello RocketMQ Go Client! This is a new message."),}// Set the tagmsg.WithTag("TAG")// Set the keymsg.WithKeys([]string{"yourKey"})// Send the messageres, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}
Parameter | Description |
topicName | Topic name, which can be copied under the Topic tab on the Cluster page on the console. |
TAG | Message tag identifier. |
yourKey | Business message key. |
// Shut down the producererr = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
// Service access address (Note: http:// or https:// must be appended before the access address. Otherwise, it cannot be parsed.)var serverAddress = "http://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080"// Authorize the role namevar secretKey = "admin"// Authorize the key for the rolevar accessKey = "eyJrZXlJZC...."// Producer group namevar groupName = "group11"// Create a consumerc, err := rocketmq.NewPushConsumer(// Set the consumer groupconsumer.WithGroupName(groupName),// Set the service addressconsumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// Set ACL permissionsconsumer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// Set consumption from the start offsetconsumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),// Set the consumption mode (cluster mode by default)consumer.WithConsumerModel(consumer.Clustering),// For broadcasting consumption, set the instance name to the system name of the application. If it is not set, the PID will be used, which can cause duplicate consumption upon restart.consumer.WithInstance("xxxx"),)if err != nil {fmt.Println("init consumer2 error: " + err.Error())os.Exit(0)}
// Topic namevar topicName = "topic1"// Set the tag of messages that are subscribed toselector := consumer.MessageSelector{Type: consumer.TAG,Expression: "TagA || TagC",}// Define the delay level for retrying consumption. There are 18 delay levels in total. The following is the relationship between delay levels and delay time.// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2hdelayLevel := 1err = c.Subscribe(topicName, selector, func(ctx context.Context,msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {fmt.Printf("subscribe callback len: %d \\n", len(msgs))// Set the delay level for the next consumptionconcurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLaterfor _, msg := range msgs {// Simulate a successful consumption after three retriesif msg.ReconsumeTimes > 3 {fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)return consumer.ConsumeSuccess, nil} else {fmt.Printf("subscribe callback: %v \\n", msg)}}// Simulate a consumption failure and respond with a retryreturn consumer.ConsumeRetryLater, nil})if err != nil {fmt.Println(err.Error())}
Parameter | Description |
topicName | The name of the topic, copied from the Topic page on the console. |
Expression | Message tag identifier. |
delayLevel | Configure the delay level for re-consumption. A total of 18 delay levels are supported. |
// Initiate consumptionerr = c.Start()if err != nil {fmt.Println(err.Error())os.Exit(-1)}time.Sleep(time.Hour)// Release resourceserr = c.Shutdown()if err != nil {fmt.Printf("shundown Consumer error: %s", err.Error())}
Was this page helpful?