Message Type | Retry Interval | Maximum Number of Retries |
General Message | 5 minutes | 288 |
Sequential message | 1 minute | 288 |
pom.xml
file. This document uses a Maven project as an example.<!-- in your <dependencies> block --><dependency><groupId>com.aliyun.mq</groupId><artifactId>mq-http-sdk</artifactId><version>1.0.3</version></dependency>
// Get the clientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// Get the topic producerMQProducer producer = mqClient.getProducer(namespace, topicName);
Parameter | Description |
topicName | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
namespace | Namespace name, which can be copied under the Namespace tab on the Cluster page in the console. |
endpoint | Cluster access address over HTTP, which can be obtained from Access Address in the Operation column on the Cluster page in the console. |
secretKey | |
accessKey | |
try {for (int i = 0; i < 10; i++) {TopicMessage pubMsg;pubMsg = new TopicMessage(("Hello RocketMQ " + i).getBytes(),"TAG");// Set the ShardingKey of the partitionally sequential messagepubMsg.setShardingKey(i % 3);TopicMessage pubResultMsg = producer.publishMessage(pubMsg);System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());}} catch (Throwable e) {System.out.println("Send mq message failed.");e.printStackTrace();}
Parameter | Description |
TAG | Set the message tag. |
ShardingKey | A partition field of sequential messages. Messages with the same ShardingKey will be sent to the same partition. |
// Get the clientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// Get the topic consumerMQProducer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
Parameter | Description |
topicName | Topic name, which can be copied under the Topic tab on the Cluster page in the console. |
groupName | Producer group name, which can be copied under the Group tab on the Cluster page in the console. |
namespace | Namespace name, which can be copied under the Namespace tab on the Cluster page in the console. |
TAG | Subscribed tag. |
endpoint | Cluster access address over HTTP, which can be obtained from Access Address in the Operation column on the Cluster page in the console. |
secretKey | |
accessKey | |
do {List<Message> messages = null;try {// Long polling consumes messages sequentially. Although the messages obtained may be from multiple partitions, the messages in a partition will definitely be sequential.// For sequential consumption, as long as a message in a partition hasn't been acknowledged to be consumed successfully, it will be consumed next time for this partition.// For a partition, the next batch of messages can only be consumed after all previous messages are acknowledged to be consumed successfully.messages = consumer.consumeMessageOrderly(Integer.parseInt(batchSize),Integer.parseInt(waitSeconds));} catch (Throwable e) {e.printStackTrace();}if (messages == null || messages.isEmpty()) {System.out.println(Thread.currentThread().getName() + ": no new message, continue!");continue;}for (Message message : messages) {System.out.println("Receive message: " + message);}{List<String> handles = new ArrayList<String>();for (Message message : messages) {handles.add(message.getReceiptHandle());}try {consumer.ackMessage(handles);} catch (Throwable e) {if (e instanceof AckMessageException) {AckMessageException errors = (AckMessageException) e;System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");if (errors.getErrorMessages() != null) {for (String errorHandle :errors.getErrorMessages().keySet()) {System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());}}continue;}e.printStackTrace();}}} while (true);
Parameter | Description |
batchSize | The number of messages pulled at a time. Maximum value: 16. |
waitSeconds | The polling waiting time for a message pull. Maximum value: 30 seconds. |
Was this page helpful?