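DefaultMQProducer producer = new DefaultMQProducer(
        namespace,
        groupName,
        // ACL permission
        new AclClientRPCHook(new SessionCredentials(AK, SK)),
        true,   // enableMsgTrace: turn on message trace
        null);  // customizedTraceTopic: null uses the default trace topic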
// Instantiate the consumer
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
        NAMESPACE,
        groupName,
        new AclClientRPCHook(new SessionCredentials(AK, SK)),
        new AllocateMessageQueueAveragely(),
        true,   // enableMsgTrace: turn on message trace
        null);  // customizedTraceTopic: null uses the default trace topic
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(
        NAMESPACE,
        groupName,
        new AclClientRPCHook(new SessionCredentials(AK, SK)));
// Set the NameServer address
pullConsumer.setNamesrvAddr(NAMESERVER);
pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
pullConsumer.setAutoCommit(false);
pullConsumer.setEnableMsgTrace(true);
pullConsumer.setCustomizedTraceTopic(null);
package com.lazycece.sbac.rocketmq.messagemodel;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * @author lazycece
 * @date 2019/8/21
 */
@Slf4j
@Component
public class MessageModelConsumer {

    @Component
    @RocketMQMessageListener(
            topic = "topic-message-model",
            consumerGroup = "message-model-consumer-group",
            enableMsgTrace = true,
            messageModel = MessageModel.CLUSTERING)
    public class ConsumerOne implements RocketMQListener<String> {

        @Override
        public void onMessage(String message) {
            log.info("ConsumerOne: {}", message);
        }
    }
}
Parameter | Description |
Production Address | The address and port of the producer that sent the message. |
Production Time | The time when the TDMQ for RocketMQ server acknowledged message receipt, accurate to the millisecond. |
Sending Duration | The time taken to send the message from the producer to the TDMQ for RocketMQ server, accurate to the microsecond. |
Production Status | Whether the message was produced successfully. A failure generally means that part of the header data was lost during message transmission, in which case the preceding fields may be NULL. |
Parameter | Description |
Storage Time | The time when the message is persistently stored. |
Storage Duration | The duration between when the message was persistently stored and when the TDMQ for RocketMQ server received the acknowledgment, accurate to the millisecond. |
Storage Status | Whether the message was persistently stored successfully. A failure status means the message was not stored, possibly because of disk damage or insufficient capacity. In that case, submit a ticket immediately. |
Parameter | Description |
Consumer Group Name | The name of the Consumer Group. |
Consumption Mode | The consumption mode of the consumer group: cluster consumption or broadcast consumption. |
Number of Pushes | The number of times the TDMQ for RocketMQ server sends the message to the consumer. |
Last Pushed Time | The most recent time the TDMQ for RocketMQ server sent the message to the consumer. |
Consumption Status | Unacknowledged: The TDMQ for RocketMQ server has sent the message to the consumer but has not yet received an acknowledgment from the consumer. Acknowledged: The consumer has sent an acknowledgment (ack) and the TDMQ for RocketMQ server has received it. Retried: The server did not receive an acknowledgment before the timeout and has sent the message again. Retried but Unacknowledged: The TDMQ for RocketMQ server has resent the message to the consumer but still has not received an acknowledgment. Moved to Dead Letter Queue: After a number of unsuccessful consumption attempts, the message has been sent to the dead letter queue. Note: If the consumption mode is broadcast consumption, the only consumption status is Pushed. |
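
The consumption statuses in the table above can be modeled in client-side tooling, for example when parsing trace results exported from the console. The following is a minimal sketch only: the class, enum, and method names are hypothetical and are not part of the RocketMQ or TDMQ SDK. It also assumes that cluster consumption can show any of the five statuses other than Pushed, which the table implies but does not state outright.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical helper, not part of any RocketMQ or TDMQ SDK.
final class ConsumptionStatusSketch {

    enum ConsumptionMode { CLUSTERING, BROADCASTING }

    // Labels copied from the Consumption Status row of the table.
    enum ConsumptionStatus {
        UNACKNOWLEDGED("Unacknowledged"),
        ACKNOWLEDGED("Acknowledged"),
        RETRIED("Retried"),
        RETRIED_BUT_UNACKNOWLEDGED("Retried but Unacknowledged"),
        MOVED_TO_DEAD_LETTER_QUEUE("Moved to Dead Letter Queue"),
        PUSHED("Pushed");

        private final String label;

        ConsumptionStatus(String label) {
            this.label = label;
        }

        String label() {
            return label;
        }

        // Resolve a status from its display label, ignoring case and
        // surrounding whitespace.
        static ConsumptionStatus fromLabel(String label) {
            for (ConsumptionStatus status : values()) {
                if (status.label.equalsIgnoreCase(label.trim())) {
                    return status;
                }
            }
            throw new IllegalArgumentException("Unknown consumption status: " + label);
        }
    }

    // Broadcast consumption only ever reports Pushed. For cluster consumption,
    // this sketch assumes every other status is possible.
    static Set<ConsumptionStatus> possibleStatuses(ConsumptionMode mode) {
        if (mode == ConsumptionMode.BROADCASTING) {
            return EnumSet.of(ConsumptionStatus.PUSHED);
        }
        return EnumSet.complementOf(EnumSet.of(ConsumptionStatus.PUSHED));
    }

    public static void main(String[] args) {
        System.out.println(possibleStatuses(ConsumptionMode.BROADCASTING));
        System.out.println(possibleStatuses(ConsumptionMode.CLUSTERING));
        System.out.println(ConsumptionStatus.fromLabel("Retried but Unacknowledged"));
    }
}
```

Keeping the labels on the enum means a status read from a trace record can be checked against the consumer group's mode, for instance to flag a Pushed status that appears under cluster consumption as unexpected.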