tencent cloud

Feedback

Message Filtering

Last updated: 2023-10-19 11:02:16
    This document describes the message filtering feature of TDMQ for RocketMQ and its use cases and usage instructions.

    Feature Overview

    Message filtering means filtering messages by the message attribute. The message producer can configure message attributes to group messages before sending them to a topic, and the consumer that subscribes to the topic can filter messages based on their attributes so that only eligible messages are delivered to the consumer for consumption.
    If a consumer sets no filter conditions when subscribing to a topic, no matter whether filter attributes are set during message sending, all messages in the topic will be delivered to the consumer for consumption.

    Use Cases

    Generally, messages with the same business attributes are stored in the same topic. For example, when an order transaction topic contains messages of order placements, payments, and deliveries, and if you want to consume only one type of transaction messages in your business, you can filter them on the client, but this will waste bandwidth resources.
    To solve this problem, TDMQ supports message filtering on the broker. You can set one or more tags during message production and subscribe to specified tags during consumption.
    

    Usage Instructions

    Filtering by tag

    Sending messages

    You must specify tags for each message when sending it.
    Message msg = new Message("TOPIC","TagA","Hello world".getBytes());

    Subscribing to messages

    Subscribing to all tags: If a consumer wants to subscribe to all types of messages under a topic, an asterisk (*) can be used to represent all tags.
    consumer.subscribe("TOPIC", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
    System.out.println(message.getMsgID());
    return Action.CommitMessage;
    }
    });
    Subscribing to one tag: If a consumer wants to subscribe to a certain type of messages under a topic, the tag should be specified clearly.
    consumer.subscribe("TOPIC", "TagA", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
    System.out.println(message.getMsgID());
    return Action.CommitMessage;
    }
    });
    Subscribing to multiple tags: If a consumer wants to subscribe to multiple types of messages under a topic, two vertical bars (||) should be added between the two tags for separation.
    consumer.subscribe("TOPIC", "TagA||TagB", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
    System.out.println(message.getMsgID());
    return Action.CommitMessage;
    }
    });
    

    Filtering by SQL

    Sending messages

    The message sending code here is basically the same as the code for sending simple messages. Here, a message is allowed to carry multiple user-defined attributes when constructing the message body.
    int totalMessagesToSend = 5;
    for (int i = 0; i < totalMessagesToSend; i++) {
    Message msg = new Message(TOPIC_NAME,"Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
    msg.putUserProperty("key1","value1");
    // Send the message
    SendResult sendResult = producer.send(message);
    System.out.println("sendResult = " + sendResult);
    }

    Subscribing to messages

    The message consumption code here is basically the same as the code for consuming simple messages. However, a message needs to be carried with the corresponding SQL expression when being subscribed to.
    // Subscribe to all messages
    pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));
    
    // Subscribe to single-key SQL expression when subscribing to a topic
    //pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));
    
    // Subscribe to multiple attributes
    //pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));
    // Register a callback implementation class to process messages pulled from the broker
    pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    // Message processing logic
    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    // Mark the message as being successfully consumed and return the consumption status
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    // Start the consumer instance
    pushConsumer.start();
    Note
    Above is a brief introduction to message publishing and subscription. For more information, see TencentCloud/rocketmq-demo or Apache RocketMQ documentation.
    
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support