This document describes how to use message tag filtering in TDMQ for Apache Pulsar.
Feature Description
A message tag is used to categorize messages under a topic. When a producer in TDMQ for Apache Pulsar sends messages with specified tags, the consumer needs to subscribe to those messages by tag.
If a consumer configures no tags when subscribing to a topic, all messages in the topic will be delivered to the consumer for consumption.
Note:
In a subscription:
1. A single consumer can subscribe to multiple tags; the tags are combined with OR.
2. All consumers in a subscription must use the same tags. If consumers in the same subscription use different tags, their filtering rules overlap and business logic is affected.
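The second rule can be checked on the client before any consumer is created. The following is a minimal sketch, assuming each consumer's tags are held in a map shaped like SubscriptionProperties (tag name as key); the function names `sameTagSet` and `consistentTags` are hypothetical and not part of any SDK.

```go
package main

import "fmt"

// sameTagSet reports whether two SubscriptionProperties-style maps name the
// same tags. Only the keys (tag names) are compared; values are ignored.
func sameTagSet(a, b map[string]string) bool {
	if len(a) != len(b) {
		return false
	}
	for tag := range a {
		if _, ok := b[tag]; !ok {
			return false
		}
	}
	return true
}

// consistentTags reports whether every consumer planned for one subscription
// uses the same tag set as the first consumer in the list.
func consistentTags(consumers []map[string]string) bool {
	for i := 1; i < len(consumers); i++ {
		if !sameTagSet(consumers[0], consumers[i]) {
			return false
		}
	}
	return true
}

func main() {
	matching := []map[string]string{
		{"tag1": "1", "tag2": "1"},
		{"tag2": "1", "tag1": "1"},
	}
	conflicting := []map[string]string{
		{"tag1": "1"},
		{"tag2": "1"},
	}
	fmt.Println(consistentTags(matching))    // true
	fmt.Println(consistentTags(conflicting)) // false
}
```

Running a check like this at startup is one way to catch a misconfigured consumer before it joins the subscription.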
Use Cases
Generally, messages with the same business attributes are stored in the same topic. For example, an order transaction topic may contain messages for order placements, payments, and deliveries. If your business needs to consume only one type of transaction message, you can filter messages on the client, but this wastes bandwidth because every message is still delivered to the consumer.
To solve this problem, TDMQ for Apache Pulsar supports filtering on the broker. You can set one or more tags during message production and subscribe to specified tags during consumption.
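To make the bandwidth difference concrete, the sketch below compares the bytes delivered to a consumer that wants only one type of message under client-side and broker-side filtering. The `message` type, the tag names, and the payload sizes are hypothetical values chosen for illustration.

```go
package main

import "fmt"

// message is a simplified stand-in for a topic message: one tag and a
// payload size in bytes.
type message struct {
	tag  string
	size int
}

// bytesDelivered returns how many payload bytes reach a consumer that wants
// only messages tagged `wanted`. With brokerSide set to false, every message
// is delivered and the client discards the ones it does not need.
func bytesDelivered(msgs []message, wanted string, brokerSide bool) int {
	total := 0
	for _, m := range msgs {
		if !brokerSide || m.tag == wanted {
			total += m.size
		}
	}
	return total
}

func main() {
	msgs := []message{
		{tag: "order", size: 100},
		{tag: "payment", size: 200},
		{tag: "delivery", size: 300},
	}
	fmt.Println("client-side filtering:", bytesDelivered(msgs, "payment", false)) // 600
	fmt.Println("broker-side filtering:", bytesDelivered(msgs, "payment", true))  // 200
}
```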
Use Instructions
Tagged messages are passed in through message Properties. The client SDKs can be obtained as follows.
Java (Maven dependency):
<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.10.1</version>
</dependency>
Go (v0.8.0 or later is recommended):
go get -u github.com/apache/pulsar-client-go@master
Use Limits of tagged messages
Tagged messages do not support batching. Batching is enabled by default, so to use tagged messages you must disable it in the producer as follows:
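// Java: create a producer with batching disabled
Producer<byte[]> producer = pulsarClient.newProducer()
        .enableBatching(false)
        .topic("persistent://pulsar-xxx/sdk_java/topic2")
        .create();
// Go: create a producer with batching disabled
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic:           "topic-1", // placeholder topic name; replace with your own
	DisableBatching: true,
})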
Tagged message filtering only takes effect for messages with tags; that is, messages without tags won't be filtered and will be pushed to all subscribers instead.
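The delivery rules described so far can be modeled as a single decision function. This is a sketch of the documented behavior, not the broker's actual implementation. The function name `shouldDeliver` is hypothetical, and treating a message that carries several tags as a match when any one of them is subscribed is an assumption.

```go
package main

import "fmt"

// tagMarker is the fixed property value that marks a property key as a tag.
const tagMarker = "TAGS"

// shouldDeliver models whether a message reaches a subscription.
// msgProps are the message Properties; subProps are the SubscriptionProperties.
func shouldDeliver(msgProps, subProps map[string]string) bool {
	if len(subProps) == 0 {
		return true // the subscription has no tags: it receives every message
	}
	tagged := false
	for key, value := range msgProps {
		if value != tagMarker {
			continue // an ordinary property, not a tag
		}
		tagged = true
		if _, ok := subProps[key]; ok {
			return true // OR semantics: one matching tag is enough (assumption)
		}
	}
	// Untagged messages are not filtered; tagged messages with no match are dropped.
	return !tagged
}

func main() {
	sub := map[string]string{"tag1": "1", "tag2": "1"}
	fmt.Println(shouldDeliver(map[string]string{"tag2": "TAGS"}, sub)) // true
	fmt.Println(shouldDeliver(map[string]string{"tag3": "TAGS"}, sub)) // false
	fmt.Println(shouldDeliver(map[string]string{"xxx": "yyy"}, sub))   // true
}
```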
To enable tagged messages, set the Properties field in ProducerMessage when sending messages and set the SubscriptionProperties field in ConsumerOptions when creating consumers.
When you set the Properties field in ProducerMessage, the key is the tag name, and the value is fixed to TAGS.
When you set the SubscriptionProperties field in ConsumerOptions, the key is the tag name to be subscribed to, and the value is the tag version, which is reserved for future feature extension and currently has no meaning. You can configure them as follows:
Specify one tag
// Java: send a message tagged "tag1"
MessageId msgId = producer.newMessage()
        .property("tag1", "TAGS")
        .value(value.getBytes(StandardCharsets.UTF_8))
        .send();
// Java: subscribe to "tag1"; the value "1" is the reserved tag version
HashMap<String, String> subProperties = new HashMap<>();
subProperties.put("tag1", "1");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("persistent://pulsar-xxxx/sdk_java/topic2")
        .subscriptionName("topic_sub1")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionProperties(subProperties)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
// Go: send a message tagged "tag1" (the message ID is not needed here)
if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
	Payload: []byte(fmt.Sprintf("hello-%d", i)),
	Properties: map[string]string{
		"tag1": "TAGS",
	},
}); err != nil {
	log.Fatal(err)
}
// Go: subscribe to "tag1"
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:                  "topic-1",
	SubscriptionName:       "my-sub",
	SubscriptionProperties: map[string]string{"tag1": "1"},
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()
Specify multiple tags
// Java: send a message tagged "tag1" and "tag2"
MessageId msgId = producer.newMessage()
        .property("tag1", "TAGS")
        .property("tag2", "TAGS")
        .value(value.getBytes(StandardCharsets.UTF_8))
        .send();
// Java: subscribe to "tag1" OR "tag2"
HashMap<String, String> subProperties = new HashMap<>();
subProperties.put("tag1", "1");
subProperties.put("tag2", "1");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("persistent://pulsar-xxxx/sdk_java/topic2")
        .subscriptionName("topic_sub1")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionProperties(subProperties)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
// Go: send a message tagged "tag1" and "tag2"
if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
	Payload: []byte(fmt.Sprintf("hello-%d", i)),
	Properties: map[string]string{
		"tag1": "TAGS",
		"tag2": "TAGS",
	},
}); err != nil {
	log.Fatal(err)
}
// Go: subscribe to "tag1" OR "tag2"
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "topic-1",
	SubscriptionName: "my-sub",
	SubscriptionProperties: map[string]string{
		"tag1": "1",
		"tag2": "1",
	},
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()
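The key/value convention used in the examples above can be wrapped in two small helpers so that the fixed value TAGS and the reserved version value are written in one place. This is only a sketch: `tagProperties` and `tagSubscription` are hypothetical names, and using "1" as the tag version simply mirrors the examples above.

```go
package main

import "fmt"

// tagProperties builds a message Properties map in which every tag name
// is a key and the value is the fixed marker "TAGS".
func tagProperties(tags ...string) map[string]string {
	props := make(map[string]string, len(tags))
	for _, tag := range tags {
		props[tag] = "TAGS"
	}
	return props
}

// tagSubscription builds a SubscriptionProperties map in which every tag
// name is a key and the value is the reserved tag version ("1" here).
func tagSubscription(tags ...string) map[string]string {
	props := make(map[string]string, len(tags))
	for _, tag := range tags {
		props[tag] = "1"
	}
	return props
}

func main() {
	fmt.Println(tagProperties("tag1", "tag2"))
	fmt.Println(tagSubscription("tag1", "tag2"))
}
```

The returned maps can be assigned directly to the Properties field of ProducerMessage and the SubscriptionProperties field of ConsumerOptions.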
Mix tags and properties
// Java: send a message with two tags and one ordinary property ("xxx")
MessageId msgId = producer.newMessage()
        .property("tag1", "TAGS")
        .property("tag2", "TAGS")
        .property("xxx", "yyy")
        .value(value.getBytes(StandardCharsets.UTF_8))
        .send();
// Java: subscribe to "tag1" OR "tag2"
HashMap<String, String> subProperties = new HashMap<>();
subProperties.put("tag1", "1");
subProperties.put("tag2", "1");
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic("persistent://pulsar-xxxx/sdk_java/topic2")
        .subscriptionName("topic_sub1")
        .subscriptionType(SubscriptionType.Shared)
        .subscriptionProperties(subProperties)
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe();
// Go: send a message with two tags and one ordinary property ("xxx")
if _, err := producer.Send(ctx, &pulsar.ProducerMessage{
	Payload: []byte(fmt.Sprintf("hello-%d", i)),
	Properties: map[string]string{
		"tag1": "TAGS",
		"tag2": "TAGS",
		"xxx":  "yyy",
	},
}); err != nil {
	log.Fatal(err)
}
// Go: subscribe to "tag1" OR "tag2"
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
	Topic:            "topic-1",
	SubscriptionName: "my-sub",
	SubscriptionProperties: map[string]string{
		"tag1": "1",
		"tag2": "1",
	},
})
if err != nil {
	log.Fatal(err)
}
defer consumer.Close()
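Because tags and ordinary properties share the same Properties map, a consumer that needs to tell them apart can split the map on the fixed value TAGS. The sketch below assumes that no ordinary property uses the literal value TAGS; `splitProperties` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"sort"
)

// splitProperties separates tag names from ordinary properties.
// A property is treated as a tag when its value is the fixed marker "TAGS".
func splitProperties(props map[string]string) (tags []string, other map[string]string) {
	other = make(map[string]string)
	for key, value := range props {
		if value == "TAGS" {
			tags = append(tags, key)
		} else {
			other[key] = value
		}
	}
	sort.Strings(tags) // map iteration order is random; sort for stable output
	return tags, other
}

func main() {
	tags, other := splitProperties(map[string]string{
		"tag1": "TAGS",
		"tag2": "TAGS",
		"xxx":  "yyy",
	})
	fmt.Println(tags)  // [tag1 tag2]
	fmt.Println(other) // map[xxx:yyy]
}
```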
Note:
Once the SubscriptionProperties field is set in the consumer, the tags processed by the subscription cannot be modified. To modify tags, unsubscribe from the current subscription first and then create a new subscription.
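The unsubscribe-then-resubscribe flow can be sketched as follows. To keep the example runnable without a broker, it uses a local `consumer` interface that mirrors only the `Unsubscribe` and `Close` methods of the Go client's consumer, plus a `fakeConsumer` that records calls. The `resubscribe` helper and the `subscribeFunc` type are hypothetical names.

```go
package main

import "fmt"

// consumer is a local stand-in for the two consumer methods used here.
type consumer interface {
	Unsubscribe() error
	Close()
}

// subscribeFunc creates a new consumer with the given SubscriptionProperties.
type subscribeFunc func(subscriptionProperties map[string]string) (consumer, error)

// resubscribe removes the current subscription and creates a new one with newTags.
func resubscribe(current consumer, subscribe subscribeFunc, newTags map[string]string) (consumer, error) {
	if err := current.Unsubscribe(); err != nil {
		return nil, fmt.Errorf("unsubscribe current subscription: %w", err)
	}
	current.Close()
	return subscribe(newTags)
}

// fakeConsumer records calls so the flow can be exercised without a broker.
type fakeConsumer struct {
	tags         map[string]string
	unsubscribed bool
	closed       bool
}

func (f *fakeConsumer) Unsubscribe() error { f.unsubscribed = true; return nil }
func (f *fakeConsumer) Close()             { f.closed = true }

func main() {
	old := &fakeConsumer{tags: map[string]string{"tag1": "1"}}
	subscribe := func(props map[string]string) (consumer, error) {
		return &fakeConsumer{tags: props}, nil
	}
	next, err := resubscribe(old, subscribe, map[string]string{"tag2": "1"})
	if err != nil {
		fmt.Println("resubscribe failed:", err)
		return
	}
	fmt.Println(old.unsubscribed, old.closed, next.(*fakeConsumer).tags)
}
```

With a real client, the function passed as `subscribe` would presumably wrap `client.Subscribe` and set `SubscriptionProperties` to the new tag map.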