tencent cloud

文档反馈

消息过滤

最后更新时间:2024-12-03 10:04:45
    本文主要介绍 TDMQ Pulsar 版中消息过滤的功能、应用场景和使用方式。

    功能介绍

    消费者订阅了某个主题后,TDMQ Pulsar 版会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在服务端进行过滤,只获取到需要关注的消息,避免接收到大量无效的消息。因此,也可以简化业务逻辑的架构设计。
    TDMQ Pulsar 专业版支持两种类型的过滤方式:
    标签过滤,生产消息时打上一个或多个固定 Tag,消费者通过指定 Tag 订阅消息。
    SQL 过滤,生产消息时打上一个或多个 k-v 的属性,消费者可以通过更为灵活的 SQL 92 语法来订阅消息。
    对比项
    标签过滤
    SQL 过滤
    过滤目标
    消息的 Tag 标签属性
    消息的 k-v 属性
    过滤能力
    精准匹配
    SQL 语法匹配
    适用场景
    简单过滤场景、计算逻辑简单轻量
    复杂过滤场景、计算逻辑较复杂

    应用场景

    通常,一个 Topic 中存放的是相同业务属性的消息,例如交易流水 Topic 包含了下单流水、支付流水、发货流水等,业务若只想消费者其中一种类别的流水,可在客户端进行过滤,但这种过滤方式会带来带宽的资源浪费。
    针对上述场景,TDMQ Pulsar 提供 Broker 端过滤的方式,用户可在生产消息时设置一个或者多个 Tag 标签,消费时指定 Tag 订阅。
    
    img
    
    

    使用说明

    Tag 消息目前是通过 Properties 的方式传入的,可以通过如下方式获取:
    Java
    Go
    <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.10.1</version> <!-- 推荐版本 -->
    </dependency>
    推荐使用 0.8.0 及以上版本。
    go get -u github.com/apache/pulsar-client-go@master

    标签过滤

    Tag 消息不支持 Batch 功能,Batch 功能默认是开启的。如果要使用 Tag 消息,需要在 Producer 侧禁用掉 batch,具体如下:
    Java
    Go
    // 构建生产者
    Producer<byte[]> producer = pulsarClient.newProducer()
    // 禁用掉batch功能
    .enableBatching(false)
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
    .topic("persistent://pulsar-xxx/sdk_java/topic2").create();
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
    DisableBatching: true, // 禁用掉batch功能
    })
    Tag 消息的过滤只针对已设置 Tag 的消息,消费者订阅 Topic 时若未设置 tag 的消息,Topic 中的所有消息都将被投递到消费端进行消费。
    如果要开启 Tag 消息,需要发送消息的时候,在 ProducerMessage 中设置 Properties 字段;同时在创建 Consumer 的时候需要在 ConsumerOptions 中指定 SubscriptionProperties 字段。
    在 ProducerMessage 中设置 Properties 字段时,其中 key 为 tag 的名字,value 为固定值:TAGS
    在 ConsumerOptions 中指定 SubscriptionProperties 字段时,其中 key 为要订阅的 tag 的名字,value 为 tag 的版本信息,为保留字段,目前没有实质含义,用来做后续功能的扩展,具体如下:
    指定单个 tag。
    Java
    Go
    // 发送消息
    MessageId msgId = producer.newMessage()
    .property("tag1", "TAGS")
    .value(value.getBytes(StandardCharsets.UTF_8))
    .send();
    
    // 订阅相关参数,可用来设置订阅标签(TAG)
    HashMap<String, String> subProperties = new HashMap<>();
    subProperties.put("tag1","1");
    // 构建消费者
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
    .topic("persistent://pulsar-xxxx/sdk_java/topic2")
    // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
    .subscriptionName("topic_sub1")
    // 声明消费模式为共享模式
    .subscriptionType(SubscriptionType.Shared)
    // 订阅相关参数,tag订阅等。。
    .subscriptionProperties(subProperties)
    // 配置从最早开始消费,否则可能会消费不到历史消息
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
    // 发送消息
    if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
    Payload: []byte(fmt.Sprintf("hello-%d", i)),
    Properties: map[string]string{
    "tag1": "TAGS",
    },
    }); err != nil {
    log.Fatal(err)
    }
    
    // 创建 consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic: "topic-1",
    SubscriptionName: "my-sub",
    SubscriptionProperties: map[string]string{"tag1": "1"},
    })
    指定多个 tag。
    Java
    Go
    // 发送消息
    MessageId msgId = producer.newMessage()
    .property("tag1", "TAGS")
    .property("tag2", "TAGS")
    .value(value.getBytes(StandardCharsets.UTF_8))
    .send();
    
    // 订阅相关参数,可用来设置订阅标签(TAG)
    HashMap<String, String> subProperties = new HashMap<>();
    subProperties.put("tag1","1");
    subProperties.put("tag2","1");
    // 构建消费者
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
    .topic("persistent://pulsar-xxxx/sdk_java/topic2")
    // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
    .subscriptionName("topic_sub1")
    // 声明消费模式为共享模式
    .subscriptionType(SubscriptionType.Shared)
    // 订阅相关参数,tag订阅等。。
    .subscriptionProperties(subProperties)
    // 配置从最早开始消费,否则可能会消费不到历史消息
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
    // 创建 producer
    if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
    Payload: []byte(fmt.Sprintf("hello-%d", i)),
    Properties: map[string]string{
    "tag1": "TAGS",
    "tag2": "TAGS",
    },
    }); err != nil {
    log.Fatal(err)
    }
    
    // 创建 consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic: "topic-1",
    SubscriptionName: "my-sub",
    SubscriptionProperties: map[string]string{
    "tag1": "1",
    "tag2": "1",
    },
    })
    tag 与 properties 混合。
    Java
    Go
    // 发送消息
    MessageId msgId = producer.newMessage()
    .property("tag1", "TAGS")
    .property("tag2", "TAGS")
    .property("xxx", "yyy")
    .value(value.getBytes(StandardCharsets.UTF_8))
    .send();
    
    // 订阅相关参数,可用来设置订阅标签(TAG)
    HashMap<String, String> subProperties = new HashMap<>();
    subProperties.put("tag1","1");
    subProperties.put("tag2","1");
    // 构建消费者
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
    .topic("persistent://pulsar-xxxx/sdk_java/topic2")
    // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
    .subscriptionName("topic_sub1")
    // 声明消费模式为共享模式
    .subscriptionType(SubscriptionType.Shared)
    // 订阅相关参数,tag订阅等。。
    .subscriptionProperties(subProperties)
    // 配置从最早开始消费,否则可能会消费不到历史消息
    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
    // 创建 producer
    if msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
    Payload: []byte(fmt.Sprintf("hello-%d", i)),
    Properties: map[string]string{
    "tag1": "TAGS",
    "tag2": "TAGS",
    "xxx": "yyy",
    },
    }); err != nil {
    log.Fatal(err)
    }
    
    // 创建 consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
    Topic: "topic-1",
    SubscriptionName: "my-sub",
    SubscriptionProperties: map[string]string{
    "tag1": "1",
    "tag2": "1",
    },
    })
    注意:
    1. 单个消费者可以使用多个 Tag,多个 Tag 之间的关系是「或」。
    2. 多个消费者需要使用相同 Tag。若一个订阅中有不同消费者使用不同 Tag ,则会出现过滤规则覆盖的情况,进而影响业务逻辑。

    SQL 过滤

    生产者示例

    生产者可以在 property 中添加多个属性。
    // 构建生产者
    Producer<byte[]> producer = pulsarClient.newProducer()
    // 禁用掉batch功能
    .enableBatching(false)
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
    .topic("persistent://pulsar-xxx/sdk_java/topic2").create();
    
    // 发送消息
    MessageId msgId = producer.newMessage()
    .property("idc", "idc1") // 指定消息的属性(idc)
    .property("label", "online") // 指定消息的属性(label)
    .property("other", "xxx") // 指定消息的其他属性
    .value(value.getBytes(StandardCharsets.UTF_8))
    .send();

    消费者示例

    消费者的 properties 中必须包含 TDMQ_PULSAR_SQL92_FILTER_EXPRESSION,认为开启了 SQL92 过滤,value 即为 SQL 92 过滤表达式。
    // 订阅相关参数
    HashMap<String, String> subProperties = new HashMap<>();
    // 消费者的properties中包含TDMQ_PULSAR_SQL92_FILTER_EXPRESSION,认为开启了SQL92过滤,value即为过滤表达式。
    subProperties.put("TDMQ_PULSAR_SQL92_FILTER_EXPRESSION","idc = 'idc1' AND label IS NOT NULL"); // 表达式说明:idc属性等于idc1 并且 label属性存在
    
    // 构建消费者
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
    // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
    .topic("persistent://pulsar-xxxx/sdk_java/topic2")
    // 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
    .subscriptionName("topic_sub1")
    // 声明消费模式为共享模式
    .subscriptionType(SubscriptionType.Shared)
    // 订阅配置相关参数,里面携带SQL过滤表达式
    .subscriptionProperties(subProperties)
    .subscribe();

    注意事项

    1. 如果 subscriptionProperties 中包含了 TDMQ_PULSAR_SQL92_FILTER_EXPRESSION,即认为此订阅使用 SQL 过滤。如果同时subscriptionProperties 中还存在其他 properties,其他 properties 忽略,即不再使用标签过滤。
    2. SQL 过滤是根据消息中的属性属性(即发送消息时候指定的 property)进行过滤的,和标签(Tag)过滤是相互独立的。即一旦使用了 SQL 过滤,完全按照消费属性来过滤,不再区分是否是 TAGS 标签的属性。对于 property 中 key=tag1,value=TAGS 的属性,Pulsar 将认为这是一个普通的property,key=tag1,value=TAGS 字符串的属性,不会特殊对待。
    3. 消息过滤不支持 Batch 功能,Batch 功能默认是开启的,需要在创建 Producer 的时候禁用掉 batch。如果是批量的消息,服务端不会执行消息过滤,会直接投递给消费者。
    4. SQL 语句中支持的属性(property)的命名格式只能由下面的字符组成:字母、数字、下划线。
    5. SQL 语句中的判断条件最大不能超过 50 个,建议不超过5个。换句话说,就是 SQL 表达式语句中的 AND 和 OR 的数量要小于 50 个。BETWEEN xxx AND xxx 和 NOT BETWEEN xxx AND xxx 中包含的 AND 也会计算在内。
    6. 如果新传入的 SQL 表达式规则不正确,导致服务端无法正确解析对应的 SQL 语句,则服务端会继续保持和之前一致的过滤方式。
    7. 由于 SQL 属性过滤是生产者定义消息属性,消费者设置 SQL 过滤条件,因此过滤条件的计算结果具有不确定性,服务端的处理方式如下:
    异常情况处理:如果过滤条件的表达式计算抛异常,消息默认被过滤,不会被投递给消费者。例如比较数字和非数字类型的值。
    空值情况处理:如果过滤条件的表达式计算值为null或不是布尔类型(true和false),则消息默认被过滤,不会被投递给消费者。例如发送消息时未定义某个属性,在订阅时过滤条件中直接使用该属性,则过滤条件的表达式计算结果为 null。
    数值类型不符处理:如果消息自定义属性为浮点型,但过滤条件中使用整数进行判断,则消息默认被过滤,不会被投递给消费者。
    8. 多个消费者需要使用相同的过滤规则。若一个订阅中有不同消费者使用不同规则 ,则会出现过滤规则覆盖的情况,进而影响业务逻辑。
    注意:
    SQL 过滤需要在 2024 年 9 月 10 日以后创建的专业版,才能支持该功能。存量集群若有需求,请 联系我们 升级。

    实践教程

    1. 消息中的 property 数量不宜过多,单个 property 的 key 和 value 值也不宜过大。建议 property 数量小于 10 个,整体字符串长度小于 512 byte。
    2. 使用 SQL 过滤的时候,过滤条件不易过多,建议过滤条件控制在 5 个以内。作为过滤条件的 value 值不宜过长,建议控制在 64 byte 以内。

    SQL 过滤语法规则

    语法
    说明
    示例
    IS NULL
    判断属性不存在
    a IS NULL :属性a不存在。
    IS NOT NULL
    判断属性存在
    a IS NOT NULL:属性a存在。
    > >= < <=
    用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。 说明 可转化为数字的字符串也被认为是数字。
    a IS NOT NULL AND a > 100:属性a存在且属性a的值大于100。
    a IS NOT NULL AND a > 'abc':错误示例,abc为字符串,不能用于比较大小。
    BETWEEN xxx AND xxx
    用于比较数字,不能用于比较字符串,否则消费者客户端启动时会报错。等价于>= xxx AND <= xxx。表示属性值在两个数字之间。
    a IS NOT NULL AND (a BETWEEN 10 AND 100):属性a存在且属性a的值大于等于10且小于等于100。
    NOT BETWEEN xxx AND xxx
    用于比较数字,不能用于比较字符串,否则消费者客户端启动会报错。等价于< xxx OR > xxx,表示属性值在两个值的区间之外。
    a IS NOT NULL AND (a NOT BETWEEN 10 AND 100):属性a存在且属性a的值小于10或大于100。
    IN (xxx, xxx)
    表示属性的值在某个集合内。集合的元素只能是字符串。
    a IS NOT NULL AND (a IN ('abc', 'def')):属性a存在且属性a的值为abc或def。
    = <>
    等于和不等于。可用于比较数字和字符串。
    a IS NOT NULL AND (a = 'abc' OR a<>'def'):属性a存在且属性a的值为abc或a的值不为def。
    AND OR
    逻辑与、逻辑或。可用于组合任意简单的逻辑判断,需要将每个逻辑判断内容放入括号内。
    a IS NOT NULL AND (a > 100) OR (b IS NULL):属性a存在且属性a的值大于100或属性b不存在。
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持