tencent cloud

文档反馈

Python SDK

最后更新时间:2024-01-03 10:20:36

    操作步骤

    本文以 Python SDK 为例介绍客户端接入 TDMQ CMQ 版服务并收发消息的操作步骤。

    前提条件

    队列模型

    操作步骤

    1. 在控制台创建符合需求的队列,参见 创建队列服务
    说明:
    创建消息队列可在控制台手动创建,或通过云 API 进行创建,使用云 API 需要安装相关 SDK,SDK 安装可参见 Python SDK 安装。
    shell
    python
    pip install --upgrade tencentcloud-sdk-python
    
    # api认证信息
    cred = credential.Credential(SecretId, SecretKey)
    httpProfile = HttpProfile()
    httpProfile.endpoint = NameServerAddress
    
    clientProfile = ClientProfile()
    clientProfile.httpProfile = httpProfile
    # 创建tdmq客户端
    client = tdmq_client.TdmqClient(cred, "ap-guangzhou", clientProfile)
    
    # 创建cmq队列请求参数
    req = models.CreateCmqQueueRequest()
    params = {
    "QueueName": "queue_api",
    # 下面是死信队列相关配置
    "DeadLetterQueueName": "dead_queue_api", # 死信队列,该消息队列要先创建
    "Policy": 0, # 0为消息被多次消费未删除,1为Time-To-Live过期
    "MaxReceiveCount": 3 # 最大接收次数 1-1000
    }
    req.from_json_string(json.dumps(params))
    
    # 创建cmq消息队列
    resp = client.CreateCmqQueue(req)
    
    参数
    说明
    NameServerAddress
    API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API请求地址处复制。
    
    
    
    SecretId、SecretKey
    云 API 密钥,登录 访问管理控制台,在访问密钥 > API密钥管理页面复制。
    
    
    
    2. 在项目中引入 CMQ 相关文件,需要根据使用的 Python 版本选择分支,默认为 Python2 SDK,您可切换至 Python3 分支中查看 Python3 SDK。
    3. 发送消息。
    import os
    import sys
    
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/..")
    
    import logging
    from cmq.account import Account
    from cmq.queue import Message
    from cmq.cmq_exception import CMQExceptionBase
    
    # 腾讯云账户 secretId、secretKey, 此处还需注意密钥对的保密
    # 密钥可前往https://console.tencentcloud.com/cam/capi网站进行获取
    secretId = 'AKIDSiiRtxxxx'
    secretKey = 'GGzSeaM5xxxx'
    # CMQ的服务调用地址
    nameServerAddress = 'https://cmq-gz.public.tencenttdmq.com'
    
    # 初始化 my_account, my_queue
    # Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象
    my_account = Account(nameServerAddress, secretId, secretKey, debug=True)
    my_account.set_log_level(logging.DEBUG)
    # 消息队列名称
    queue_name = sys.argv[1] if len(sys.argv) > 1 else "python_queue"
    my_queue = my_account.get_queue(queue_name)
    
    try:
    # 消息内容
    msg_body = "I am test message."
    msg = Message(msg_body)
    # 发送消息
    re_msg = my_queue.send_message(msg)
    # 发送结果
    print("Send Message Succeed! MessageBody:%s MessageID:%s" % (msg_body, re_msg.msgId))
    except CMQExceptionBase as e:
    print("Send Message Fail! Exception:%s\\n" % e)
    
    参数
    说明
    NameServerAddress
    API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API请求地址处复制。
    
    
    
    SecretId、SecretKey
    云 API 密钥,登录 访问管理控制台,在访问密钥 > API密钥管理页面复制。
    
    
    
    queue_name
    队列名称,在 TDMQ CMQ 版控制台 的队列服务列表页面获取。
    4. 消费消息。
    import os
    import sys
    
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/..")
    
    import logging
    from cmq.account import Account
    from cmq.cmq_exception import CMQExceptionBase
    
    # 腾讯云账户 secretId、secretKey, 此处还需注意密钥对的保密
    # 密钥可前往https://console.tencentcloud.com/cam/capi网站进行获取
    secretId = 'AKIDSiiRtxxxx'
    secretKey = 'GGzSeaM5xxxx'
    # CMQ的服务调用地址
    nameServerAddress = 'https://cmq-gz.public.tencenttdmq.com'
    
    # 初始化 my_account, my_queue
    # Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象
    my_account = Account(nameServerAddress, secretId, secretKey, debug=True)
    my_account.set_log_level(logging.DEBUG)
    queue_name = sys.argv[1] if len(sys.argv) > 1 else "python_queue"
    my_queue = my_account.get_queue(queue_name)
    
    try:
    wait_seconds = 3
    # 获取消息
    recv_msg = my_queue.receive_message(wait_seconds)
    # 具体业务
    print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
    recv_msg.receiptHandle, recv_msg.msgBody, recv_msg.msgId))
    # 消费成功,删除消息
    my_queue.delete_message(recv_msg.receiptHandle)
    except CMQExceptionBase as e:
    print("Receive Message Fail! Exception:%s\\n" % e)
    
    参数
    说明
    NameServerAddress
    API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。
    
    
    
    SecretId、SecretKey
    云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。
    
    
    
    queue
    队列名称,在 TDMQ CMQ 版控制台 的队列服务列表页面获取。

    主题模型

    操作步骤

    1. 准备所需资源,创建主题订阅和订阅者。
    1.1 创建主题订阅。可通过控制台手动创建,也可以通过云 API 进行创建,使用云 API 需要安装相关 SDK,SDK 安装可参见 Python SDK 安装。
    # api认证信息
    cred = credential.Credential(SecretId, SecretKey)
    httpProfile = HttpProfile()
    httpProfile.endpoint = NameServerAddress
    
    clientProfile = ClientProfile()
    clientProfile.httpProfile = httpProfile
    client = tdmq_client.TdmqClient(cred, "ap-guangzhou", clientProfile)
    
    req = models.CreateCmqTopicRequest()
    params = {
    "TopicName": "topic_api", # 主题名字,在单个地域同一帐号下唯一
    "FilterType": 1, # 用于指定主题的消息匹配策略。1:表示标签匹配策略;2:表示路由匹配策略
    "MsgRetentionSeconds": 86400 # 消息保存时间。取值范围60 - 86400 s(即1分钟 - 1天)
    }
    req.from_json_string(json.dumps(params))
    
    # 创建topic
    resp = client.CreateCmqTopic(req)
    
    参数
    说明
    NameServerAddress
    API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。
    
    
    
    SecretId、SecretKey
    云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。
    
    
    
    1.2 创建订阅者。可通过控制台进行手动创建,也可以通过云 API 进行创建,使用云 API 需要安装相关 SDK,SDK 安装可参见 Python SDK 安装。
    # api认证信息
    cred = credential.Credential(SecretId, SecretKey)
    httpProfile = HttpProfile()
    httpProfile.endpoint = NameServerAddress
    
    clientProfile = ClientProfile()
    clientProfile.httpProfile = httpProfile
    client = tdmq_client.TdmqClient(cred, "ap-guangzhou", clientProfile)
    
    req = models.CreateCmqSubscribeRequest()
    params = {
    "TopicName": "topic_api", # 创建订阅的topic名称
    "SubscriptionName": "sub", # 订阅名称
    "Protocol": "queue", # 订阅的协议,目前支持两种协议:http、queue。使用http协议,用户需自己搭建接受消息的web server。使用queue,消息会自动推送到CMQ queue,用户可以并发地拉取消息。
    "Endpoint": "topic_queue_api", # 接收通知的Endpoint,根据协议Protocol区分:对于http,Endpoint必须以“http://”开头,host可以是域名或IP;对于Queue,则填QueueName。
    "NotifyStrategy": "BACKOFF_RETRY", # CMQ推送服务器的重试策略。取值有:1)BACKOFF_RETRY,退避重试。;2)EXPONENTIAL_DECAY_RETRY,指数衰退重试。
    "FilterTag": ["TAG"], # 消息标签(用于消息过滤)。标签数量不能超过5个
    # "BindingKey": ["a.b.c"], # BindingKey数量不超过5个, 每个BindingKey长度不超过64字节,该字段表示订阅接收消息的过滤策略
    "NotifyContentFormat": "SIMPLIFIED" # 推送内容的格式。取值:1)JSON;2)SIMPLIFIED,即raw格式。如果Protocol是queue,则取值必须为SIMPLIFIED。如果Protocol是http,两个值均可以,默认值是JSON。
    }
    req.from_json_string(json.dumps(params))
    
    # 创建订阅
    resp = client.CreateCmqSubscribe(req)
    
    注意:
    BindingKey 与 FilterTag 要根据所订阅topic类型进行设置,否则无效。
    参数
    说明
    NameServerAddress
    API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。
    
    
    
    SecretId、SecretKey
    云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。
    
    
    
    2. 在项目中引入 CMQ 相关文件,需要根据使用的 Python 版本选择分支,默认为 Python2 SDK,您可切换至 Python3 分支中查看 Python3 SDK。
    3. 创建 my_topic,用来发布消息。
    import os
    import sys
    
    sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)) + "/..")
    
    import logging
    from cmq.account import Account
    from cmq.cmq_exception import *
    from cmq.topic import *
    
    # 腾讯云账户 secretId、secretKey, 此处还需注意密钥对的保密
    # 密钥可前往https://console.tencentcloud.com/cam/capi网站进行获取
    secretId = 'AKIDSiiRtxxxx'
    secretKey = 'GGzSeaM5xxxx'
    # CMQ的服务调用地址
    nameServerAddress = 'https://cmq-gz.public.tencenttdmq.com'
    
    try:
    # 初始化 my_account
    # Account类对象不是线程安全的,如果多线程使用,需要每个线程单独初始化Account类对象
    my_account = Account(nameServerAddress, secretId, secretKey, debug=True)
    my_account.set_log_level(logging.DEBUG)
    # topic主题名称
    topic_name = sys.argv[1] if len(sys.argv) > 1 else "python_topic_route"
    my_topic = my_account.get_topic(topic_name)
    except CMQExceptionBase as e:
    print("Exception:%s\\n" % e)
    
    参数
    说明
    NameServerAddress
    API 调用地址,在 TDMQ CMQ 版控制台 的队列服务 > API 请求地址处复制。
    
    
    
    SecretId、SecretKey
    云 API 密钥,登录 访问管理控制台,在访问密钥 > API 密钥管理页面复制。
    
    
    
    topic_name
    主题订阅名称,在 TDMQ CMQ 版控制台 的主题订阅列表页面获取。
    4. 发送 TAG 类型消息。
    # 消息tag
    tags = ["TAG", "TAG1", "TAG2"]
    for tag in tags:
    # 发送tag消息
    message = Message("this is a test TAG message. TAG:" + tag, [tag])
    re_msg = my_topic.publish_message(message)
    # 发送结果
    print("Send Message Succeed! MessageBody:%s MessageID:%s" % (message.msgBody, re_msg.msgId))
    
    5. 发送 route 消息。
    # 消息route信息
    routes = ["a.b.c", "a.b.x", "a.c.d", "x.y.z", "x.y.c"]
    for route in routes:
    message = Message("this is a test route message. Route:" + route)
    # 发送route消息
    re_msg = my_topic.publish_message(message, route)
    # 发送结果
    print("Send Message Succeed! MessageBody:%s MessageID:%s" % (message.msgBody, re_msg.msgId))
    
    
    6. 消费者消费订阅者订阅的消息队列即可。
    说明:
    以上是 CMQ 两种模型下的生产和消费方式的简单介绍,更多使用可参见 DemoCMQ 代码仓库
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持