CMQ 的 Topic 模式支持订阅筛选标签(tag)功能,类似于 rabbitMQ 的 direct_routing 模式,topic_pattern 模式在下个迭代提供。由于筛选标签的使用策略较复杂,本文将结合不同场景进行说明。
场景说明
场景1
有4个订阅者 A、B、C、D,A 消息标签是 apple,B 消息标签是 xiaomi,C 的消息标签是 imac+xiaomi,D 没有设置任何标签。
此时,有生产者 publish 到 Topic 100条消息,带的消息标签过滤为:apple、imac、iphone、macbook。此后 Topic 立即投递到 A、B、C、D。
场景分析:
|
| | 由于 apple 能匹配消息过滤标签中的 apple,则能正常收到100条消息。 |
| | |
| | 由于其中一个标签 imac 能匹配,则能正常收到100条消息 。 |
| | |
场景2
Topic 有且仅有4个订阅者 A、B、C、D,四个订阅者都没有设置消息 tag。
此时,有生产者 publish 到 Topic 100条消息,带的消息标签过滤为:apple、imac、iphone、macbook。此后 Topic 立即投递到 A、B、C、D。
场景分析:
订阅者 A、B、C、D 均没有 tag,则投递时不用匹配消息, A、B、C、D 都能收到这100条消息。
场景3
Topic 有且仅有1个订阅者 A,A 的订阅标签设置为 xiaomi。
此时,有生产者 publish 到 Topic 100条消息,带的消息标签过滤为:apple、imac、iphone、macbook。此后 Topic 立即投递到 A。
场景分析:
对于订阅者 A,订阅标签不匹配,则 A 无法收到这100条消息。此时这100条消息立即丢弃,不会堆积在 CMQ 里。
生产者 publish 到 Topic 时,消息过滤 tag 只能在 publish 前设置一次,跟消息 ID 绑定,不可修改。
场景4
Topic 名为 test1 ,在12点01分,调用了订阅发布的 API,定义该次操作为“发布 test1”,发布后 Topic 投递到订阅者,给 A、B、C 三个订阅者投递了200条消息。
假设结果为:A 有100条消息接收失败,B 有30条消息接收失败,C 的200条消息全部接收成功
场景分析:
消息堆积:在 A、B、C 三个订阅者中,假定消息投递失败的总集合为110条(A 有100条消息失败,B 有30条失败,C全部成功,三者之间的失败消息有交错的部分),只要存在任意1个订阅者与某条消息存在关联关系,该消息都不会被立即退订。
阻塞策略:以 A 为案例,Topic 投递给 A 200条消息,第101条失败时候,则后面的99条的投递会被阻塞。状态为有100条消息接收失败。
重试策略(退避重试):Topic-test1 会每隔 N 秒给 A 重新投递消息,失败的100条里,按顺序,从第一条重新尝试,若失败三次,直接就丢弃,然后投递下一条消息,失败三次后再次丢弃,按顺序排列。
重试策略(衰退指数重试):以 A 为案例,Topic 会并发投递100条消息(无法保证先后顺序)当订阅者接收的第一条消息失败后,从第一条重新尝试,若失败,继续阻塞。后续新增的投递消息,会继续阻塞。
消息生命周期不可延长:假定这110条堆积在 Topic-test1 里的消息,中间无论被重试多少次,生命周期都为1天,从生产者推送到 Topic 的时间点,作为起止时间点,到期后删除
重新推送:生产者不断往 Topic 中生产新的消息,客户在12点02分,又调用了订阅发布的 API,加上之前失败堆积的110条消息,假设目前 Topic 中共有110(失败堆积)+100(1分钟时间内新增的消息)=210条消息,再次投递。这时,A、B 的重试策略为衰退指数重试,处于“失联”状态,则210条消息会继续堆积。 对于 C,只接收新增的100条消息。
每一条消息的 ID,作为 key,value 是关联的订阅者、代表每个订阅者消费成功与否。
场景5
Topic 名为 test2 ,在12点01分,调用了订阅发布的 API,定义该次操作为“发布 test2”,给 A、B、C 三个订阅者投递了200条消息。
假设结果为:A、B、C 的200条消息全部接收成功。
场景分析:
若 Topic-test2,有且仅有 A、B、C 三个订阅者,且都确定消费成功后,Topic 会立即删除200条消息。
规律总结
通过以上不同场景,标签匹配规律如下:
|
| | | |
| | | |
| | | 两者匹配的,才能收到消息。支持 N:M 匹配,如消息有10个 tag,订阅者有4个 tag,其中有1个 tag 相互能匹配上,则订阅者能收到消息。 |
| | | |
本页内容是否解决了您的问题?