操作场景
本文以调用 Java SDK 为例介绍通过开源 SDK 实现事务消息收发的操作过程。
前提条件
操作步骤
步骤1:安装 Java 依赖库
在 Java 项目中引入相关依赖,以 maven 工程为例,在 pom.xml 添加以下依赖:
说明
依赖版本要求 ≥ 4.9.3, 当前建议为4.9.4
<!-- in your <dependencies> block -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-acl</artifactId>
<version>4.9.4</version>
</dependency>
步骤2:生产消息
实现 TransactionListener
public class TransactionListenerImpl implements TransactionListener {
//半消息发送成功后,回调该方法执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//这里执行数据库事务,如果成功,就返回成功,否则返回未知,或者回滚,等待回查
return LocalTransactionState.UNKNOW;
}
//回查本地事务
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//这里查询本地db的数据状态,然后决定是否是否提交
return LocalTransactionState.COMMIT_MESSAGE;
}
}
创建消息生产者
TransactionListener transactionListener = new TransactionListenerImpl();
ProducerTransactionMQProducer producer = new TransactionMQProducer("transaction_group",
new AclClientRPCHook(new SessionCredentials(ClientCreater.ACCESS_KEY, ClientCreater.SECRET_KEY)));
producer.setNamesrvAddr(ClientCreater.NAMESERVER);
producer.setTransactionListener(transactionListener);
producer.start();
|
groupName | 生产者组名称,建议使用对应的topic名字 |
nameserver | 集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。 |
secretKey | |
accessKey | |
发送消息
for (int i = 0; i < 3; i++) {
Message msg = new Message(TOPIC_NAME, "your tag", "KEY" + i,("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
System.out.printf("%s%n", sendResult);
}
步骤3:消费消息
创建消费者
TDMQ RocketMQ 版支持 push 和 pull 两种消费模式。推荐Push消费模式
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
groupName,
new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
pushConsumer.setNamesrvAddr(nameserver);
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive transaction messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
|
groupName | 生产者组名称,在控制台集群管理中Group 页签中复制。 |
nameserver | 集群接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。新版共享集群与专享集群命名接入点地址在命名空间列表获取。 |
secretKey | |
accessKey | |
订阅消息
根据消费模式不同,订阅方式也有所区别。
pushConsumer.subscribe(topic_name, "*");
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
pushConsumer.start();
步骤4:查看消费详情
登录 TDMQ 控制台,在集群管理 > Group 页面,可查看与 Group 连接的客户端列表,单击操作列的查看消费者详情,可查看消费者详情。
本页内容是否解决了您的问题?