go get github.com/apache/rocketmq-client-go/v2
syncSendMessage.go
文件内修改对应的参数。// 服务接入地址 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// 授权角色名var secretKey = "admin"// 授权角色密钥var accessKey = "eyJrZXlJZC...."// 命名空间全称var nameSpace = "MQ_INST_rocketmqem4xxxx"// 生产者组名称var groupName = "group1"// 创建消息生产者p, _ := rocketmq.NewProducer(// 设置服务地址producer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// 设置acl权限producer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// 设置生产组producer.WithGroupName(groupName),// 设置命名空间名称producer.WithNamespace(nameSpace),// 设置发送失败重试次数producer.WithRetry(2),)// 启动producererr := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}
// topic名称var topicName = "topic1"// 生产者组名称var groupName = "group1"// 创建消息生产者p, _ := rocketmq.NewProducer(// 设置服务地址producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"})),// 设置acl权限producer.WithCredentials(primitive.Credentials{SecretKey: "admin",AccessKey: "eyJrZXlJZC......",}),// 设置生产组producer.WithGroupName(groupName),// 设置命名空间名称producer.WithNamespace("rocketmq-xxx|namespace_go"),// 设置发送失败重试次数producer.WithRetry(2),)// 启动producererr := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())os.Exit(1)}for i := 0; i < 1; i++ {msg := primitive.NewMessage(topicName, []byte("Hello RocketMQ Go Client! This is a delay message."))// 设置延迟等级// 等级与时间对应关系:// 1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h;// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18//如果想用延迟级别,那么设置下面这个方法msg.WithDelayTimeLevel(3)//如果想用任意延迟消息,那么设置下面这个方法,WithDelayTimeLevel 就不要设置了,单位为具体的毫秒,如下则是10s后投递delayMills := int64(10 * 1000)msg.WithProperty("__STARTDELIVERTIME", strconv.FormatInt(time.Now().Unix()+delayMills, 10))// 发送消息res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}}// 释放资源err = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
// topic名称var topicName = "topic1"// 构造消息内容msg := &primitive.Message{Topic: topicName, // 设置topic名称Body: []byte("Hello RocketMQ Go Client! This is a new message."),}// 设置tagmsg.WithTag("TAG")// 设置keymsg.WithKeys([]string{"yourKey"})// 发送消息res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\\n", err)} else {fmt.Printf("send message success: result=%s\\n", res.String())}
参数 | 说明 |
topicName | Topic 名称在控制台集群管理中 Topic 页签中复制具体 Topic 名称。 |
TAG | 消息 TAG 标识。 |
yourKey | 消息业务 key。 |
// 关闭生产者err = p.Shutdown()if err != nil {fmt.Printf("shutdown producer error: %s", err.Error())}
// 服务接入地址 (注意:需要在接入地址前面加上 http:// 或 https:// 否则无法解析)var serverAddress = "https://rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876"// 授权角色名var secretKey = "admin"// 授权角色密钥var accessKey = "eyJrZXlJZC...."// 命名空间全称var nameSpace = "rocketmq-xxx|namespace_go"// 生产者组名称var groupName = "group11"// 创建consumerc, err := rocketmq.NewPushConsumer(// 设置消费者组consumer.WithGroupName(groupName),// 设置服务地址consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{serverAddress})),// 设置acl权限consumer.WithCredentials(primitive.Credentials{SecretKey: secretKey,AccessKey: accessKey,}),// 设置命名空间名称consumer.WithNamespace(nameSpace),// 设置从起始位置开始消费consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),// 设置消费模式(默认集群模式)consumer.WithConsumerModel(consumer.Clustering),//广播消费,设置一下实例名,设置为应用的系统名即可。如果不设置,会使用pid,这会导致重启消费重复consumer.WithInstance("xxxx"),)if err != nil {fmt.Println("init consumer2 error: " + err.Error())os.Exit(0)}
// topic名称var topicName = "topic1"// 设置订阅消息的tagselector := consumer.MessageSelector{Type: consumer.TAG,Expression: "TagA || TagC",}// 设置重新消费的延迟级别,共支持18种延迟级别。下面是延迟级别与延迟时间的对应关系// 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18// 1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2hdelayLevel := 1err = c.Subscribe(topicName, selector, func(ctx context.Context,msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {fmt.Printf("subscribe callback len: %d \\n", len(msgs))// 设置下次消费的延迟级别concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)concurrentCtx.DelayLevelWhenNextConsume = delayLevel // only run when return consumer.ConsumeRetryLaterfor _, msg := range msgs {// 模拟重试3次后消费成功if msg.ReconsumeTimes > 3 {fmt.Printf("msg ReconsumeTimes > 3. msg: %v", msg)return consumer.ConsumeSuccess, nil} else {fmt.Printf("subscribe callback: %v \\n", msg)}}// 模拟消费失败,回复重试return consumer.ConsumeRetryLater, nil})if err != nil {fmt.Println(err.Error())}
参数 | 说明 |
topicName | Topic 的名称,在控制台 Topic 页面复制。 |
Expression | 消息 TAG 标识。 |
delayLevel | 设置重新消费的延迟级别,共支持18种延迟级别。 |
// 开始消费err = c.Start()if err != nil {fmt.Println(err.Error())os.Exit(-1)}time.Sleep(time.Hour)// 资源释放err = c.Shutdown()if err != nil {fmt.Printf("shundown Consumer error: %s", err.Error())}
本页内容是否解决了您的问题?