tencent cloud

Feedback

Spring Cloud Stream

Last updated: 2024-01-03 11:45:32

    Overview

    This document describes how to use open-source SDK to send and receive messages by using the SDK for Spring Cloud Stream as an example and helps you better understand the message sending and receiving processes.

    Prerequisites

    You have created the required resources as instructed in Resource Creation and Preparation.

    Directions

    Step 1. Add dependencies

    Add Stream RabbitMQ dependencies to pom.xml.
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    Step 2. Prepare configurations

    1. Configure the configuration file (with the configuration of a direct exchange as an example).
    spring:
    application:
    name: application-name
    cloud:
    stream:
    rabbit:
    bindings:
    # Output channel name
    output:
    # Producer configuration information
    producer:
    # Type of the exchange used by the producer. If the exchange name already exists, this type must be the same as that of the exchange type
    exchangeType: direct
    # It is used to specify a routing key expression
    routing-key-expression: headers["routeTo"] # This field indicates that the `routeTo` field in the header is used as the routing key
    queueNameGroupOnly: true
    # Input channel name
    input:
    # Consumer configuration information
    consumer:
    # Type of the exchange used by the consumer. If the exchange name already exists, this type must be the same as that of the exchange type
    exchangeType: direct
    # Routing keys bound to the consumer message queue
    bindingRoutingKey: info,waring,error
    # The configuration will process the above routing keys
    bindingRoutingKeyDelimiter: "," # This configuration item indicates that commas are used to separate the configured routing keys
    # Message acknowledgment mode. For more information, see `AcknowledgeMode`
    acknowledge-mode: manual
    queueNameGroupOnly: true
    bindings:
    # Output channel name
    output: # Channel name
    destination: direct_logs # Name of the exchange to be used
    content-type: application/json
    default-binder: dev-rabbit
    # Input channel name
    input: # Channel name
    destination: direct_logs # Name of the exchange to be used
    content-type: application/json
    default-binder: dev-rabbit
    group: route_queue1 # Name of the message queue to be used
    binders:
    dev-rabbit:
    type: rabbit
    environment:
    spring:
    rabbitmq:
    host: amqp-xxx.rabbitmq.xxx.tencenttdmq.com #Cluster access address, which can be obtained by clicking “Access Address” in the “Operation” column on the cluster management page.
    port: 5672
    username: admin # Role name
    password: password # Role token
    virtual-host: vhostnanme # Vhost name
    Parameter
    Description
    bindingRoutingKey
    Routing keys bound to the consumer message queue. Message routing rule can be obtained in the Binding Key column in the binding list in the console.
    
    direct_log
    Exchange name, which can be obtained from the exchange list in the console.
    route_queue1
    Queue name, which can be obtained from the queue list in the console.
    host
    Cluster access address, which can be obtained from the Client Access section on the Basic Info page of the cluster.
    
    port
    Cluster access address port, which can be obtained in the Operation column on the Cluster Management page
    username
    Enter the name of the user created in the console.
    password
    Enter the password of the user created in the console.
    virtual-host
    Vhost name, which can be obtained from the vhost list in the console.
    2. Create a configuration file loading program.
    OutputMessageBinding.java
    public interface OutputMessageBinding {
    /**
    * Name of the channel to be used (output channel name)
    */
    String OUTPUT = "output";
    
    @Output(OUTPUT)
    MessageChannel output();
    }
    InputMessageBinding.java
    public interface InputMessageBinding {
    
    /**
    * Name of the channel to be used
    */
    String INPUT = "input";
    
    @Input(INPUT)
    SubscribableChannel input();
    }

    Step 3. Send messages

    Create and compile the message sending program IMessageSendProvider.java.
    // Import the configuration class
    @EnableBinding(OutputMessageBinding.class)
    public class MessageSendProvider {
    
    @Autowired
    private OutputMessageBinding outputMessageBinding;
    
    public String sendToDirect() {
    outputMessageBinding.output().send(MessageBuilder.withPayload("[info] This is a new message.[" + System.currentTimeMillis() + "]").setHeader("routeTo", "info").build());
    outputMessageBinding.output().send(MessageBuilder.withPayload("[waring] This is a new waring message.[" + System.currentTimeMillis() + "]").setHeader("routeTo", "waring").build());
    outputMessageBinding.output().send(MessageBuilder.withPayload("[error] This is a new error message.[" + System.currentTimeMillis() + "]").setHeader("routeTo", "error").build());
    return "success";
    }
    
    public String sendToFanout() {
    for (int i = 0; i < 3; i++) {
    outputMessageBinding.output().send(MessageBuilder.withPayload("This is a new message" + i).build());
    }
    return "success";
    }
    }
    Inject MessageSendProvider to the message sending class to send messages.

    Step 4. Consume messages

    Create and compile the message consuming program MessageConsumer.java. You can configure multiple channels to listen on different message queues.
    @Service
    @EnableBinding(InputMessageBinding.class)
    public class MessageConsumer {
    
    @StreamListener(InputMessageBinding.INPUT)
    public void test(Message<String> message) throws IOException {
    Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
    Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
    String payload = message.getPayload();
    System.out.println(payload);
    }
    }

    Step 5. View messages

    If you want to confirm whether the messages have been successfully sent to TDMQ for RabbitMQ, you can view the status of connected consumers on the Cluster > Queue page in the console.
    
    Note
    Above is a sample based on the pub/sub pattern of RabbitMQ, which can be configured as needed. For more information, see Demo or Spring cloud stream official documentation.
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support