<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId><version>2021.0.5.0</version><exclusions><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId></exclusion><exclusion><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.7</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.7</version></dependency>
spring:cloud:stream:rocketmq:binder:# 服务地址全称name-server: rmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080# 角色名称secret-key: admin# 角色密钥access-key: eyJrZXlJZ...# producer groupgroup: producerGroupbindings:# channel名称, 与spring.cloud.stream.bindings下的channel名称对应Topic-TAG1-Input:consumer:# 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)subscription: TAG1# channel名称Topic-TAG2-Input:consumer:subscription: TAG2bindings:# channel名称Topic-send-Output:# 指定topic, 对应创建的topic名称destination: TopicTestcontent-type: application/json# channel名称Topic-TAG1-Input:destination: TopicTestcontent-type: application/jsongroup: consumer-group1# channel名称Topic-TAG2-Input:destination: TopicTestcontent-type: application/jsongroup: consumer-group2
2.2.5-RocketMQ-RC1
与 2.2.5.RocketMQ.RC2
的订阅配置项为 subscription
, 其他低版本订阅配置项为 tags
。spring:cloud:stream:rocketmq:bindings:# channel名称, 与spring.cloud.stream.bindings下的channel名称对应Topic-test1:consumer:# 订阅的tag类型,根据消费者实际情况进行配置(默认是订阅所有消息)tags: TAG1# channel名称Topic-test2:consumer:tags: TAG2binder:# 服务地址全称name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:8080# 角色名称secret-key: admin# 角色密钥access-key: eyJrZXlJZ...bindings:# channel名称Topic-send:# 指定topic,destination: topic1content-type: application/json# 要使用group全称group: group1# channel名称Topic-test1:destination: topic1content-type: application/jsongroup: group1# channel名称Topic-test2:destination: topic1content-type: application/jsongroup: group2
参数 | 说明 |
name-server | 集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。 |
secret-key | 角色名称,在 集群权限 页面复制 SecretKey 复制。 |
access-key | 角色密钥,在 集群权限 页面复制 AccessKey 复制。 |
group | 生产者 Group 的名称,在控制台 Group 页面复制。 |
destination | Topic 的名称,在控制台 topic 页面复制。 |
/*** 自定义通道 Binder*/public interface CustomChannelBinder {/*** 发送消息(消息生产者)* 绑定配置中的channel名称*/@Output("Topic-send-Output")MessageChannel sendChannel();/*** 接收消息1(消费者1)* 绑定配置中的channel名称*/@Input("Topic-TAG1-Input")MessageChannel testInputChannel1();/*** 接收消息2(消费者2)* 绑定配置中的channel名称*/@Input("Topic-TAG2-Input")MessageChannel testInputChannel2();}
@EnableBinding({CustomChannelBinder.class})
CustomChannelBinder
。@Autowiredprivate CustomChannelBinder channelBinder;
Message<String> message = MessageBuilder.withPayload("This is a new message.").build();channelBinder.sendChannel().send(message);
@Servicepublic class StreamConsumer {private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);/*** 监听channel (配置中的channel 名称)** @param messageBody 消息内容*/@StreamListener("Topic-TAG1-Input")public void receive(String messageBody) {logger.info("Receive1: 通过stream收到消息,messageBody = {}", messageBody);}/*** 监听channel (配置中的channel 名称)** @param messageBody 消息内容*/@StreamListener("Topic-TAG2-Input")public void receive2(String messageBody) {logger.info("Receive2: 通过stream收到消息,messageBody = {}", messageBody);}}
http://localhost:8080/test-simple
可以看到发送成功。观察开发 IDE 的输出日志。2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: 通过stream发送消息,messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: 通过stream收到消息,messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}
本页内容是否解决了您的问题?