tencent cloud

文档反馈

主题模型快速入门

最后更新时间:2020-06-10 15:50:02

    主题发布消息有一个前提,即需要有订阅者订阅主题,如果没有订阅者存在,那么主题中的消息不会被投递,此时发布消息这一操作就失去了意义。

    1. 创建主题

        endpoint='' //CMQ 的域名
        secretId ='' // 用户的 ID 和 key
        secretKey = ''
        account = Account(endpoint,secretId,secretKey)
        topicName = 'TopicTest8B'
        my_topic = account.get_topic(topicName)
        topic_meta = TopicMeta()
        my_topic.create(topic_meta)

    您可以从控制台查看已创建的主题。其中 QPS = 5000,表示调用同一个接口的频率上限,默认为5000次/s。如果您需要提高 QPS 上限,可以通过 提交工单 反馈给我们。

    2. 发布消息

    您可以通过 SDK 或控制台两种方式发布消息。

    使用 SDK 发送消息

        message = Message()
        message.msgBody = "this is a test message"
        my_topic.publish_message(message)

    使用控制台发布消息

    Topic 目前支持消息过滤,即消息标签、消息类型,用来区分某个 CMQ 的 Topic 下的消息分类。MQ 允许消费者按照标签对消息进行过滤,确保消费者最终只消费到他关心的消息类型。该功能默认不开启,未开启时,所有消息向所有订阅者发送;增加标签后,订阅者将仅能收到带该标签的信息。消息过滤标签描述了该订阅中消息过滤的标签(标签一致的消息才会被推送)。单个标签不超过16个字符,单个 Message 可最多添加5个标签。
    Topic 目前支持标签过滤和 routingKey 过滤两种方式。

    批量发布消息

        vmsg = []
        for i in range(6):
        message = Message()
        message.msgBody = "this is a test message"
        vmsg.append(message)
    
        my_topic.batch_publish_message(vmsg)

    3. 消息处理

    主题发布消息之后,会自动将消息推送给订阅,当推送失败时,有两种重试策略:

    • 退避重试:重试3次,间隔时间为10 - 20s之间的一个随机值,超过3次后,该条消息对于该订阅者丢弃,不会再重试。
    • 衰退指数重试:重试176次,总计重试时间为1天,间隔时间依次为:2^0 , 2^1, …, 512, 512, …, 512秒。默认为衰退指数重试策略。

    使用队列处理消息

    订阅者可以填写一个 Queue,使用队列来接收发布的消息。

        subscription_name = "subsc-test"
        my_sub = my_account.get_subscription(topic_name, subscription_name)
        subscription_meta = SubscriptionMeta()
        # 填写订阅名称,这里填写队列名称
        subscription_meta.Endpoint = "queue name  "
        subscription_meta.Protocal = "queue"
        my_sub.create(subscription_meta)

    使用其他方式处理消息

    订阅者也可以不与 Queue 结合,自己来处理消息。详情请参考 投递消息

    4. 路由匹配

    Binding key 、Routing key 是组合使用的,兼容 rabbitmq topic 匹配模式。发消息时配的 Routing key 是客户端发消息带的, 必须为字符串,不能有匹配符。创建订阅关系时配的 Binding key 是 topic 和订阅者的绑定关系。

    使用限制:

    • Binding key 的数量不超过5个。单个 binding key 的长度 ≤ 64字节,用于表示发送消息的路由路径,最多含有15个“.”,即最多16个词组。
    • Routing key 的数量由1个字符串组成。单个 Routing key 的长度 ≤ 64字节,用于表示发送消息的路由路径,最多含有15个“.”,即最多16个词组。

    通配符说明:

    • *(星号):可以替代一个单词(一串连续的字母串)。
    • #(井号):可以匹配一个或多个字符。
    • rabbitmq的特例:routing_key为空字符串时,不匹配 * ,但可以匹配 #。

    示例:

    • 订阅者是『1.*.0』,此时消息为『1.任意字符.0』,则订阅者都能收到消息。
    • 订阅者是『1.#.0』,此时消息为『1.2.3.4.4.2.2.0』,则订阅者都能收到消息(消息中间元素随意) 。

    使用路由匹配功能

        endpoint='' //CMQ 的域名
        secretId ='' // 用户的 ID 和 key
        secretKey = ''
        account = Account(endpoint,secretId,secretKey)
        topicName = 'TopicTest'
        my_topic = account.get_topic(topicName)
        topic_meta = TopicMeta()
        topic_meta.filterType = 2 //表示消息投递给订阅的时候采用路由匹配
        //若filterType =1 则表示使用标签过滤
        my_topic.create(topic_meta)
    
        subscription_name = "subsc-test"
        my_sub = my_account.get_subscription(topic_name, subscription_name)
        subscription_meta = SubscriptionMeta()
        //填写订阅名称,这里填写队列名称
        subscription_meta.Endpoint = "queue name  "
        subscription_meta.Protocal = "queue"
        subscription_meta.bindingKey=[1.*.0]  //若消息的标签为[1.任意字符.0],则该订阅者都能收到该标签
    
        my_sub.create(subscription_meta)

    发布消息

        message = Message()
        message.msgBody = "this is a test message"
        message.routingKey = '1.test.0' //该消息会被投递到 my_sub订阅的地址
        my_topic.publish_message(message)
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持