<!-- in your <dependencies> block --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.3</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.3</version></dependency>
// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer(namespace,groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))// ACL权限);// 设置NameServer的地址producer.setNamesrvAddr(nameserver);// 启动Producer实例producer.start();
for (int i = 0; i < 10; i++) {// 创建消息实例,设置topic和消息内容Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}
参数 | 说明 |
topic_name | 在控制台集群管理中 Topic 页签中复制具体 Topic 名称。 |
TAG | 用来设置消息的 TAG。 |
// 设置发送失败后不重试producer.setRetryTimesWhenSendAsyncFailed(0);// 设置发送消息的数量int messageCount = 10;final CountDownLatch countDownLatch = new CountDownLatch(messageCount);for (int i = 0; i < messageCount; i++) {try {final int index = i;// 创建消息实体,设置topic和消息内容Message msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功逻辑countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {// 消息发送失败逻辑countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}}countDownLatch.await(5, TimeUnit.SECONDS);
参数 | 说明 |
topic_name | 在控制台集群管理中 Topic 页签中复制具体 Topic 名称。 |
TAG | 用来设置消息的 TAG。 |
for (int i = 0; i < 10; i++) {// 创建消息实例,设置topic和消息内容Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送单向消息producer.sendOneway(msg);}
参数 | 说明 |
topic_name | 在控制台集群管理中 Topic 页签中复制具体 Topic 名称。 |
TAG | 用来设置消息的 TAG。 |
// 实例化消费者DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(namespace,groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限// 设置NameServer的地址pushConsumer.setNamesrvAddr(nameserver);
// 实例化消费者DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(namespace,groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));// 设置NameServer的地址pullConsumer.setNamesrvAddr(nameserver);// 设置从第一个偏移量开始消费pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅topicpushConsumer.subscribe(topic_name, "*");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();
参数 | 说明 |
topic_name | 在控制台集群管理中 Topic 页签中复制具体 Topic 名称。 |
"*" | 订阅表达式如果为 null 或*表达式表示订阅全部,同时支持 "tag1 || tag2 || tag3" 标识订阅多个类型的 tag。 |
// 订阅topicpullConsumer.subscribe(topic_name, "*");// 启动消费者实例pullConsumer.start();try {System.out.printf("Consumer Started.%n");while (true) {// 拉取消息List<MessageExt> messageExts = pullConsumer.poll();System.out.printf("%s%n", messageExts);}} finally {pullConsumer.shutdown();}
参数 | 说明 |
topic_name | 在控制台集群管理中 Topic 页签中复制具体 Topic 名称。 |
"*" | 订阅表达式如果为 null 或*表达式表示订阅全部,同时支持 "tag1 || tag2 || tag3" 标识订阅多个类型的 tag。 |
本页内容是否解决了您的问题?