Comparison Items | Tag Filtering | SQL Filtering |
Filtering target | Message tag attributes | Message key-value (k-v) attributes |
Filtering capability | Exact match | SQL syntax match |
Use cases | Simple filtering scenarios with lightweight computational logic | Complex filtering scenarios with more intricate computational logic |
Properties
and can be obtained as follows:<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.10.1</version> <!-- Recommended --></dependency>
go get -u github.com/apache/pulsar-client-go@master
// Construct a producerProducer<byte[]> producer = pulsarClient.newProducer()// Disable batch operation.enableBatching(false)// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`.topic("persistent://pulsar-xxx/sdk_java/topic2").create();
producer, err := client.CreateProducer(pulsar.ProducerOptions{DisableBatching: true, // Disable batch operation})
Properties
field in ProducerMessage
when sending messages and set the SubscriptionProperties
field in ConsumerOptions
when creating consumers.Properties
field in ProducerMessage
, the key
is the tag name, and the value
is fixed to TAGS
.SubscriptionProperties
field in ConsumerOptions
, the key
is the tag name to be subscribed to, and the value
is the tag version, which is reserved for feature extension in the future and has no meaning currently. You can configure as follows:// Send the messageMessageId msgId = producer.newMessage().property("tag1", "TAGS").value(value.getBytes(StandardCharsets.UTF_8)).send();// Subscription parameters, which can be used to set subscription tagsHashMap<String, String> subProperties = new HashMap<>();subProperties.put("tag1","1");// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page..topic("persistent://pulsar-xxxx/sdk_java/topic2")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("topic_sub1")// Declare the shared mode as the consumption mode.subscriptionType(SubscriptionType.Shared)// Subscription parameters for tag subscription.subscriptionProperties(subProperties)// Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// Send the messageif msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i)),Properties: map[string]string{"tag1": "TAGS",},}); err != nil {log.Fatal(err)}// Create a consumerconsumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1",SubscriptionName: "my-sub",SubscriptionProperties: map[string]string{"tag1": "1"},})
// Send the messageMessageId msgId = producer.newMessage().property("tag1", "TAGS").property("tag2", "TAGS").value(value.getBytes(StandardCharsets.UTF_8)).send();// Subscription parameters, which can be used to set subscription tagsHashMap<String, String> subProperties = new HashMap<>();subProperties.put("tag1","1");subProperties.put("tag2","1");// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page..topic("persistent://pulsar-xxxx/sdk_java/topic2")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("topic_sub1")// Declare the shared mode as the consumption mode.subscriptionType(SubscriptionType.Shared)// Subscription parameters for tag subscription.subscriptionProperties(subProperties)// Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// Create a producerif msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i)),Properties: map[string]string{"tag1": "TAGS","tag2": "TAGS",},}); err != nil {log.Fatal(err)}// Create a consumerconsumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1",SubscriptionName: "my-sub",SubscriptionProperties: map[string]string{"tag1": "1","tag2": "1",},})
// Send the messageMessageId msgId = producer.newMessage().property("tag1", "TAGS").property("tag2", "TAGS").property("xxx", "yyy").value(value.getBytes(StandardCharsets.UTF_8)).send();// Subscription parameters, which can be used to set subscription tagsHashMap<String, String> subProperties = new HashMap<>();subProperties.put("tag1","1");subProperties.put("tag2","1");// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, which can be copied from the **Topic** page..topic("persistent://pulsar-xxxx/sdk_java/topic2")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("topic_sub1")// Declare the shared mode as the consumption mode.subscriptionType(SubscriptionType.Shared)// Subscription parameters for tag subscription.subscriptionProperties(subProperties)// Configure consumption starting at the earliest offset; otherwise, historical messages may not be consumed.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// Create a producerif msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i)),Properties: map[string]string{"tag1": "TAGS","tag2": "TAGS","xxx": "yyy",},}); err != nil {log.Fatal(err)}// Create a consumerconsumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1",SubscriptionName: "my-sub",SubscriptionProperties: map[string]string{"tag1": "1","tag2": "1",},})
// Construct a producerProducer<byte[]> producer = pulsarClient.newProducer()// Disable batch operation.enableBatching(false)// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`.topic("persistent://pulsar-xxx/sdk_java/topic2").create();// Send the messageMessageId msgId = producer.newMessage().property("idc", "idc1") // Specify the attribute of the message (idc).property("label", "online") // Specify the attribute of the message (label).property("other", "xxx") // Specify other attributes of the message.value(value.getBytes(StandardCharsets.UTF_8)).send();
// Subscribe related parametersHashMap<String, String> subProperties = new HashMap<>();// The properties of the consumer include TDMQ_PULSAR_SQL92_FILTER_EXPRESSION. Assuming SQL92 filtering is enabled, the value is the filter expression.subProperties.put("TDMQ_PULSAR_SQL92_FILTER_EXPRESSION","idc = 'idc1' AND label IS NOT NULL"); // Expression Description: idc attribute equals idc1 and label attribute exists// Construct a consumerConsumer<byte[]> consumer = pulsarClient.newConsumer()// Complete path of the topic in the format of `persistent://cluster (tenant) ID/namespace/topic name`, copied from [Topic Management].topic("persistent://pulsar-xxxx/sdk_java/topic2")// You need to create a subscription on the topic details page in the console and enter the subscription name here.subscriptionName("topic_sub1")// Declare the shared mode as the consumption mode.subscriptionType(SubscriptionType.Shared)// Subscription configuration parameters, carrying SQL filter expression.subscriptionProperties(subProperties).subscribe();
Syntax | Description | Example |
IS NULL | Checks if an attribute does not exist. | a IS NULL : The attribute a does not exist. |
IS NOT NULL | Checks if an attribute exists. | a IS NOT NULL : The attribute a exists. |
> >= < <= | Used to compare numbers; it cannot be used to compare strings, as doing so will cause an error when the consumer client starts. Strings that can be converted into numbers are considered numbers. | a IS NOT NULL AND a > 100 : The attribute a exists, and its value is greater than 100.a IS NOT NULL AND a > 'abc' : Incorrect example, as abc is a string and cannot be used for numerical comparison. |
BETWEEN xxx AND xxx | Used to compare numbers; it cannot be used to compare strings, as doing so will cause an error when the consumer client starts. Equivalent to >= xxx AND <= xxx, indicating that the attribute value falls between two numbers. | a IS NOT NULL AND (a BETWEEN 10 AND 100) : The attribute a exists, and its value is greater than or equal to 10 and less than or equal to 100. |
NOT BETWEEN xxx AND xxx | Used to compare numbers; it cannot be used to compare strings, as doing so will cause an error when the consumer client starts. Equivalent to < xxx OR > xxx, indicating that the attribute value falls outside a specified range. | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100) : The attribute a exists, and its value is less than 10 or greater than 100. |
IN (xxx, xxx) | Indicates that the attribute value is within a specific set. The elements of the set should be strings. | a IS NOT NULL AND (a IN ('abc', 'def')) : The attribute a exists, and its value is either abc or def. |
= <> | Represents equality and inequality. Can be used to compare both numbers and strings. | a IS NOT NULL AND (a = 'abc' OR a<>'def') : The attribute a exists, and its value is either abc or not def. |
AND OR | Represents logical AND and logical OR. These can be used to combine any simple logical expressions. Each logical expression should be enclosed in parentheses. | a IS NOT NULL AND (a > 100) OR (b IS NULL) : The attribute a exists and its value is greater than 100, or the attribute b does not exist. |
Was this page helpful?