Spring Cloud Stream
Last updated: 2023-09-12 17:53:17

Overview

This document describes how to use Spring Cloud Stream to send and receive messages and helps you better understand the message sending and receiving processes.

Prerequisites

You have created the required resources as instructed in Resource Creation and Preparation.
You have downloaded the demo here or from the GitHub project.

Directions

Step 1. Import dependencies

Add the spring-cloud-starter-stream-rocketmq dependency to pom.xml. Version 2021.0.4.0 is recommended.
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>2021.0.4.0</version>
</dependency>

Step 2. Add configurations

Add RocketMQ-related configurations to the configuration file.
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          # Full service address
          name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
          # Role name
          secret-key: admin
          # Role token
          access-key: eyJrZXlJZ...
          # Full namespace name
          namespace: rocketmq-xxx|namespace1
          # Producer group
          group: producerGroup
        bindings:
          # Channel name, which is the same as the channel name in spring.cloud.stream.bindings.
          Topic-TAG1-Input:
            consumer:
              # Tag type of the subscription, which is configured based on consumers’ actual needs. All messages are subscribed to by default.
              subscription: TAG1
          # Channel name
          Topic-TAG2-Input:
            consumer:
              subscription: TAG2
      bindings:
        # Channel name
        Topic-send-Output:
          # Specify a topic, which refers to the one you created
          destination: TopicTest
          content-type: application/json
        # Channel name
        Topic-TAG1-Input:
          destination: TopicTest
          content-type: application/json
          group: consumer-group1
        # Channel name
        Topic-TAG2-Input:
          destination: TopicTest
          content-type: application/json
          group: consumer-group2
Note
1. Currently, only 2.2.5-RocketMQ-RC1, 2.2.5.RocketMQ.RC2, and later versions support the namespace configuration item. If you use another version, you need to concatenate the namespace name with the topic and group names yourself.
The format is as follows:
rocketmq-pngrpmk94d5o|stream%topic (format: namespace name%topic name)
rocketmq-pngrpmk94d5o|stream%group (format: namespace name%group name)
The format for Shared and Exclusive editions is as follows:
MQ_INST_rocketmqpj79obd2ew7v_test%topic (format: namespace name%topic name)
MQ_INST_rocketmqpj79obd2ew7v_test%group (format: namespace name%group name)
2. The tag subscription configuration item is named subscription in 2.2.5-RocketMQ-RC1 and 2.2.5.RocketMQ.RC2, and tags in other, earlier versions.
The complete configuration for other versions is as follows:
spring:
  cloud:
    stream:
      rocketmq:
        bindings:
          # Channel name, which is the same as the channel name in spring.cloud.stream.bindings.
          Topic-test1:
            consumer:
              # Tag type of the subscription, which is configured based on consumers’ actual needs. All messages are subscribed to by default.
              tags: TAG1
          # Channel name
          Topic-test2:
            consumer:
              tags: TAG2
        binder:
          # Full service address
          name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876
          # Role name
          secret-key: admin
          # Role token
          access-key: eyJrZXlJZ...
      bindings:
        # Channel name
        Topic-send:
          # Specify a topic in the format of `cluster ID|namespace name%topic name`, which refers to the one you created
          destination: rocketmq-xxx|stream%topic1
          content-type: application/json
          # Name of the group to be used in the format of `cluster ID|namespace name%group name`
          group: rocketmq-xxx|stream%group1
        # Channel name
        Topic-test1:
          destination: rocketmq-xxx|stream%topic1
          content-type: application/json
          group: rocketmq-xxx|stream%group1
        # Channel name
        Topic-test2:
          destination: rocketmq-xxx|stream%topic1
          content-type: application/json
          group: rocketmq-xxx|stream%group2
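
The name concatenation described in item 1 of the note can be sketched as a small helper. This is only an illustration and is not part of the demo project: the class name RocketMqNames and the method qualify are hypothetical, and the sample values are the placeholders used in this document.

```java
final class RocketMqNames {

    private RocketMqNames() {
    }

    /**
     * Joins a namespace name and a topic or group name with '%',
     * following the "namespace name%topic name" format from the note.
     * (Hypothetical helper, not an API of the RocketMQ binder.)
     */
    static String qualify(String namespaceName, String name) {
        if (namespaceName == null || namespaceName.isEmpty()) {
            throw new IllegalArgumentException("namespaceName must not be empty");
        }
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("name must not be empty");
        }
        return namespaceName + "%" + name;
    }

    public static void main(String[] args) {
        // Placeholder values taken from the configuration examples in this document.
        System.out.println(qualify("rocketmq-xxx|stream", "topic1"));
        System.out.println(qualify("rocketmq-xxx|stream", "group1"));
    }
}
```

The resulting strings are what you would put in the destination and group items when the binder version does not support the namespace item.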
| Parameter | Description |
| --- | --- |
| name-server | Cluster access address, which can be copied from Access Address in the Operation column on the Cluster page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
| secret-key | Role name, which can be copied on the Role Management page. |
| access-key | Role token, which can be copied in the Token column on the Role Management page. |
| namespace | Namespace name, which can be copied on the Namespace page in the console. |
| group | Producer group name, which can be copied under the Group tab on the cluster details page. |
| destination | Topic name, which can be copied on the Topic page in the console. |

Step 3. Configure channels

You can separately configure input and output channels as needed.
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Custom channel binder
 */
public interface CustomChannelBinder {

    /**
     * (Message producer) sends messages.
     * Binds to the output channel name in the configuration.
     */
    @Output("Topic-send-Output")
    MessageChannel sendChannel();

    /**
     * (Consumer 1) receives messages tagged TAG1.
     * Binds to the input channel name in the configuration.
     */
    @Input("Topic-TAG1-Input")
    MessageChannel testInputChannel1();

    /**
     * (Consumer 2) receives messages tagged TAG2.
     * Binds to the input channel name in the configuration.
     */
    @Input("Topic-TAG2-Input")
    MessageChannel testInputChannel2();
}


Step 4. Add annotations

Add the @EnableBinding annotation to a configuration class or the startup class. If you have defined multiple channel binder interfaces, list all of them in the annotation.
@EnableBinding({CustomChannelBinder.class})

Step 5. Send messages

1. Inject CustomChannelBinder into the class that needs to send messages.
@Autowired
private CustomChannelBinder channelBinder;
2. Use the corresponding output stream channel to send messages.
Message<String> message = MessageBuilder.withPayload("This is a new message.").build();
channelBinder.sendChannel().send(message);

Step 6. Consume messages

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Service;

@Service
public class StreamConsumer {
    private final Logger logger = LoggerFactory.getLogger(StreamDemoApplication.class);

    /**
     * Listens on the Topic-TAG1-Input channel defined in the configuration.
     *
     * @param messageBody message content
     */
    @StreamListener("Topic-TAG1-Input")
    public void receive(String messageBody) {
        logger.info("Receive1: receive a message via stream, messageBody = {}", messageBody);
    }

    /**
     * Listens on the Topic-TAG2-Input channel defined in the configuration.
     *
     * @param messageBody message content
     */
    @StreamListener("Topic-TAG2-Input")
    public void receive2(String messageBody) {
        logger.info("Receive2: receive a message via stream, messageBody = {}", messageBody);
    }
}

Step 7. Perform local testing

Start the project locally and confirm in the console that it started successfully.
Open http://localhost:8080/test-simple in a browser to send a message, then check the output log in your IDE to confirm that the message was sent and received.
2023-02-23 19:19:00.441 INFO 21958 --- [nio-8080-exec-1] c.t.d.s.controller.StreamController : Send: send a message via stream, messageBody = GenericMessage [payload={"key":"value"}, headers={id=3f28bc70-da07-b966-a922-14a17642c9c4, timestamp=1677151140353}]
2023-02-23 19:19:01.138 INFO 21958 --- [nsumer-group1_1] c.t.d.s.StreamDemoApplication : Receive1: receive a message via stream, messageBody = {"headers":{"id":"3f28bc70-da07-b966-a922-14a17642c9c4","timestamp":1677151140353},"payload":{"key":"value"}}

The log shows that a message tagged TAG1 was sent and that only the TAG1 subscriber (consumer-group1) received it.
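
The tag filtering behind this result can be modeled with a short sketch. It is a simplified illustration of how a subscription expression is compared with a message tag, not the filtering code used by RocketMQ or the binder. The class TagFilterSketch and the method matches are hypothetical, and the sketch assumes the usual RocketMQ conventions that `*` (or no expression) subscribes to all messages and that `||` separates multiple tags.

```java
import java.util.Arrays;

final class TagFilterSketch {

    private TagFilterSketch() {
    }

    /**
     * Returns true if a message carrying messageTag would be delivered to a
     * consumer whose subscription expression is the given string.
     * (Simplified model for illustration only.)
     */
    static boolean matches(String subscription, String messageTag) {
        // No expression or "*" means "subscribe to all messages".
        if (subscription == null || subscription.trim().isEmpty() || "*".equals(subscription.trim())) {
            return true;
        }
        // An untagged message cannot match a specific tag.
        if (messageTag == null) {
            return false;
        }
        // Multiple tags are separated by "||", for example "TAG1 || TAG2".
        return Arrays.stream(subscription.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }

    public static void main(String[] args) {
        // Topic-TAG1-Input subscribes to TAG1, Topic-TAG2-Input to TAG2.
        System.out.println("consumer-group1 receives TAG1: " + matches("TAG1", "TAG1"));
        System.out.println("consumer-group2 receives TAG1: " + matches("TAG2", "TAG1"));
    }
}
```

With the configuration from Step 2, the first check is true and the second is false, which corresponds to the single Receive1 line in the log.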
