对比项 | 标签过滤 | SQL 过滤 |
过滤目标 | 消息的 Tag 标签属性 | 消息的 k-v 属性 |
过滤能力 | 精准匹配 | SQL 语法匹配 |
适用场景 | 简单过滤场景、计算逻辑简单轻量 | 复杂过滤场景、计算逻辑较复杂 |
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.10.1</version> <!-- 推荐版本 --></dependency>
go get -u github.com/apache/pulsar-client-go@master
// 构建生产者Producer<byte[]> producer = pulsarClient.newProducer()// 禁用掉batch功能.enableBatching(false)// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称.topic("persistent://pulsar-xxx/sdk_java/topic2").create();
producer, err := client.CreateProducer(pulsar.ProducerOptions{DisableBatching: true, // 禁用掉batch功能})
TAGS
。// 发送消息MessageId msgId = producer.newMessage().property("tag1", "TAGS").value(value.getBytes(StandardCharsets.UTF_8)).send();// 订阅相关参数,可用来设置订阅标签(TAG)HashMap<String, String> subProperties = new HashMap<>();subProperties.put("tag1","1");// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxxx/sdk_java/topic2")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("topic_sub1")// 声明消费模式为共享模式.subscriptionType(SubscriptionType.Shared)// 订阅相关参数,tag订阅等。。.subscriptionProperties(subProperties)// 配置从最早开始消费,否则可能会消费不到历史消息.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// 发送消息if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i)),Properties: map[string]string{"tag1": "TAGS",},}); err != nil {log.Fatal(err)}// 创建 consumerconsumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1",SubscriptionName: "my-sub",SubscriptionProperties: map[string]string{"tag1": "1"},})
// 发送消息MessageId msgId = producer.newMessage().property("tag1", "TAGS").property("tag2", "TAGS").value(value.getBytes(StandardCharsets.UTF_8)).send();// 订阅相关参数,可用来设置订阅标签(TAG)HashMap<String, String> subProperties = new HashMap<>();subProperties.put("tag1","1");subProperties.put("tag2","1");// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxxx/sdk_java/topic2")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("topic_sub1")// 声明消费模式为共享模式.subscriptionType(SubscriptionType.Shared)// 订阅相关参数,tag订阅等。。.subscriptionProperties(subProperties)// 配置从最早开始消费,否则可能会消费不到历史消息.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// 创建 producerif msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i)),Properties: map[string]string{"tag1": "TAGS","tag2": "TAGS",},}); err != nil {log.Fatal(err)}// 创建 consumerconsumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1",SubscriptionName: "my-sub",SubscriptionProperties: map[string]string{"tag1": "1","tag2": "1",},})
// 发送消息MessageId msgId = producer.newMessage().property("tag1", "TAGS").property("tag2", "TAGS").property("xxx", "yyy").value(value.getBytes(StandardCharsets.UTF_8)).send();// 订阅相关参数,可用来设置订阅标签(TAG)HashMap<String, String> subProperties = new HashMap<>();subProperties.put("tag1","1");subProperties.put("tag2","1");// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxxx/sdk_java/topic2")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("topic_sub1")// 声明消费模式为共享模式.subscriptionType(SubscriptionType.Shared)// 订阅相关参数,tag订阅等。。.subscriptionProperties(subProperties)// 配置从最早开始消费,否则可能会消费不到历史消息.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// 创建 producerif msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{Payload: []byte(fmt.Sprintf("hello-%d", i)),Properties: map[string]string{"tag1": "TAGS","tag2": "TAGS","xxx": "yyy",},}); err != nil {log.Fatal(err)}// 创建 consumerconsumer, err := client.Subscribe(pulsar.ConsumerOptions{Topic: "topic-1",SubscriptionName: "my-sub",SubscriptionProperties: map[string]string{"tag1": "1","tag2": "1",},})
// 构建生产者Producer<byte[]> producer = pulsarClient.newProducer()// 禁用掉batch功能.enableBatching(false)// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称.topic("persistent://pulsar-xxx/sdk_java/topic2").create();// 发送消息MessageId msgId = producer.newMessage().property("idc", "idc1") // 指定消息的属性(idc).property("label", "online") // 指定消息的属性(label).property("other", "xxx") // 指定消息的其他属性.value(value.getBytes(StandardCharsets.UTF_8)).send();
// 订阅相关参数HashMap<String, String> subProperties = new HashMap<>();// 消费者的properties中包含TDMQ_PULSAR_SQL92_FILTER_EXPRESSION,认为开启了SQL92过滤,value即为过滤表达式。subProperties.put("TDMQ_PULSAR_SQL92_FILTER_EXPRESSION","idc = 'idc1' AND label IS NOT NULL"); // 表达式说明:idc属性等于idc1 并且 label属性存在// 构建消费者Consumer<byte[]> consumer = pulsarClient.newConsumer()// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制.topic("persistent://pulsar-xxxx/sdk_java/topic2")// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName("topic_sub1")// 声明消费模式为共享模式.subscriptionType(SubscriptionType.Shared)// 订阅配置相关参数,里面携带SQL过滤表达式.subscriptionProperties(subProperties).subscribe();
语法 | 说明 | 示例 |
IS NULL | 判断属性不存在 | a IS NULL :属性a不存在。 |
IS NOT NULL | 判断属性存在 | a IS NOT NULL :属性a存在。 |
> >= < <= | 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。 说明 可转化为数字的字符串也被认为是数字。 | a IS NOT NULL AND a > 100 :属性a存在且属性a的值大于100。 a IS NOT NULL AND a > 'abc' :错误示例,abc为字符串,不能用于比较大小。 |
BETWEEN xxx AND xxx | 用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间。 | a IS NOT NULL AND (a BETWEEN 10 AND 100) :属性a存在且属性a的值大于等于10且小于等于100。 |
NOT BETWEEN xxx AND xxx | 用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外。 | a IS NOT NULL AND (a NOT BETWEEN 10 AND 100) :属性a存在且属性a的值小于10或大于100。 |
IN (xxx, xxx) | 表示属性的值在某个集合内。集合的元素只能是字符串。 | a IS NOT NULL AND (a IN ('abc', 'def')) :属性a存在且属性a的值为abc或def。 |
= <> | 等于和不等于。可用于比较数字和字符串。 | a IS NOT NULL AND (a = 'abc' OR a<>'def') :属性a存在且属性a的值为abc或a的值不为def。 |
AND OR | 逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。 | a IS NOT NULL AND (a > 100) OR (b IS NULL) :属性a存在且属性a的值大于100或属性b不存在。 |
本页内容是否解决了您的问题?