Add the spring-cloud-starter-stream-rocketmq dependency to pom.xml. Version 2021.0.4.0 is recommended.

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <version>2021.0.4.0</version>
    </dependency>
    spring:
      cloud:
        stream:
          rocketmq:
            binder:
              # Full service address
              name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
              # Role name
              secret-key: admin
              # Role token
              access-key: eyJrZXlJZ...
              # Full namespace name
              namespace: rocketmq-xxx|namespace1
              # Producer group
              group: producerGroup
            bindings:
              # Channel name, which is the same as the channel name in spring.cloud.stream.bindings.
              Topic-TAG1-Input:
                consumer:
                  # Tag type of the subscription, which is configured based on consumers’ actual needs. All messages are subscribed to by default.
                  subscription: TAG1
              # Channel name
              Topic-TAG2-Input:
                consumer:
                  subscription: TAG2
          bindings:
            # Channel name
            Topic-send-Output:
              # Specify a topic, which refers to the one you created
              destination: TopicTest
              content-type: application/json
            # Channel name
            Topic-TAG1-Input:
              destination: TopicTest
              content-type: application/json
              group: consumer-group1
            # Channel name
            Topic-TAG2-Input:
              destination: TopicTest
              content-type: application/json
              group: consumer-group2
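The subscription setting above decides which tagged messages each input channel receives. As a rough illustration only, the sketch below models RocketMQ-style tag matching in plain Java: an empty expression or `*` matches every tag, and `||` separates alternatives. The class `TagFilterSketch` and its method are hypothetical names for this example and are not part of the binder.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Simplified, hypothetical model of tag-based subscription matching. */
final class TagFilterSketch {
    private TagFilterSketch() {}

    /**
     * Returns true if a message carrying the given tag would pass a
     * subscription expression such as "TAG1", "TAG1 || TAG2", or "*".
     */
    static boolean matches(String expression, String tag) {
        // No expression, or "*", means every message is subscribed to.
        if (expression == null || expression.trim().isEmpty() || expression.trim().equals("*")) {
            return true;
        }
        // A specific expression never matches an untagged message in this model.
        if (tag == null) {
            return false;
        }
        // Split the expression on "||" and compare each alternative exactly.
        Set<String> wanted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(tag);
    }
}
```

With the configuration above, a message tagged TAG1 would pass the filter for Topic-TAG1-Input but not the one for Topic-TAG2-Input.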
Only 2.2.5-RocketMQ-RC1, 2.2.5.RocketMQ.RC2, and later versions support namespace configuration. If you use another version, you need to concatenate the topic and group names yourself. The tag subscription setting is named subscription in 2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2, and tags in other earlier versions.

    spring:
      cloud:
        stream:
          rocketmq:
            bindings:
              # Channel name, which is the same as the channel name in spring.cloud.stream.bindings.
              Topic-test1:
                consumer:
                  # Tag type of the subscription, which is configured based on consumers’ actual needs. All messages are subscribed to by default.
                  tags: TAG1
              # Channel name
              Topic-test2:
                consumer:
                  tags: TAG2
            binder:
              # Full service address
              name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
              # Role name
              secret-key: admin
              # Role token
              access-key: eyJrZXlJZ...
          bindings:
            # Channel name
            Topic-send:
              # Specify a topic in the format of `cluster ID|namespace name%topic name`, which refers to the one you created
              destination: rocketmq-xxx|stream%topic1
              content-type: application/json
              # Name of the group to be used in the format of `cluster ID|namespace name%group name`
              group: rocketmq-xxx|stream%group1
            # Channel name
            Topic-test1:
              destination: rocketmq-xxx|stream%topic1
              content-type: application/json
              group: rocketmq-xxx|stream%group1
            # Channel name
            Topic-test2:
              destination: rocketmq-xxx|stream%topic1
              content-type: application/json
              group: rocketmq-xxx|stream%group2
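For versions without namespace support, the concatenated names follow the `cluster ID|namespace name%topic name` format given in the configuration comments. The helper below is a minimal sketch of that concatenation. `QualifiedNames` and `qualify` are hypothetical names introduced for this example, and the blank-value check is an added assumption rather than documented behavior.

```java
/** Hypothetical helper that builds "clusterId|namespace%name" strings. */
final class QualifiedNames {
    private QualifiedNames() {}

    /**
     * Joins the cluster ID, namespace name, and topic or group name in the
     * format shown in the configuration comments: clusterId|namespace%name.
     */
    static String qualify(String clusterId, String namespace, String name) {
        requireNonBlank(clusterId, "clusterId");
        requireNonBlank(namespace, "namespace");
        requireNonBlank(name, "name");
        return clusterId + "|" + namespace + "%" + name;
    }

    // Assumption for this sketch: blank parts are rejected early.
    private static void requireNonBlank(String value, String label) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(label + " must not be blank");
        }
    }
}
```

For example, `QualifiedNames.qualify("rocketmq-xxx", "stream", "topic1")` yields the destination value used in the configuration above, and the same call with `"group1"` yields the group value.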
Parameter | Description |
name-server | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
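secret-key | Role name, as noted in the configuration comments. |
access-key | Role token, as noted in the configuration comments. |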
namespace | Namespace name, which can be copied on the Namespace page in the console. |
group | Producer group name, which can be copied under the Group tab on the cluster details page. |
destination | Topic name, which can be copied on the Topic page in the console. |
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;

    /**
     * Custom channel binder
     */
    public interface CustomChannelBinder {

        /**
         * (Message producer) sends messages.
         * Binds the channel name in the configurations.
         */
        @Output("Topic-send-Output")
        MessageChannel sendChannel();

        /**
         * (Consumer 1) receives message 1.
         * Binds the channel name in the configurations.
         */
        @Input("Topic-TAG1-Input")
        MessageChannel testInputChannel1();

        /**
         * (Consumer 2) receives message 2.
         * Binds the channel name in the configurations.
         */
        @Input("Topic-TAG2-Input")
        MessageChannel testInputChannel2();
    }
Add the @EnableBinding({CustomChannelBinder.class}) annotation to a configuration class or the startup class. Then inject CustomChannelBinder into the class that needs to send messages.

    @Autowired
    private CustomChannelBinder channelBinder;

Build a message and send it through the output channel.

    Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
    channelBinder.sendChannel().send(message);
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Service;

    @Service
    public class StreamConsumer {

        private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);

        /**
         * Listen on the channel configured in the configurations
         *
         * @param messageBody message content
         */
        @StreamListener("Topic-TAG1-Input")
        public void receive(String messageBody) {
            logger.info("Receive1: Messages are received through the stream. messageBody = {}", messageBody);
        }

        /**
         * Listen on the channel configured in the configurations
         *
         * @param messageBody message content
         */
        @StreamListener("Topic-TAG2-Input")
        public void receive2(String messageBody) {
            logger.info("Receive2: Messages are received through the stream. messageBody = {}", messageBody);
        }
    }
Open http://localhost:8080/test-simple in the browser, then check the log output in your IDE.

    2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: send a message via stream, messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]
    2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: receive a message via stream, messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}