pom.xml
file. This document uses a Maven project as an example.<!-- in your <dependencies> block --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.4</version></dependency>
// Instantiate the message producerDefaultMQProducer producer = new DefaultMQProducer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL permission);// Set the Nameserver addressproducer.setNamesrvAddr(nameserver);// Start the producer instanceproducer.start();
Parameter | Description |
groupName | Producer group name. It is recommended to use the corresponding topic name. |
nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey | |
int totalMessagesToSend = 5;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message(TOPIC_NAME, ("Hello scheduled message " + i).getBytes());// Send the messageSendResult sendResult = producer.send(message);System.out.println("sendResult = " + sendResult);}
for (int i = 0; i < 3; i++) {int orderId = i % 3;// Construct message instanceMessage msg = new Message(TOPIC_NAME, "your tag", "KEY" + i,("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg1, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}
// Instantiate the consumerDefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL permission// Set the Nameserver addresspushConsumer.setNamesrvAddr(nameserver);
Parameter | Description |
groupName | Producer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey | |
// Subscribe to a topicpushConsumer.subscribe(topic_name, "*");// Register a callback implementation class to process messages pulled from the brokerpushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logicSystem.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// Mark the message as being successfully consumed and return the consumption statusreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Start the consumer instancepushConsumer.start();
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
"*" | If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags. |
Was this page helpful?