Tencent Cloud


Message Filtering

Last updated: 2024-12-03 10:04:45
    This document describes how to use message tag filtering in TDMQ for Apache Pulsar.

    Feature Overview

    After a consumer subscribes to a topic, TDMQ for Apache Pulsar delivers every message in that topic to the consumer. If the consumer needs to process only certain messages, it can set filter conditions so that the server delivers only the relevant ones. This avoids transferring large numbers of irrelevant messages and simplifies the design of the business logic.
    TDMQ for Apache Pulsar Professional Edition supports two types of filtering:
    Tag filtering: Assign one or more fixed Tags to messages during production, and consumers subscribe to messages by specifying the desired Tags.
    SQL filtering: Assign one or more key-value attributes to messages during production, and consumers can subscribe to messages using flexible SQL 92 syntax.
    | Comparison item | Tag filtering | SQL filtering |
    | --- | --- | --- |
    | Filtering target | Message tag attributes | Message key-value (k-v) attributes |
    | Filtering capability | Exact match | SQL syntax match |
    | Use cases | Simple filtering scenarios with lightweight computational logic | Complex filtering scenarios with more intricate computational logic |

    Use Cases

    Messages with related business attributes are usually stored in the same topic. For example, an order transaction topic may contain order placement, payment, and delivery messages. If your business needs to consume only one of these message types, you can filter on the client, but this wastes bandwidth because every message is still delivered to the client.
    To solve this problem, TDMQ for Apache Pulsar supports filtering on the broker. You can set one or more tags during message production and subscribe to specified tags during consumption.

    Usage Instructions

    Tags are carried in message Properties. To use message filtering, add the Pulsar client SDK to your project as follows:
    Java (Maven):
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>2.10.1</version> <!-- Recommended -->
    </dependency>
    Go (v0.8.0 or later is recommended):
    go get -u github.com/apache/pulsar-client-go@master

    Tag Filtering

    Tag filtering does not support batching. Batching is enabled by default, so disable it when creating the producer:
    Java:
    // Construct a producer
    Producer<byte[]> producer = pulsarClient.newProducer()
            // Disable batching
            .enableBatching(false)
            // Full topic path in the format `persistent://cluster (tenant) ID/namespace/topic name`
            .topic("persistent://pulsar-xxx/sdk_java/topic2")
            .create();
    Go:
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        // Full topic path, in the same format as in the Java example
        Topic:           "persistent://pulsar-xxx/sdk_java/topic2",
        DisableBatching: true, // Disable batching
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()
    Tag filtering applies only to messages with assigned Tags. If the consumer does not specify a Tag when subscribing to a Topic, all messages in the Topic will be delivered to the consumer for processing.
    To use tag filtering, set the Properties field of ProducerMessage when sending messages, and set the SubscriptionProperties field of ConsumerOptions when creating consumers (in Java, use newMessage().property() and subscriptionProperties()).
    In the message Properties, use the tag name as the key and the fixed string TAGS as the value.
    In the SubscriptionProperties, use the tag name to subscribe to as the key and the tag version as the value. The tag version is reserved for future extensions and currently has no effect. Configure them as follows:
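    The key/value convention above can be wrapped in two small helpers. The following Go sketch builds the maps passed to ProducerMessage.Properties and ConsumerOptions.SubscriptionProperties. The function and constant names are hypothetical and not part of the Pulsar client or TDMQ API, and "1" is used as the tag version only because the examples on this page use it.

```go
package main

import "fmt"

// tagValue is the fixed value every tag property must carry on a message.
const tagValue = "TAGS"

// tagVersion is the value used for each tag in subscription properties.
// It is reserved and currently has no effect; "1" mirrors this page's examples.
const tagVersion = "1"

// messageProperties builds a map for ProducerMessage.Properties: one key per
// tag with the value TAGS, plus any ordinary properties (as in "Mix tags and properties").
func messageProperties(tags []string, extra map[string]string) map[string]string {
	props := make(map[string]string, len(tags)+len(extra))
	for k, v := range extra {
		props[k] = v
	}
	for _, tag := range tags {
		props[tag] = tagValue // a tag key overrides an ordinary property of the same name
	}
	return props
}

// subscriptionProperties builds a map for ConsumerOptions.SubscriptionProperties:
// one key per subscribed tag, with the reserved tag version as the value.
func subscriptionProperties(tags []string) map[string]string {
	props := make(map[string]string, len(tags))
	for _, tag := range tags {
		props[tag] = tagVersion
	}
	return props
}

func main() {
	fmt.Println(messageProperties([]string{"tag1", "tag2"}, map[string]string{"xxx": "yyy"}))
	// prints: map[tag1:TAGS tag2:TAGS xxx:yyy]
	fmt.Println(subscriptionProperties([]string{"tag1", "tag2"}))
	// prints: map[tag1:1 tag2:1]
}
```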
    Specify one tag
    Java:
    // Send the message
    MessageId msgId = producer.newMessage()
            .property("tag1", "TAGS")
            .value(value.getBytes(StandardCharsets.UTF_8))
            .send();

    // Subscription properties that define the subscribed tags
    HashMap<String, String> subProperties = new HashMap<>();
    subProperties.put("tag1", "1");
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            // Full topic path in the format `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page
            .topic("persistent://pulsar-xxxx/sdk_java/topic2")
            // Create the subscription on the topic details page in the console, then enter its name here
            .subscriptionName("topic_sub1")
            // Use the Shared subscription type
            .subscriptionType(SubscriptionType.Shared)
            // Subscription properties carrying the subscribed tags
            .subscriptionProperties(subProperties)
            // Start from the earliest position; otherwise, historical messages may not be consumed
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();
    Go:
    // Send the message
    if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", i)),
        Properties: map[string]string{
            "tag1": "TAGS",
        },
    }); err != nil {
        log.Fatal(err)
    }

    // Create a consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:                  "topic-1",
        SubscriptionName:       "my-sub",
        SubscriptionProperties: map[string]string{"tag1": "1"},
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()
    Specify multiple tags
    Java:
    // Send the message
    MessageId msgId = producer.newMessage()
            .property("tag1", "TAGS")
            .property("tag2", "TAGS")
            .value(value.getBytes(StandardCharsets.UTF_8))
            .send();

    // Subscription properties that define the subscribed tags
    HashMap<String, String> subProperties = new HashMap<>();
    subProperties.put("tag1", "1");
    subProperties.put("tag2", "1");
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            // Full topic path in the format `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page
            .topic("persistent://pulsar-xxxx/sdk_java/topic2")
            // Create the subscription on the topic details page in the console, then enter its name here
            .subscriptionName("topic_sub1")
            // Use the Shared subscription type
            .subscriptionType(SubscriptionType.Shared)
            // Subscription properties carrying the subscribed tags
            .subscriptionProperties(subProperties)
            // Start from the earliest position; otherwise, historical messages may not be consumed
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();
    Go:
    // Send the message
    if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", i)),
        Properties: map[string]string{
            "tag1": "TAGS",
            "tag2": "TAGS",
        },
    }); err != nil {
        log.Fatal(err)
    }

    // Create a consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "topic-1",
        SubscriptionName: "my-sub",
        SubscriptionProperties: map[string]string{
            "tag1": "1",
            "tag2": "1",
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()
    Mix tags and properties
    Java:
    // Send the message
    MessageId msgId = producer.newMessage()
            .property("tag1", "TAGS")
            .property("tag2", "TAGS")
            .property("xxx", "yyy")
            .value(value.getBytes(StandardCharsets.UTF_8))
            .send();

    // Subscription properties that define the subscribed tags
    HashMap<String, String> subProperties = new HashMap<>();
    subProperties.put("tag1", "1");
    subProperties.put("tag2", "1");
    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            // Full topic path in the format `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page
            .topic("persistent://pulsar-xxxx/sdk_java/topic2")
            // Create the subscription on the topic details page in the console, then enter its name here
            .subscriptionName("topic_sub1")
            // Use the Shared subscription type
            .subscriptionType(SubscriptionType.Shared)
            // Subscription properties carrying the subscribed tags
            .subscriptionProperties(subProperties)
            // Start from the earliest position; otherwise, historical messages may not be consumed
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .subscribe();
    Go:
    // Send the message
    if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", i)),
        Properties: map[string]string{
            "tag1": "TAGS",
            "tag2": "TAGS",
            "xxx":  "yyy",
        },
    }); err != nil {
        log.Fatal(err)
    }

    // Create a consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "topic-1",
        SubscriptionName: "my-sub",
        SubscriptionProperties: map[string]string{
            "tag1": "1",
            "tag2": "1",
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()
    Note:
    1. A single consumer can subscribe to multiple tags. The tags are combined with OR, so a message carrying any one of them is delivered.
    2. All consumers in the same subscription should use the same tags. If consumers in one subscription use different tags, their filter rules may conflict, which can affect business logic.
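    To make the OR rule in note 1 concrete, the following Go sketch models which messages a tag subscription would receive. It is a hypothetical client-side model, not the broker's implementation, and the function name is invented for illustration. The page does not spell out how messages that carry no tags at all are handled, so the sketch treats them like any other non-matching message; verify this against your cluster.

```go
package main

import "fmt"

// matchesTags is a hypothetical model of the tag-filtering delivery rule,
// not the broker's implementation. msgProps are the message's properties;
// subProps are the subscription's properties (tag name -> tag version).
func matchesTags(msgProps, subProps map[string]string) bool {
	// No subscribed tags: every message in the topic is delivered.
	if len(subProps) == 0 {
		return true
	}
	// Tags are OR-ed: one subscribed tag present on the message is enough.
	// A property counts as a tag only if its value is the fixed string TAGS.
	for tag := range subProps {
		if msgProps[tag] == "TAGS" {
			return true
		}
	}
	// Assumption: a message carrying none of the subscribed tags is not delivered.
	return false
}

func main() {
	sub := map[string]string{"tag1": "1", "tag2": "1"}
	fmt.Println(matchesTags(map[string]string{"tag2": "TAGS", "xxx": "yyy"}, sub)) // true
	fmt.Println(matchesTags(map[string]string{"tag3": "TAGS"}, sub))              // false
	fmt.Println(matchesTags(map[string]string{"tag3": "TAGS"}, nil))              // true
}
```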

    SQL Filtering

    Producer Example

    Producers can attach multiple attributes to a message as properties.
    // Construct a producer
    Producer<byte[]> producer = pulsarClient.newProducer()
            // Disable batching
            .enableBatching(false)
            // Full topic path in the format `persistent://cluster (tenant) ID/namespace/topic name`
            .topic("persistent://pulsar-xxx/sdk_java/topic2")
            .create();

    // Send the message
    MessageId msgId = producer.newMessage()
            .property("idc", "idc1")      // Message attribute idc
            .property("label", "online")  // Message attribute label
            .property("other", "xxx")     // Other message attributes
            .value(value.getBytes(StandardCharsets.UTF_8))
            .send();

    Consumer Example

    To enable SQL-92 filtering, add the key TDMQ_PULSAR_SQL92_FILTER_EXPRESSION to the consumer's subscription properties. Its value is the SQL-92 filter expression.
    // Subscription properties
    HashMap<String, String> subProperties = new HashMap<>();
    // The TDMQ_PULSAR_SQL92_FILTER_EXPRESSION key enables SQL-92 filtering; its value is the filter expression.
    // This expression matches messages whose idc attribute equals idc1 and whose label attribute exists.
    subProperties.put("TDMQ_PULSAR_SQL92_FILTER_EXPRESSION", "idc = 'idc1' AND label IS NOT NULL");

    // Construct a consumer
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            // Full topic path in the format `persistent://cluster (tenant) ID/namespace/topic name`, copied from [Topic Management]
            .topic("persistent://pulsar-xxxx/sdk_java/topic2")
            // Create the subscription on the topic details page in the console, then enter its name here
            .subscriptionName("topic_sub1")
            // Use the Shared subscription type
            .subscriptionType(SubscriptionType.Shared)
            // Subscription properties carrying the SQL filter expression
            .subscriptionProperties(subProperties)
            .subscribe();
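    In Go, the same key would go into ConsumerOptions.SubscriptionProperties. The sketch below builds that map and, following note 1 in the notes below, reports which filter mode a given set of subscription properties selects. The helper names are hypothetical; only the TDMQ_PULSAR_SQL92_FILTER_EXPRESSION key comes from this page.

```go
package main

import "fmt"

// sqlFilterKey is the subscription property key that enables SQL-92 filtering.
const sqlFilterKey = "TDMQ_PULSAR_SQL92_FILTER_EXPRESSION"

// sqlSubscriptionProperties returns subscription properties that carry only
// the SQL-92 filter expression; adding tag keys alongside it has no effect.
func sqlSubscriptionProperties(expr string) map[string]string {
	return map[string]string{sqlFilterKey: expr}
}

// filterMode reports which filtering applies according to this page: the SQL key
// takes precedence and other keys are ignored; otherwise any keys are treated
// as tags; an empty map means no filtering.
func filterMode(subProps map[string]string) string {
	if _, ok := subProps[sqlFilterKey]; ok {
		return "sql"
	}
	if len(subProps) > 0 {
		return "tag"
	}
	return "none"
}

func main() {
	props := sqlSubscriptionProperties("idc = 'idc1' AND label IS NOT NULL")
	fmt.Println(filterMode(props)) // sql
	props["tag1"] = "1"            // ignored while the SQL key is present
	fmt.Println(filterMode(props)) // sql
	fmt.Println(filterMode(map[string]string{"tag1": "1"})) // tag
	fmt.Println(filterMode(nil))                            // none
}
```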

    Notes

    1. If subscriptionProperties contains TDMQ_PULSAR_SQL92_FILTER_EXPRESSION, the subscription uses SQL filtering. Any other keys in subscriptionProperties are ignored, and tag filtering is not applied.
    2. SQL filtering is based on the properties set when a message is sent (the property field) and is independent of tag filtering. With SQL filtering, messages are filtered solely by the filter expression in the consumer's subscription properties, and properties are not treated differently based on whether they are tags. For example, a property with key=tag1 and value=TAGS is treated as an ordinary key-value pair, not as a tag.
    3. Message filtering does not support batching, which is enabled by default. Disable batching when creating the producer. If messages are batched, the server does not filter them and delivers them directly to the consumer.
    4. Property names used in SQL statements can contain only letters, digits, and underscores.
    5. An SQL statement can contain at most 50 conditions, and no more than 5 is recommended. In other words, the expression must contain fewer than 50 AND and OR operators in total. The AND in BETWEEN xxx AND xxx and NOT BETWEEN xxx AND xxx also counts toward this limit.
    6. If a newly submitted SQL expression is invalid and the server cannot parse it, the server keeps using the previous filter rule.
    7. Because SQL attribute filtering depends both on the properties defined by producers and on the filter conditions set by consumers, evaluation results may be unexpected. The server handles these cases as follows:
    Exceptions: If evaluating the filter expression throws an exception, for example when a numeric value is compared with a non-numeric value, the message is filtered out by default and not delivered to the consumer.
    Null values: If the expression evaluates to null or to a non-Boolean value (neither true nor false), the message is also filtered out by default. For example, if a property is not set on the message but is referenced in the subscription's filter condition, the expression evaluates to null.
    Mismatched numeric types: If a custom message property holds a floating-point value but the filter condition compares it with an integer, the message is filtered out by default and not delivered to the consumer.
    8. All consumers in the same subscription should use the same filter rule. If consumers in the same subscription use different rules, the rules conflict and override each other, which can affect business logic.
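    Note 7 amounts to one rule: only an expression that evaluates to true delivers the message. The Go sketch below models that rule for a single condition of the form key > threshold. The outcome type and function names are hypothetical, and this is not the broker's evaluator.

```go
package main

import (
	"fmt"
	"strconv"
)

// outcome is the result of evaluating a filter expression for one message.
type outcome int

const (
	outcomeTrue outcome = iota
	outcomeFalse
	outcomeNull  // the referenced property is not set on the message
	outcomeError // the property value cannot be compared as an integer
)

// evalGreaterThan models evaluating "key > threshold" with an integer threshold.
// Hypothetical sketch, not the broker's evaluator.
func evalGreaterThan(props map[string]string, key string, threshold int64) outcome {
	raw, ok := props[key]
	if !ok {
		return outcomeNull
	}
	// A non-numeric value ("abc") or a floating-point value ("1.5") fails
	// integer parsing here, so both end up filtered out, as note 7 describes.
	n, err := strconv.ParseInt(raw, 10, 64)
	if err != nil {
		return outcomeError
	}
	if n > threshold {
		return outcomeTrue
	}
	return outcomeFalse
}

// deliver applies note 7: false, null, and errors all filter the message out.
func deliver(o outcome) bool {
	return o == outcomeTrue
}

func main() {
	props := map[string]string{"a": "150", "b": "abc", "c": "1.5"}
	for _, key := range []string{"a", "b", "c", "missing"} {
		fmt.Println(key, deliver(evalGreaterThan(props, key, 100)))
	}
}
```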
    Note:
    SQL filtering is supported only in Professional Edition clusters created after September 10, 2024. For existing clusters that require this feature, contact us for an upgrade.
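    Because the server keeps using the previous rule when a new expression cannot be parsed (note 6), a client-side pre-check of notes 4 and 5 can catch mistakes before subscribing. This Go sketch is a rough tokenizer, not an SQL-92 parser: it checks property names against letters, digits, and underscores, and counts AND/OR keywords outside single-quoted literals, including the AND in BETWEEN ... AND .... All names are hypothetical.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
	"unicode"
)

// propertyNamePattern allows only letters, digits, and underscores (note 4).
var propertyNamePattern = regexp.MustCompile(`^[A-Za-z0-9_]+$`)

// maxOperators: at most 50 conditions means fewer than 50 AND/OR operators (note 5).
const maxOperators = 49

func validPropertyName(name string) bool {
	return propertyNamePattern.MatchString(name)
}

// countOperators counts AND/OR keywords (case-insensitive) outside single-quoted
// literals, including the AND of BETWEEN ... AND .... Rough tokenizer only.
func countOperators(expr string) int {
	count := 0
	inQuote := false
	var word strings.Builder
	flush := func() {
		if w := strings.ToUpper(word.String()); w == "AND" || w == "OR" {
			count++
		}
		word.Reset()
	}
	for _, r := range expr {
		switch {
		case r == '\'':
			flush()
			inQuote = !inQuote
		case inQuote:
			// skip characters inside string literals such as 'x AND y'
		case r == '_' || unicode.IsLetter(r) || unicode.IsDigit(r):
			word.WriteRune(r)
		default:
			flush()
		}
	}
	flush()
	return count
}

func withinOperatorLimit(expr string) bool {
	return countOperators(expr) <= maxOperators
}

func main() {
	expr := "a IS NOT NULL AND (a BETWEEN 10 AND 100)"
	fmt.Println(countOperators(expr), withinOperatorLimit(expr))        // 2 true
	fmt.Println(validPropertyName("idc"), validPropertyName("my-prop")) // true false
}
```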

    Practical Guide

    1. Avoid attaching too many properties to a message, and keep each property's key and value short. We recommend fewer than 10 properties per message, with a total string length of no more than 512 bytes.
    2. When using SQL filtering, keep the number of filter conditions small, ideally 5 or fewer, and keep each value used in a filter condition short, preferably within 64 bytes.
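    These recommendations can be checked before a message is sent or a filter is submitted. In this Go sketch the thresholds come from the practical guide above; treating "total string length" as the sum of all key and value bytes is an assumption, and the function names are hypothetical.

```go
package main

import "fmt"

// Recommended (not enforced) limits from the practical guide.
const (
	maxProperties  = 10  // keep the property count below this
	maxTotalBytes  = 512 // total bytes of all keys and values (assumed interpretation)
	maxFilterBytes = 64  // bytes per value used in a filter condition
)

// propertyWarnings lists every way a message's properties exceed the guide.
func propertyWarnings(props map[string]string) []string {
	var warnings []string
	if len(props) >= maxProperties {
		warnings = append(warnings, fmt.Sprintf("%d properties; fewer than %d recommended", len(props), maxProperties))
	}
	total := 0
	for k, v := range props {
		total += len(k) + len(v) // len returns the byte length of a Go string
	}
	if total > maxTotalBytes {
		warnings = append(warnings, fmt.Sprintf("%d bytes of properties; at most %d recommended", total, maxTotalBytes))
	}
	return warnings
}

// filterValueTooLong reports whether a filter-condition value exceeds the recommended size.
func filterValueTooLong(value string) bool {
	return len(value) > maxFilterBytes
}

func main() {
	fmt.Println(propertyWarnings(map[string]string{"idc": "idc1", "label": "online"})) // []
	fmt.Println(filterValueTooLong("idc1"))                                             // false
}
```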

    SQL Filtering Syntax Rules

    | Syntax | Description | Example |
    | --- | --- | --- |
    | IS NULL | Checks whether an attribute does not exist. | a IS NULL: the attribute a does not exist. |
    | IS NOT NULL | Checks whether an attribute exists. | a IS NOT NULL: the attribute a exists. |
    | > >= < <= | Compares numbers. These operators cannot compare strings; doing so causes an error when the consumer client starts. Strings that can be converted to numbers are treated as numbers. | a IS NOT NULL AND a > 100: the attribute a exists and its value is greater than 100. Incorrect: a IS NOT NULL AND a > 'abc', because abc is a string and cannot be compared numerically. |
    | BETWEEN xxx AND xxx | Compares numbers; cannot compare strings (doing so causes an error when the consumer client starts). Equivalent to >= xxx AND <= xxx: the attribute value lies between the two numbers, inclusive. | a IS NOT NULL AND (a BETWEEN 10 AND 100): the attribute a exists and its value is greater than or equal to 10 and less than or equal to 100. |
    | NOT BETWEEN xxx AND xxx | Compares numbers; cannot compare strings (doing so causes an error when the consumer client starts). Equivalent to < xxx OR > xxx: the attribute value lies outside the range. | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100): the attribute a exists and its value is less than 10 or greater than 100. |
    | IN (xxx, xxx) | Checks whether the attribute value is in a set. The set elements should be strings. | a IS NOT NULL AND (a IN ('abc', 'def')): the attribute a exists and its value is abc or def. |
    | = <> | Equality and inequality. Can compare both numbers and strings. | a IS NOT NULL AND (a = 'abc' OR a<>'def'): the attribute a exists and its value is either abc or not def. |
    | AND OR | Logical AND and logical OR. Can combine any simple logical expressions; enclose each logical expression in parentheses. | a IS NOT NULL AND (a > 100) OR (b IS NULL): the attribute a exists and its value is greater than 100, or the attribute b does not exist. |
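    When filter expressions are assembled in code, small builders help follow the table's advice to parenthesize each logical expression. The following Go sketch is hypothetical: it covers only IS NOT NULL, =, IN, BETWEEN, AND, and OR, and it does not escape single quotes inside values because this page documents no escaping rule. Its output could be used as the value of TDMQ_PULSAR_SQL92_FILTER_EXPRESSION; every AND and OR it emits counts toward the 50-condition limit.

```go
package main

import (
	"fmt"
	"strings"
)

// strLit wraps a value in single quotes without escaping; avoid values containing quotes.
func strLit(v string) string { return "'" + v + "'" }

func isNotNull(attr string) string { return attr + " IS NOT NULL" }

func equals(attr, value string) string { return attr + " = " + strLit(value) }

// inSet renders "attr IN ('v1', 'v2')"; the table says set elements should be strings.
func inSet(attr string, values ...string) string {
	lits := make([]string, len(values))
	for i, v := range values {
		lits[i] = strLit(v)
	}
	return attr + " IN (" + strings.Join(lits, ", ") + ")"
}

// between renders a numeric range check; BETWEEN works only on numbers.
func between(attr string, lo, hi int64) string {
	return fmt.Sprintf("%s BETWEEN %d AND %d", attr, lo, hi)
}

// join wraps every sub-expression in parentheses and joins them with op.
func join(op string, conds []string) string {
	wrapped := make([]string, len(conds))
	for i, c := range conds {
		wrapped[i] = "(" + c + ")"
	}
	return strings.Join(wrapped, " "+op+" ")
}

func allOf(conds ...string) string { return join("AND", conds) }
func anyOf(conds ...string) string { return join("OR", conds) }

func main() {
	fmt.Println(allOf(equals("idc", "idc1"), isNotNull("label")))
	// prints: (idc = 'idc1') AND (label IS NOT NULL)
	fmt.Println(allOf(isNotNull("a"), between("a", 10, 100)))
	// prints: (a IS NOT NULL) AND (a BETWEEN 10 AND 100)
}
```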