int totalMessagesToSend = 5;for (int i = 0; i < totalMessagesToSend; i++) {Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));// 发送消息SendResult sendResult = producer.send(message);System.out.println("sendResult = " + sendResult);}
// 订阅topic 订阅所有的TAGpushConsumer.subscribe(topic_name, "*");//订阅指定的TAG//pushConsumer.subscribe(TOPIC_NAME, "Tag1");//订阅多个TAG//pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();
参数 | 说明 |
topic_name | 在控制台集群管理中 Topic 页签中复制具体 Topic 名称。 |
"*" | 订阅表达式如果为 null 或*表达式表示订阅全部,同时支持 "tag1 || tag2 || tag3" 标识订阅多个类型的 tag。 |
int totalMessagesToSend = 5;for (int i = 0; i < totalMessagesToSend; i++) {Message msg = new Message(TOPIC_NAME,"Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));msg.putUserProperty("key1","value1");// 发送消息SendResult sendResult = producer.send(message);System.out.println("sendResult = " + sendResult);}
pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));// 订阅topic 订阅单个key的sql,最常用//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));//订阅多个属性//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();
本页内容是否解决了您的问题?