tencent cloud

文档反馈

发送与接收过滤消息

最后更新时间:2023-04-12 11:40:09

    操作场景

    本文以调用 Spring Boot Starter SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息过滤收发的完整过程。

    前提条件

    已经了解发送与接收普通消息
    下载 Demo或者前往GitHub 项目

    操作步骤

    发送消息

    和普通发送消息相同,发送时,只需要将 rocketMQTemplate 的发送 Topic 拼接上 对应的 tag 即可。
    
    // springboot不支持使用header传递tags,根据要求,需要在topic后进行拼接 formats: topicName:tags,不拼接标识无tag String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags; // object消息类型 SendResult sendResult = rocketMQTemplate.syncSend(destination, MessageBuilder.withPayload(message) .setHeader(MessageConst.PROPERTY_KEYS, "yourKey") // 指定业务key .build()); System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
    例如 topic 是 TopicTest,tag是 TAG1, 那么调用 rocketMQTemplate 方法的第一个参数将是 TopicTest:TAG1

    消费消息

    设置 selectorExpression 字段为对应的过滤tag即可。如下代码中,将 rocketmq.consumer1.subExpression 设置为TAG1 就可以消费 TAG1 的消息
    @Service
    @RocketMQMessageListener(
    consumerGroup = "${rocketmq.namespace}%${rocketmq.consumer1.group}", // 消费组,格式:namespace全称%group名称
    // 需要使用topic全称,所以进行topic名称的拼接,也可以自己设置 格式:namespace全称%topic名称
    topic = "${rocketmq.namespace}%${rocketmq.consumer1.topic}",
    selectorExpression = "${rocketmq.consumer1.subExpression}" // 订阅表达式, 不配置表示订阅所有消息
    )
    public class Tag1Consumer implements RocketMQListener<String> {
    
    @Override
    public void onMessage(String message) {
    System.out.println("Tag1Consumer receive message:" + message);
    }
    }
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持