pom.xml
file:
<!-- in your <dependencies> block -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.13.0</version>
</dependency>
MessageProducer.java
.import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.tencent.tdmq.demo.cloud.Constant;/*** Message producer*/public class MessageProducer {/*** Exchange name*/private static final String EXCHANGE_NAME = "exchange_name";public static void main(String[] args) throws Exception {// Connection factoryConnectionFactory factory = new ConnectionFactory();// Set the service address (replace with the access point address copied in the console)factory.setUri("amqp://***");// Set the vhost (copy the vhost name in the open-source RabbitMQ console)factory.setVirtualHost(VHOST_NAME);// Set the username (use the username in the permission configuration of the vhost in the open-source RabbitMQ console)factory.setUsername(USERNAME);// Set the password (use the user key)factory.setPassword("****");// Get the connection address and establish the channeltry (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// Bind the message exchange (`EXCHANGE_NAME` must exist in the TDMQ for RabbitMQ console, and the exchange type must be the same as that in the console)channel.exchangeDeclare(EXCHANGE_NAME, "fanout");for (int i = 0; i < 10; i++) {String message = "this is rabbitmq message " + i;// Publish a message to the exchange, which will automatically deliver the message to the corresponding queuechannel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [producer] Sent '" + message + "'");}} catch (Exception e) {e.printStackTrace();}}}
| Parameter | Description |
| --- | --- |
| EXCHANGE_NAME | Exchange name, which can be obtained from the exchange list in the console. |
| factory.setUri | Cluster access address, which can be obtained from the Client Access section on the Basic Info page of the cluster. |
| factory.setVirtualHost | Vhost name, which can be obtained from the vhost list in the console. |
| factory.setUsername | Enter the name of the user created in the console. |
| factory.setPassword | Enter the password of the user created in the console. |
MessageConsumer1.java
.import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.tencent.tdmq.demo.cloud.Constant;import java.io.IOException;import java.nio.charset.StandardCharsets;/*** Message consumer*/public class MessageConsumer1 {/*** Queue name*/public static final String QUEUE_NAME = "queue_name";/*** Exchange name*/private static final String EXCHANGE_NAME = "exchange_name";public static void main(String[] args) throws Exception {// Connection factoryConnectionFactory factory = new ConnectionFactory();// Set the service address (replace with the access point address copied in the console)factory.setUri("amqp://***");// Set the vhost (copy the vhost name in the open-source RabbitMQ console)factory.setVirtualHost(VHOST_NAME);// Set the username (use the username in the permission configuration of the vhost in the open-source RabbitMQ console)factory.setUsername(USERNAME);// Set the password (use the user key)factory.setPassword("****");// Get the connection addressConnection connection = factory.newConnection();// Establish a channelChannel channel = connection.createChannel();// Bind the message exchangechannel.exchangeDeclare(EXCHANGE_NAME, "fanout");// Declare the queue messagechannel.queueDeclare(QUEUE_NAME, true, false, false, null);// Bind the message exchange (`EXCHANGE_NAME` must exist in the TDMQ for RabbitMQ console, and the exchange type must be the same as that in the console)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [Consumer1] Waiting for messages.");// Subscribe to the messagechannel.basicConsume(QUEUE_NAME, false, "ConsumerTag", new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {// Received message for business logic 
processingSystem.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());channel.basicAck(envelope.getDeliveryTag(), false);}});}}
| Parameter | Description |
| --- | --- |
| QUEUE_NAME | Queue name, which can be obtained from the queue list in the console. |
| EXCHANGE_NAME | Exchange name, which can be obtained from the exchange list in the console. |
| factory.setUri | Cluster access address, which can be obtained from the Client Access section on the Basic Info page of the cluster. |
| factory.setVirtualHost | Vhost name, which can be obtained from the vhost list in the console. |
| factory.setUsername | Enter the name of the user created in the console. |
| factory.setPassword | Enter the password of the user created in the console. |
Was this page helpful?