TDMQ for RocketMQ
Sending and Receiving Filtered Messages
Last updated: 2023-03-28 10:15:45

Overview

This document describes how to use an open-source SDK to send and receive filtered messages, using the SDK for Java as an example. Messages can be filtered by tags or by SQL expressions.

Prerequisites

You have created the required resources. For globally sequential messages, you need to create a single-queue topic. For more information, see Resource Creation and Preparation.
You have downloaded the demo here or from the GitHub project.
You are familiar with how general messages are sent and received.

Tag-based option

The main code for creating the producer and consumer is basically the same as that for general messages.
For message production, a tag is attached to the message when the message is constructed.
For message consumption, the subscription specifies a single tag, an asterisk (*), or an expression that combines multiple tags.

Step 1. Produce messages

Sending messages

The main code for sending messages is basically the same as that for general messages. However, a message can carry only one tag, which is set when the message is constructed.
int totalMessagesToSend = 5;
for (int i = 0; i < totalMessagesToSend; i++) {
    // Build the message with its topic, tag ("Tag1"), and body
    Message msg = new Message(TOPIC_NAME, "Tag1", "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
    // Send the message
    SendResult sendResult = producer.send(msg);
    System.out.println("sendResult = " + sendResult);
}

Step 2. Consume messages

Subscribing to messages

// Subscribe to all tags when subscribing to a topic
pushConsumer.subscribe(TOPIC_NAME, "*");

// Subscribe to a single specified tag
//pushConsumer.subscribe(TOPIC_NAME, "Tag1");

// Subscribe to multiple tags
//pushConsumer.subscribe(TOPIC_NAME, "Tag1||Tag2");

// Register a callback implementation class to process messages pulled from the broker
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // Message processing logic
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // Mark the message as being successfully consumed and return the consumption status
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// Start the consumer instance
pushConsumer.start();

Parameter     Description
TOPIC_NAME    Topic name, which can be copied under the Topic tab on the Cluster page in the console.
"*"           Subscription expression. If it is left empty or specified as an asterisk (*), all messages are subscribed to. An expression such as "Tag1||Tag2||Tag3" subscribes to messages that carry any of the listed tags.
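The matching rule described for the subscription expression can be modeled in a few lines of plain Java. The following is a minimal sketch of the semantics only, not the RocketMQ client or broker implementation: the class `TagFilterSketch` and its `matches` method are hypothetical names introduced for illustration, and the handling of untagged messages is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative model of tag subscription matching; not RocketMQ client or broker code.
final class TagFilterSketch {

    // Returns true if a message carrying messageTag would pass the subscription expression.
    static boolean matches(String subscription, String messageTag) {
        // An empty expression or "*" subscribes to every message.
        if (subscription == null || subscription.trim().isEmpty() || "*".equals(subscription.trim())) {
            return true;
        }
        // Assumption of this sketch: an untagged message never matches a specific tag.
        if (messageTag == null) {
            return false;
        }
        // "Tag1||Tag2" accepts the message if its tag equals any of the listed tags.
        Set<String> wanted = new HashSet<>();
        for (String tag : subscription.split("\\|\\|")) {
            String trimmed = tag.trim();
            if (!trimmed.isEmpty()) {
                wanted.add(trimmed);
            }
        }
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "Tag1"));          // true
        System.out.println(matches("Tag1||Tag2", "Tag2")); // true
        System.out.println(matches("Tag1", "Tag3"));       // false
    }
}
```

Because each message carries a single tag, a multi-tag expression acts as an OR over the listed tags rather than requiring all of them.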
Note
The above is a brief introduction to message publishing and subscription. For more information, see the GitHub demo or the official RocketMQ documentation.

SQL expression-based option

The main code for creating the producer and consumer is basically the same as that for general messages.
For message production, user-defined properties are set on the message when the message is constructed.
For message consumption, the corresponding SQL expression is passed when subscribing.

Step 1. Produce messages

The main code for sending messages is basically the same as that for general messages. However, a message can carry multiple user-defined properties, which are set when the message is constructed.
int totalMessagesToSend = 5;
for (int i = 0; i < totalMessagesToSend; i++) {
    Message msg = new Message(TOPIC_NAME, "Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
    // Attach a user-defined property that SQL expressions can filter on
    msg.putUserProperty("key1", "value1");
    // Send the message
    SendResult sendResult = producer.send(msg);
    System.out.println("sendResult = " + sendResult);
}

Step 2. Consume messages

The main code for consuming messages is basically the same as that for general messages. However, the corresponding SQL expression must be passed when subscribing.
// Subscribe to all messages (the expression always evaluates to true)
pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));

// Subscribe with an SQL expression on a single property
//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));

// Subscribe with an SQL expression on multiple properties
//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));

// Register a callback implementation class to process messages pulled from the broker
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // Message processing logic
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // Mark the message as being successfully consumed and return the consumption status
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// Start the consumer instance
pushConsumer.start();
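To make the effect of the two commented-out SQL expressions concrete, the sketch below models them as hand-written Java predicates over a message's user-defined properties. It is only an illustration of the intended semantics: no SQL is parsed, and the class `PropertyFilterSketch` and its fields are hypothetical names, not part of the RocketMQ API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Illustrative model of property-based filtering; hand-written predicates stand in for SQL parsing.
final class PropertyFilterSketch {

    // Models: key1 IS NOT NULL AND key1='value1'
    // "value1".equals(null) is false, so a missing key1 is rejected as well.
    static final Predicate<Map<String, String>> SINGLE_KEY =
            props -> "value1".equals(props.get("key1"));

    // Models: key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'
    static final Predicate<Map<String, String>> TWO_KEYS =
            SINGLE_KEY.and(props -> "value2".equals(props.get("key2")));

    public static void main(String[] args) {
        // Properties as set by msg.putUserProperty("key1", "value1") in the producer code.
        Map<String, String> props = new HashMap<>();
        props.put("key1", "value1");

        System.out.println(SINGLE_KEY.test(props)); // true
        System.out.println(TWO_KEYS.test(props));   // false: key2 is not set
    }
}
```

Under this model, the messages produced in Step 1 (which set only key1) would pass the single-property expression but not the multi-property one.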
Note
The above is a brief introduction to message publishing and subscription. For more information, see the GitHub demo or the official RocketMQ documentation.

