操作场景
本文以调用 Spring Boot Starter SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息过滤收发的完整过程。
前提条件
已经了解发送与接收普通消息
操作步骤
发送消息
和普通发送消息相同,发送时,只需要将 rocketMQTemplate 的发送 Topic 拼接上 对应的 tag 即可。
topicName
:
tags
,不拼接标识无tag
String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult = rocketMQTemplate.syncSend(destination,
MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_KEYS, "yourKey")
.build());
System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
例如 topic 是 TopicTest,tag是 TAG1, 那么调用 rocketMQTemplate 方法的第一个参数将是 TopicTest:TAG1
消费消息
设置 selectorExpression 字段为对应的过滤tag即可。如下代码中,将 rocketmq.consumer1.subExpression 设置为TAG1 就可以消费 TAG1 的消息
@Service
@RocketMQMessageListener(
consumerGroup = "${rocketmq.namespace}%${rocketmq.consumer1.group}",
topic = "${rocketmq.namespace}%${rocketmq.consumer1.topic}",
selectorExpression = "${rocketmq.consumer1.subExpression}"
)
public class Tag1Consumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Tag1Consumer receive message:" + message);
}
}
本页内容是否解决了您的问题?