pom.xml
file. This document uses a Maven project as an example.
<!-- in your <dependencies> block -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.3</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.9.3</version>
</dependency>
// Instantiate the message producersDefaultMQProducer producer = new DefaultMQProducer(namespace,groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))// ACL permission);// Set the NameServer addressproducer.setNamesrvAddr(nameserver);// Start the producer instancesproducer.start();
Parameter | Description |
namespace | Namespace name, which can be copied on the Namespace page in the console. |
groupName | Producer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be copied under the Network module on the cluster’s basic information page in the console. |
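secretKey | Secret key of the ACL credentials; passed as the second argument to `SessionCredentials`. |
accessKey | Access key of the ACL credentials; passed as the first argument to `SessionCredentials`. |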
for (int i = 0; i < 10; i++) {// Create a message instance and set the topic and message contentMessage msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Send the messageSendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
tag | A parameter used to set the message tag. |
// Disable retry upon sending failuresproducer.setRetryTimesWhenSendAsyncFailed(0);// Set the number of messages to be sentint messageCount = 10;final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {try {final int index = i;// Create a message instance and set the topic and message contentMessage msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// Logic for message sending successescountDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {// Logic for message sending failurescountDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e){e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
tag | A parameter used to set the message tag. |
for (int i = 0; i < 10; i++) {
    // Create a message instance and set the topic, tag, and message content
    Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    // Send the message one-way; sendOneway returns no result to inspect
    producer.sendOneway(msg);
}
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
tag | A parameter used to set the message tag. |
// Instantiate the consumerDefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(namespace,groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL permission// Set the NameServer addresspushConsumer.setNamesrvAddr(nameserver);
Parameter | Description |
namespace | Namespace name, which can be copied on the Namespace page in the console. |
groupName | Consumer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be copied under the Network module on the cluster’s basic information page in the console. |
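secretKey | Secret key of the ACL credentials; passed as the second argument to `SessionCredentials`. |
accessKey | Access key of the ACL credentials; passed as the first argument to `SessionCredentials`. |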
// Instantiate the consumerDefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(namespace,groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));// Set the NameServer addresspullConsumer.setNamesrvAddr(nameserver);// Specify the first offset as the start offset for consumptionpullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
Parameter | Description |
namespace | Namespace name, which can be copied under the Namespace tab in the console. Its format is cluster ID + | + namespace. |
groupName | Consumer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. |
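secretKey | Secret key of the ACL credentials; passed as the second argument to `SessionCredentials`. |
accessKey | Access key of the ACL credentials; passed as the first argument to `SessionCredentials`. |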
// Subscribe to a topicpushConsumer.subscribe(topic_name, "*");// Register a callback implementation class to process messages pulled from the brokerpushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logicSystem.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// Mark the message as being successfully consumed and return the consumption statusreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Start the consumer instancepushConsumer.start();
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
"*" | If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags. |
// Subscribe to a topicpullConsumer.subscribe(topic_name, "*");// Start the consumer instancepullConsumer.start();try {System.out.printf("Consumer Started.%n");while (true) {// Pull the messageList<MessageExt> messageExts = pullConsumer.poll();System.out.printf("%s%n", messageExts);}} finally {pullConsumer.shutdown();}
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
"*" | If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags. |
Was this page helpful?