Add the Stream RabbitMQ dependencies to pom.xml.

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
The configuration below uses a direct exchange as an example.

    spring:
      application:
        name: application-name
      cloud:
        stream:
          rabbit:
            bindings:
              # Output channel name
              output:
                # Producer configuration
                producer:
                  # Type of the exchange used by the producer. If the exchange already exists, this must match its existing type
                  exchangeType: direct
                  # Routing key expression
                  routing-key-expression: headers["routeTo"] # Use the `routeTo` header field as the routing key
                  queueNameGroupOnly: true
              # Input channel name
              input:
                # Consumer configuration
                consumer:
                  # Type of the exchange used by the consumer. If the exchange already exists, this must match its existing type
                  exchangeType: direct
                  # Routing keys bound to the consumer message queue
                  bindingRoutingKey: info,waring,error
                  # Delimiter used to split the routing keys configured above
                  bindingRoutingKeyDelimiter: ","
                  # Message acknowledgment mode. For more information, see `AcknowledgeMode`
                  acknowledge-mode: manual
                  queueNameGroupOnly: true
          bindings:
            # Output channel name
            output:
              destination: direct_logs # Name of the exchange to be used
              content-type: application/json
              default-binder: dev-rabbit
            # Input channel name
            input:
              destination: direct_logs # Name of the exchange to be used
              content-type: application/json
              default-binder: dev-rabbit
              group: route_queue1 # Name of the message queue to be used
          binders:
            dev-rabbit:
              type: rabbit
              environment:
                spring:
                  rabbitmq:
                    # Cluster access address, which can be obtained by clicking "Access Address" in the "Operation" column on the cluster management page
                    host: amqp-xxx.rabbitmq.xxx.tencenttdmq.com
                    port: 5672
                    username: admin # Role name
                    password: password # Role token
                    virtual-host: vhostname # Vhost name

Parameter | Description |
bindingRoutingKey | Routing keys bound to the consumer message queue. The routing rules can be found in the Binding Key column of the binding list in the console. |
direct_logs | Exchange name, which can be obtained from the exchange list in the console. |
route_queue1 | Queue name, which can be obtained from the queue list in the console. |
host | Cluster access address, which can be obtained from the Client Access section on the Basic Info page of the cluster. |
port | Port of the cluster access address, which can be obtained in the Operation column on the Cluster Management page. |
username | Enter the name of the user created in the console. |
password | Enter the password of the user created in the console. |
virtual-host | Vhost name, which can be obtained from the vhost list in the console. |
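
To make the routing settings above more concrete, here is a minimal sketch in plain Java of how a direct exchange decides whether a message reaches the queue. It is an illustrative model only, not Spring Cloud Stream or RabbitMQ code: the class `DirectRoutingSketch` and its methods are hypothetical names, and a plain `Map` stands in for the message headers. It assumes the `bindingRoutingKey` value is split on the configured delimiter and that the `routeTo` header supplies the routing key, as in the configuration above.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

/** Illustrative model of direct-exchange routing (hypothetical helper, not a Spring API). */
final class DirectRoutingSketch {

    /** Splits a bindingRoutingKey value such as "info,waring,error" into individual binding keys. */
    static Set<String> bindingKeys(String bindingRoutingKey, String delimiter) {
        Set<String> keys = new LinkedHashSet<>();
        // Pattern.quote treats the delimiter literally, even if it is a regex metacharacter.
        for (String key : bindingRoutingKey.split(Pattern.quote(delimiter))) {
            if (!key.isEmpty()) {
                keys.add(key);
            }
        }
        return keys;
    }

    /** A direct exchange delivers a message only when its routing key equals one of the binding keys. */
    static boolean isDelivered(Map<String, Object> headers, Set<String> bindingKeys) {
        Object routingKey = headers.get("routeTo"); // mirrors routing-key-expression: headers["routeTo"]
        return routingKey != null && bindingKeys.contains(routingKey.toString());
    }

    public static void main(String[] args) {
        Set<String> keys = bindingKeys("info,waring,error", ",");
        System.out.println(keys);                                          // [info, waring, error]
        System.out.println(isDelivered(Map.of("routeTo", "info"), keys));  // true
        System.out.println(isDelivered(Map.of("routeTo", "debug"), keys)); // false
    }
}
```

In this model, a message whose `routeTo` header is missing or does not exactly match one of the configured keys is not delivered to the queue.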
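    // OutputMessageBinding.java
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.messaging.MessageChannel;

    public interface OutputMessageBinding {
        /**
         * Name of the channel to be used (output channel name)
         */
        String OUTPUT = "output";

        @Output(OUTPUT)
        MessageChannel output();
    }

    // InputMessageBinding.java
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.SubscribableChannel;

    public interface InputMessageBinding {
        /**
         * Name of the channel to be used (input channel name)
         */
        String INPUT = "input";

        @Input(INPUT)
        SubscribableChannel input();
    }
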
Create the message sending class MessageSendProvider.java.

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.messaging.support.MessageBuilder;

    // Import the configuration class
    @EnableBinding(OutputMessageBinding.class)
    public class MessageSendProvider {

        @Autowired
        private OutputMessageBinding outputMessageBinding;

        // Sends one message per routing key; the routeTo header is used as the routing key
        public String sendToDirect() {
            outputMessageBinding.output().send(
                    MessageBuilder.withPayload("[info] This is a new message.[" + System.currentTimeMillis() + "]")
                            .setHeader("routeTo", "info")
                            .build());
            outputMessageBinding.output().send(
                    MessageBuilder.withPayload("[waring] This is a new waring message.[" + System.currentTimeMillis() + "]")
                            .setHeader("routeTo", "waring")
                            .build());
            outputMessageBinding.output().send(
                    MessageBuilder.withPayload("[error] This is a new error message.[" + System.currentTimeMillis() + "]")
                            .setHeader("routeTo", "error")
                            .build());
            return "success";
        }

        // Sends three messages without the routeTo header
        public String sendToFanout() {
            for (int i = 0; i < 3; i++) {
                outputMessageBinding.output().send(
                        MessageBuilder.withPayload("This is a new message" + i).build());
            }
            return "success";
        }
    }

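Inject MessageSendProvider into the class that needs to send messages, and call it to send messages.

Create the message consuming class MessageConsumer.java. You can configure multiple channels to listen on different message queues.

    import java.io.IOException;

    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Service;

    @Service
    @EnableBinding(InputMessageBinding.class)
    public class MessageConsumer {

        @StreamListener(InputMessageBinding.INPUT)
        public void test(Message<String> message) throws IOException {
            // With acknowledge-mode: manual, the channel and delivery tag are read from the message headers
            Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            // Acknowledge this single message (multiple = false)
            channel.basicAck(deliveryTag, false);
            String payload = message.getPayload();
            System.out.println(payload);
        }
    }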
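
The consumer above casts the header values directly, which throws if a header is ever missing (for example, when a test builds a message by hand). The following is a minimal sketch of a more defensive lookup in plain Java. The class `AckHeaderSketch` is a hypothetical helper, a plain `Map` stands in for the message headers, and the header name `amqp_deliveryTag` is assumed to match the value of `AmqpHeaders.DELIVERY_TAG`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative null-safe header lookup (hypothetical helper, not a Spring API). */
final class AckHeaderSketch {

    // Assumption: this string matches the value of AmqpHeaders.DELIVERY_TAG in Spring AMQP.
    static final String DELIVERY_TAG = "amqp_deliveryTag";

    /** Returns the delivery tag if the header is present and is a Long, otherwise an empty Optional. */
    static Optional<Long> deliveryTag(Map<String, Object> headers) {
        Object value = headers.get(DELIVERY_TAG);
        return (value instanceof Long) ? Optional.of((Long) value) : Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        System.out.println(deliveryTag(headers).isPresent()); // false

        headers.put(DELIVERY_TAG, 42L);
        System.out.println(deliveryTag(headers).get());       // 42
    }
}
```

A consumer could use a check like this to skip the `basicAck` call, or log a warning, when no delivery tag is available instead of failing with a `NullPointerException`.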