tencent cloud

文档反馈

发送与接收普通消息

最后更新时间:2023-10-19 11:11:07

    操作场景

    本文以调用 Spring Boot Starter SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

    前提条件

    下载 Demo或者前往GitHub 项目

    操作步骤

    步骤1:添加依赖

    在 pom.xml 中添加依赖。
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
    </dependency>

    步骤2:准备配置

    在配置文件中添加配置信息。
    server:
    port: 8082
    
    #rocketmq配置信息
    rocketmq:
    # tdmq-rocketmq服务接入地址
    name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
    # 生产者配置
    producer:
    # 生产者组名
    group: group111
    # 角色密钥
    access-key: eyJrZXlJZC....
    # 已授权的角色名称
    secret-key: admin
    # 消费者公共配置
    consumer:
    # 角色密钥
    access-key: eyJrZXlJZC....
    # 已授权的角色名称
    secret-key: admin
    
    # 自定义配置,根据业务进行配置
    namespace: rocketmq-xxx|namespace1
    producer1:
    topic: testdev1
    consumer1:
    group: group111
    topic: testdev1
    subExpression: TAG1
    consumer2:
    group: group222
    topic: testdev1
    subExpression: TAG2
    参数
    说明
    name-server
    集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。
    group
    生产者 Group 的名称,在控制台 Group 页面复制。
    secret-key
    角色名称,在 角色管理 页面复制。
    access-key
    角色密钥,在 角色管理 页面复制密钥列复制。
    
    
    
    namespace
    命名空间的名称,在控制台命名空间页面复制。
    topic
    Topic 的名称,在控制台 topic 页面复制。
    subExpression
    用来设置消息的 TAG。

    步骤3:发送消息

    1. 在需要发送消息的类中注入 RcoketMQTemplate
    @Value("${rocketmq.namespace}%${rocketmq.producer1.topic}")
    private String topic; // topic名称 (需要使用topic全称,所以在这里对topic名称进行拼接)
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    2. 发送消息,消息体可以是自定义对象,也可以是 Message 对象(org.springframework.messaging包中)。
    SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
    /*------------------------------------------------------------------------*/
    rocketMQTemplate.syncSend(destination, MessageBuilder.withPayload(message).build())
    3. 完整示例如下。
    /**
    * Description: 消息生产者
    */
    @Service
    public class SendMessage {
    // 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置 格式:namespace全称%topic名称
    @Value("${rocketmq.namespace}%${rocketmq.producer1.topic}")
    private String topic;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
    * 同步发送
    *
    * @param message 消息内容
    * @param tags 订阅tags
    */
    public void syncSend(String message, String tags) {
    // springboot不支持使用header传递tags,根据要求,需要在topic后进行拼接 formats: `topicName:tags`,不拼接标识无tag
    String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
    SendResult sendResult = rocketMQTemplate.syncSend(destination,
    MessageBuilder.withPayload(message)
    .setHeader(MessageConst.PROPERTY_KEYS, "yourKey") // 指定业务key
    .build());
    System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
    }
    }
    说明
    该示例为同步发送。异步发送,单向发送等请参见 Demo或者前往GitHub 项目

    步骤4:消费消息

    @Service
    @RocketMQMessageListener(
    consumerGroup = "${rocketmq.namespace}%${rocketmq.consumer1.group}", // 消费组,格式:namespace全称%group名称
    // 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置 格式:namespace全称%topic名称
    topic = "${rocketmq.namespace}%${rocketmq.consumer1.topic}",
    selectorExpression = "${rocketmq.consumer1.subExpression}" // 订阅表达式, 不配置表示订阅所有消息
    )
    public class MessageConsumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
    System.out.println("Tag1Consumer receive message:" + message);
    }
    }
    可根据业务需求配置多个消费者。消费者其他配置可根据具体业务需求进行配置。
    说明
    完整示例参见下载 Demo或者前往GitHub 项目

    步骤5:查看消费详情

    登录 TDMQ 控制台,在集群管理 > Group 页面,可查看与 Group 连接的客户端列表,单击操作列的查看详情,可查看消费者详情。
    
    
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持