pom.xml
file. This document uses a Maven project as an example.

    <!-- in your <dependencies> block -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-acl</artifactId>
        <version>4.9.4</version>
    </dependency>

    // Instantiate the message producer
    DefaultMQProducer producer = new DefaultMQProducer(
        groupName,
        new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL permission
    );
    // Set the Nameserver address
    producer.setNamesrvAddr(nameserver);
    // Start the producer instance
    producer.start();

| Parameter | Description |
| --- | --- |
| groupName | Producer group name. It is recommended to use the corresponding topic name. |
| nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
| secretKey | |
| accessKey | |
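
The four parameters above are plain strings passed to the producer, so they can be kept out of the source code. The following is a minimal sketch of reading them from a map such as `System.getenv()` and failing early when one is missing. The class name `ProducerSettings` and the four key names are hypothetical and are not part of the RocketMQ SDK.

```java
import java.util.Map;

/** Hypothetical holder for the four values described in the table above. */
final class ProducerSettings {
    final String groupName;
    final String nameserver;
    final String accessKey;
    final String secretKey;

    private ProducerSettings(String groupName, String nameserver,
                             String accessKey, String secretKey) {
        this.groupName = groupName;
        this.nameserver = nameserver;
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    /** Builds settings from a map such as System.getenv(); the key names are assumptions. */
    static ProducerSettings fromMap(Map<String, String> env) {
        return new ProducerSettings(
            require(env, "ROCKETMQ_GROUP"),
            require(env, "ROCKETMQ_NAMESRV"),
            require(env, "ROCKETMQ_ACCESS_KEY"),
            require(env, "ROCKETMQ_SECRET_KEY"));
    }

    /** Returns the trimmed value for key, or throws if it is absent or blank. */
    private static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required setting: " + key);
        }
        return value.trim();
    }
}
```

With this in place, `ProducerSettings s = ProducerSettings.fromMap(System.getenv());` would supply `s.groupName`, `s.accessKey`, `s.secretKey`, and `s.nameserver` to the producer calls shown earlier.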

    int totalMessagesToSend = 5;
    for (int i = 0; i < totalMessagesToSend; i++) {
        Message message = new Message(TOPIC_NAME, ("Hello scheduled message " + i).getBytes());
        // Send the message
        SendResult sendResult = producer.send(message);
        System.out.println("sendResult = " + sendResult);
    }

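
The loop above calls `getBytes()` without an argument, which encodes the body with the JVM's default charset. If producers and consumers run on differently configured machines, non-ASCII text may not decode to the same string. The following is a minimal sketch of encoding and decoding the body with an explicit UTF-8 charset. The helper class `PayloadCodec` is hypothetical and uses only the Java standard library.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that fixes the message body encoding to UTF-8. */
final class PayloadCodec {
    /** Encodes message text into the byte[] body that a message carries. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes a received byte[] body back into text. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        int totalMessagesToSend = 5;
        for (int i = 0; i < totalMessagesToSend; i++) {
            byte[] body = encode("Hello scheduled message " + i);
            // Print the round-tripped text and the encoded size
            System.out.println(decode(body) + " -> " + body.length + " bytes");
        }
    }
}
```

On the producer side this would be used as `new Message(TOPIC_NAME, PayloadCodec.encode(text))`, and on the consumer side as `PayloadCodec.decode(msg.getBody())`.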

    // Instantiate the consumer
    DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
        groupName,
        new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // ACL permission
    // Set the Nameserver address
    pushConsumer.setNamesrvAddr(nameserver);

| Parameter | Description |
| --- | --- |
| groupName | Consumer group name, which can be copied under the Group tab on the Cluster page in the console. |
| nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
| secretKey | |
| accessKey | |

    // Set broadcast consumption mode
    pushConsumer.setMessageModel(MessageModel.BROADCASTING);
    // Subscribe to a topic
    pushConsumer.subscribe(topic_name, "*");
    // Register a callback implementation class to process messages pulled from the broker
    pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        // Message processing logic
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // Mark the message as being successfully consumed and return the consumption status
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    // Start the consumer instance
    pushConsumer.start();
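
With `MessageModel.BROADCASTING`, every consumer instance in the group receives every message. In the default clustering mode, the messages are shared among the instances instead. The following toy simulation illustrates the difference in memory only and does not use the RocketMQ SDK. The class `DeliverySimulation` is hypothetical, and the per-message round-robin split in clustering mode is a simplifying assumption: a real broker allocates queues to consumers, not individual messages.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the two consumption modes; not part of RocketMQ. */
final class DeliverySimulation {
    enum Mode { CLUSTERING, BROADCASTING }

    /** Returns, for each consumer instance, the messages it would receive. */
    static List<List<String>> deliver(Mode mode, int consumerCount, List<String> messages) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        List<List<String>> inboxes = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            inboxes.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            if (mode == Mode.BROADCASTING) {
                // Broadcast: every instance gets its own copy of the message
                for (List<String> inbox : inboxes) {
                    inbox.add(messages.get(i));
                }
            } else {
                // Clustering (simplified): each message goes to exactly one instance
                inboxes.get(i % consumerCount).add(messages.get(i));
            }
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            messages.add("Hello scheduled message " + i);
        }
        System.out.println("BROADCASTING: " + deliver(Mode.BROADCASTING, 2, messages));
        System.out.println("CLUSTERING:   " + deliver(Mode.CLUSTERING, 2, messages));
    }
}
```

In the broadcast case each simulated consumer ends up with all five messages, which is why processing logic in broadcast mode generally needs to tolerate the same message being handled once per instance.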