操作场景
本文以调用 Spring Boot Starter SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
操作步骤
步骤1:添加依赖
在 pom.xml 中添加依赖。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
步骤2:准备配置
在配置文件中添加配置信息。
server:
port: 8082
rocketmq:
name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
producer:
group: group111
access-key: eyJrZXlJZC....
secret-key: admin
consumer:
access-key: eyJrZXlJZC....
secret-key: admin
namespace: rocketmq-xxx|namespace1
producer1:
topic: testdev1
consumer1:
group: group111
topic: testdev1
subExpression: TAG1
consumer2:
group: group222
topic: testdev1
subExpression: TAG2
|
name-server | 集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。 |
group | 生产者 Group 的名称,在控制台 Group 页面复制。 |
secret-key | |
access-key | |
namespace | 命名空间的名称,在控制台命名空间页面复制。 |
topic | Topic 的名称,在控制台 topic 页面复制。 |
subExpression | 用来设置消息的 TAG。 |
步骤3:发送消息
1. 在需要发送消息的类中注入 RcoketMQTemplate
。
@Value("${rocketmq.namespace}%${rocketmq.producer1.topic}")
private String topic;
@Autowired
private RocketMQTemplate rocketMQTemplate;
2. 发送消息,消息体可以是自定义对象,也可以是 Message 对象(org.springframework.messaging包中)。
SendResult sendResult = rocketMQTemplate.syncSend(destination, message);
/*------------------------------------------------------------------------*/
rocketMQTemplate.syncSend(destination, MessageBuilder.withPayload(message).build())
3. 完整示例如下。
@Service
public class SendMessage {
@Value("${rocketmq.namespace}%${rocketmq.producer1.topic}")
private String topic;
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void syncSend(String message, String tags) {
String destination = StringUtils.isBlank(tags) ? topic : topic + ":" + tags;
SendResult sendResult = rocketMQTemplate.syncSend(destination,
MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_KEYS, "yourKey")
.build());
System.out.printf("syncSend1 to topic %s sendResult=%s %n", topic, sendResult);
}
}
步骤4:消费消息
@Service
@RocketMQMessageListener(
consumerGroup = "${rocketmq.namespace}%${rocketmq.consumer1.group}",
topic = "${rocketmq.namespace}%${rocketmq.consumer1.topic}",
selectorExpression = "${rocketmq.consumer1.subExpression}"
)
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("Tag1Consumer receive message:" + message);
}
}
可根据业务需求配置多个消费者。消费者其他配置可根据具体业务需求进行配置。
步骤5:查看消费详情
登录 TDMQ 控制台,在集群管理 > Group 页面,可查看与 Group 连接的客户端列表,单击操作列的查看详情,可查看消费者详情。
本页内容是否解决了您的问题?