pom.xml
file. This document uses a Maven project as an example.<!-- in your <dependencies> block --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.4</version></dependency>
// Instantiate the message producerDefaultMQProducer producer = new DefaultMQProducer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL permission);// Set the Nameserver addressproducer.setNamesrvAddr(nameserver);// Start the producer instanceproducer.start();
Parameter | Description |
groupName | Producer group name. We recommend that you use the corresponding topic name as the producer name. |
accessKey | |
secretKey | |
nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster page in the console. The namespace access address can be obtained under the Namespace tab on the Cluster page. |
for (int i = 0; i < 10; i++) {// Create a message instance and set the topic and message contentMessage msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Send the messageSendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
TAG | A parameter used to set the message tag. |
// Disable retry upon sending failuresproducer.setRetryTimesWhenSendAsyncFailed(0);// Set the number of messages to be sentint messageCount = 10;final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {try {final int index = i;// Create a message instance and set the topic and message contentMessage msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// Logic for message sending successescountDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {// Logic for message sending failurescountDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e){e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
TAG | A parameter used to set the message tag. |
for (int i = 0; i < 10; i++) {// Create a message instance and set the topic and message contentMessage msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Send one-way messagesproducer.sendOneway(msg);}
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
TAG | A parameter used to set the message tag. |
// Instantiate the consumerDefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); // ACL permission// Set the Nameserver addresspushConsumer.setNamesrvAddr(nameserver);
Parameter | Description |
groupName | Consumer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster page in the console. The namespace access address can be obtained under the Namespace tab on the Cluster page. |
secretKey | |
accessKey |
// Subscribe to a topicpushConsumer.subscribe(topic_name, "*");// Register a callback implementation class to process messages pulled from the brokerpushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logicSystem.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// Mark the message as being successfully consumed and return the consumption statusreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Start the consumer instancepushConsumer.start();
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
"*" | If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags. |
Was this page helpful?