pom.xml
file. This document uses a Maven project as an example.<!-- in your <dependencies> block --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.4</version></dependency>
// Instantiate the message producerDefaultMQProducer producer = new DefaultMQProducer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL permission);// Set the Nameserver addressproducer.setNamesrvAddr(nameserver);// Start the producer instanceproducer.start();
Parameter | Description |
groupName | Producer group name. It is recommended to use the corresponding topic name. |
nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey | |
int totalMessagesToSend = 5;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message(TOPIC_NAME, ("Hello scheduled message " + i).getBytes());// Set message delay levelmessage.setDelayTimeLevel(5);// Send the messageSendResult sendResult = producer.send(message);System.out.println("sendResult = " + sendResult);}
int totalMessagesToSend = 1;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message(TOPIC_NAME, ("Hello timer message " + i).getBytes());// Set the time for sending the messagelong timeStamp = System.currentTimeMillis() + 30000;// To send a timed message, you need to specify a time for it, and the message will be delivered at the specified time. For example, if you set the time to be 2022-08-08 08:08:08, the message will be delivered at 2022-08-08 08:08:08.// If the timestamp is set before the current time, the message will be delivered to the consumer immediately.// Set `__STARTDELIVERTIME` into the property of `msg`message.putUserProperty("__STARTDELIVERTIME", String.valueOf(timeStamp));// Send the messageSendResult sendResult = producer.send(message);System.out.println("sendResult = " + sendResult);}
// Instantiate the consumerDefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL permission// Set the Nameserver addresspushConsumer.setNamesrvAddr(nameserver);
Parameter | Description |
groupName | Producer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey | |
// Subscribe to a topicpushConsumer.subscribe(topic_name, "*");// Register a callback implementation class to process messages pulled from the brokerpushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logicSystem.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// Mark the message as being successfully consumed and return the consumption statusreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Start the consumer instancepushConsumer.start();
Parameter | Description |
topic_name | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
"*" | If the subscription expression is left empty or specified as asterisk (*), all messages are subscribed to. tag1 || tag2 || tag3 means subscribing to multiple types of tags. |
Was this page helpful?