// Producer side of tag filtering: send a fixed number of messages to the
// topic, each carrying the tag "Tag1" so consumers can filter on it.
int totalMessagesToSend = 5;
for (int i = 0; i < totalMessagesToSend; i++) {
    // Arguments: topic, tag, UTF-8 encoded body.
    Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
    // Send the message that was just built (the variable is `msg`).
    SendResult sendResult = producer.send(msg);
    System.out.println("sendResult = " + sendResult);
}
// Subscribe to all tags when subscribing to a topicpushConsumer.subscribe(topic_name, "*");//Subscribe to the specified tags//pushConsumer.subscribe(TOPIC_NAME, "Tag1");// Subscribe to multiple tags//pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");// Register a callback implementation class to process messages pulled from the brokerpushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// Message processing logicSystem.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// Mark the message as being successfully consumed and return the consumption statusreturn ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Start the consumer instancepushConsumer.start();
Parameter | Description |
topic_name | Name of the topic. You can copy it from the Topic tab on the Cluster page in the console. |
"*" | Subscription expression. If it is left empty or set to an asterisk (*), all messages are subscribed to. An expression such as tag1 || tag2 || tag3 subscribes to messages carrying any of the listed tags. |
// Producer side of SQL filtering: send a fixed number of messages to the
// topic, each carrying the user property key1=value1 so consumers can
// filter on it with an SQL expression.
int totalMessagesToSend = 5;
for (int i = 0; i < totalMessagesToSend; i++) {
    // Arguments: topic, UTF-8 encoded body (no tag is set here).
    Message msg = new Message(TOPIC_NAME, "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
    msg.putUserProperty("key1", "value1");
    // Send the message that was just built (the variable is `msg`).
    SendResult sendResult = producer.send(msg);
    System.out.println("sendResult = " + sendResult);
}
// Consumer side of SQL filtering.
// Subscribe to the topic with the SQL expression "True".
pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));
// Subscribe with a single-key SQL expression:
// pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));
// Subscribe on multiple properties:
// pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));

// Register a callback implementation class to process messages pulled from the broker.
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // Message processing logic
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // Mark the message as being successfully consumed and return the consumption status.
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// Start the consumer instance.
pushConsumer.start();
Was this page helpful?