pom.xml
file. This document uses a Maven project as an example.<!-- in your <dependencies> block --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.4</version></dependency>
public class TransactionListenerImpl implements TransactionListener {// After the half message is sent successfully, call back this method to execute the local transaction@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// Execute the database transaction here. If the execution is successful, it will return success message; otherwise, it will return the unknown status code, perform rollback, or wait for another checkback.return LocalTransactionState.UNKNOW;}// Check back local transaction@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// Here query the data status of the local database, and then decide whether to submitreturn LocalTransactionState.COMMIT_MESSAGE;}}
//Users need to inplement a TransactionListener instance,TransactionListener transactionListener = new TransactionListenerImpl();// Instantiate a transactional message producerProducerTransactionMQProducer producer = new TransactionMQProducer("transaction_group",// ACL permissionnew AclClientRPCHook(new SessionCredentials(ClientCreater.ACCESS_KEY, ClientCreater.SECRET_KEY)));// Set the Nameserver addressproducer.setNamesrvAddr(ClientCreater.NAMESERVER);producer.setTransactionListener(transactionListener);producer.start();
Parameter | Description |
groupName | Producer group name. It is recommended to use the corresponding topic name. |
nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey | |
for (int i = 0; i < 3; i++) {// Construct message instanceMessage msg = new Message(TOPIC_NAME, "your tag", "KEY" + i,("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.sendMessageInTransaction(msg,null);System.out.printf("%s%n", sendResult);}
// Instantiate the consumerDefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL permission// Set the Nameserver addresspushConsumer.setNamesrvAddr(nameserver);pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logicSystem.out.printf("%s Receive transaction messages: %s %n", Thread.currentThread().getName(), msgs);// Mark that the message has been successfully consumedreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});
Parameter | Description |
groupName | Producer group name, which can be copied under the Group tab on the Cluster page in the console. |
nameserver | Cluster access address, which can be obtained from Access Address in the Operation column on the Cluster Management page in the console. Namespace access addresses in new virtual or exclusive clusters can be copied from the Namespace list. |
secretKey | |
accessKey | |
// Subscribe to a topicpushConsumer.subscribe(topic_name, "*");// Register a callback implementation class to process messages pulled from the brokerpushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logicSystem.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// Mark the message as being successfully consumed and return the consumption statusreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Start the consumer instancepushConsumer.start();
Was this page helpful?