| 消息类型 | 重试间隔 | 最大重试次数 |
| --- | --- | --- |
| 普通消息 | 5分钟 | 288 |
| 顺序消息 | 1分钟 | 288 |
<!-- Place this inside your <dependencies> block -->
<dependency>
    <groupId>com.aliyun.mq</groupId>
    <artifactId>mq-http-sdk</artifactId>
    <version>1.0.3</version>
</dependency>
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ProducerMQProducer producer = mqClient.getProducer(namespace, topicName);
// Publish ten sample messages tagged "TAG"; the first failure aborts the remaining sends.
try {
    int sent = 0;
    while (sent < 10) {
        byte[] body = ("Hello RocketMQ " + sent).getBytes();
        TopicMessage request = new TopicMessage(body, "TAG");
        // publishMessage returns a TopicMessage from which the message ID is read.
        TopicMessage response = producer.publishMessage(request);
        System.out.println("Send mq message success. MsgId is: " + response.getMessageId());
        sent++;
    }
} catch (Throwable e) {
    System.out.println("Send mq message failed.");
    e.printStackTrace();
}
| 参数 | 说明 |
| --- | --- |
| TAG | 设置消息的 TAG。 |
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ConsumerMQConsumer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
// Endless consume-and-acknowledge loop: pull a batch, print it, then ack every message in it.
do {
    List<Message> messages = null;
    try {
        // Consume messages via long polling: if the topic has no messages the request
        // waits on the server side; if messages are available it returns immediately.
        messages = consumer.consumeMessage(
                Integer.parseInt(batchSize),
                Integer.parseInt(waitSeconds));
    } catch (Throwable e) {
        // A failed pull leaves `messages` null, so it is handled like an empty batch below.
        e.printStackTrace();
    }

    if (messages == null || messages.isEmpty()) {
        System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
        continue;
    }

    for (Message message : messages) {
        System.out.println("Receive message: " + message);
    }

    // Collect the receipt handle of every message in the batch and ack them in one call.
    List<String> handles = new ArrayList<String>();
    for (Message message : messages) {
        handles.add(message.getReceiptHandle());
    }

    try {
        consumer.ackMessage(handles);
    } catch (Throwable e) {
        if (e instanceof AckMessageException) {
            // Report each handle that failed to ack, with its error code and message.
            AckMessageException errors = (AckMessageException) e;
            System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
            if (errors.getErrorMessages() != null) {
                for (String errorHandle : errors.getErrorMessages().keySet()) {
                    System.out.println("Handle:" + errorHandle
                            + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                            + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                }
            }
            continue;
        }
        e.printStackTrace();
    }
} while (true);
| 参数 | 说明 |
| --- | --- |
| batchSize | 一次拉取的消息条数,支持最多16条。 |
| waitSeconds | 一次拉取的轮询等待时间,支持最长30秒。 |
本页内容是否解决了您的问题?