tencent cloud

文档反馈

HTTP 协议接入

最后更新时间:2023-05-16 11:07:52

    操作场景

    TDMQ RocketMQ 版支持用户通过内网或公网使用 HTTP 协议接入,并兼容社区的多语言 HTTP SDK
    本文以调用 Java SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
    注意:
    暂不支持通过使用 HTTP 协议实现事务消息和顺序消息。
    在创建 group(消费组)时需要制定类型(TCP 或者 HTTP,详情请参见 创建 Group 说明 ),因此,同一个 group(消费组)不支持 TCP 和 HTTP 客户端同时消费。

    前提条件

    通过 Maven方式引入依赖,在 pom.xml 文件中添加对应语言的 SDK 依赖
    更多示例可以参考开源社区的 Demo 示例

    重试机制

    通过 HTTP 消费的每条消息,会有一个 5 分钟的不可见时间。
    若客户端在不可见时间内 ACK 这条消息,则表示消费成功,不会重试。
    若不可见时间到期后客户端仍未 ACK,则该条消息会重新可见,即接下来客户端会重新消费到这条消息。
    需要注意,一次消费的不可见时间到期后消息句柄会失效,无法再对其进行 ACK。

    操作步骤

    步骤1:引入依赖

    在工程的 pom.xml 文件中引入对应语言的 SDK 依赖。

    步骤2:获取参数

    1. 登录 TDMQ 控制台,选择所在的集群,点击集群名进入集群详情页。
    2. 如下图所示,选择顶部的 “命名空间” 页签,单击右侧的 配置权限 进入权限配置页面,如当前角色列表为空,可以单击 新建,新建一个角色,详细描述请参见 完成资源创建与准备
    
    
    
    3. 在页面上复制对应的 AK 和 SK,以备在接下来的步骤中使用。
    
    
    

    步骤3:生产者客户端初始化

    JAVA
    PHP
    NodeJS
    import com.aliyun.mq.http.MQClient;
    import com.aliyun.mq.http.MQProducer;
    
    public class Producer {
    
    public static void main(String[] args) {
    MQClient mqClient = new MQClient(
    // HTTP接入点
    "${HTTP_ENDPOINT}",
    // TDMQ RocketmQ access key, 从腾讯云控制台创建获取
    "${ACCESS_KEY}",
    // TDMQ RocketmQ 的角色名, 从腾讯云控制台创建获取
    "${SECRET_KEY}"
    );
    
    // 消息发送的Topic, 从腾讯云TDMQ控制台创建获取, 必传参数
    final String topic = "${TOPIC}";
    // Topic所属的命名空间, 从腾讯云TDMQ控制台创建获取, 必传参数
    final String instanceId = "${INSTANCE_ID}";
    
    // 创建生产者
    MQProducer producer = mqClient.getProducer(instanceId, topic);
    
    // 发送消息
    
    mqClient.close();
    }
    }
    require "vendor/autoload.php";
    
    use MQ\\MQClient;
    
    class ProducerTest
    {
    private $client;
    private $producer;
    
    public function __construct()
    {
    $this->client = new MQClient(
    // HTTP接入点
    "${HTTP_ENDPOINT}",
    // TDMQ RocketmQ access key, 从腾讯云控制台创建获取
    "${ACCESS_KEY}",
    // TDMQ RocketmQ 的角色名, 从腾讯云控制台创建获取
    "${SECRET_KEY}"
    );
    
    // 消息发送的Topic, 从腾讯云TDMQ控制台创建获取, 必传参数
    $topic = "${TOPIC}";
    // Topic所属的命名空间, 从腾讯云TDMQ控制台创建获取, 必传参数
    $instanceId = "${INSTANCE_ID}";
    
    $this->producer = $this->client->getProducer($instanceId, $topic);
    }
    
    public function run()
    {
    // 发送消息
    }
    }
    
    
    $instance = new ProducerTest();
    $instance->run();
    const {
    MQClient,
    MessageProperties
    } = require('@aliyunmq/mq-http-sdk');
    
    // 设置HTTP接入域名
    const endpoint = "{Endpoint}";
    // AccessKey
    const accessKeyId = "{Accesskey}";
    // SecretKey
    const accessKeySecret = "rop";
    
    var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
    
    // 所属的 Topic
    const topic = "TopicA";
    // Topic所属实例ID
    const instanceId = "MQ_INST_xxxxx";
    
    const producer = client.getProducer(instanceId, topic);
    
    (async function(){
    try {
    // 循环发送4条消息
    for(var i = 0; i < 4; i++) {
    let res;
    if (i % 2 == 0) {
    msgProps = new MessageProperties();
    // 设置属性
    msgProps.putProperty("key", i);
    // 设置KEY
    msgProps.messageKey("MessageKey");
    res = await producer.publishMessage("hello mq.", "", msgProps);
    } else {
    msgProps = new MessageProperties();
    // 设置属性
    msgProps.putProperty("key", i);
    // 定时消息, 定时时间为10s后
    msgProps.startDeliverTime(Date.now() + 10 * 1000);
    res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
    }
    console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
    }
    
    } catch(e) {
    // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
    console.log(e)
    }
    })();

    步骤4:消费者客户端初始化

    JAVA
    PHP
    NodeJS
    import com.aliyun.mq.http.MQClient;
    import com.aliyun.mq.http.MQConsumer;
    
    public class Consumer {
    
    public static void main(String[] args) {
    MQClient mqClient = new MQClient(
    // HTTP接入点
    "${HTTP_ENDPOINT}",
    // TDMQ RocketmQ access key, 从腾讯云控制台创建获取
    "${ACCESS_KEY}",
    // TDMQ RocketmQ 的角色名, 从腾讯云控制台创建获取
    "${SECRET_KEY}"
    );
    
    // 消费的Topic, 从腾讯云TDMQ控制台创建获取, 必传参数
    final String topic = "${TOPIC}";
    // 消费组名称, 从腾讯云TDMQ控制台创建获取, 必传参数
    final String groupId = "${GROUP_ID}";
    // Topic所属的命名空间, 从腾讯云TDMQ控制台创建获取, 必传参数
    final String instanceId = "${INSTANCE_ID}";
    
    final MQConsumer consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
    
    do {
    // 消费消息
    } while (true);
    }
    }
    require "vendor/autoload.php";
    
    use MQ\\MQClient;
    
    class ConsumerTest
    {
    private $client;
    private $consumer;
    
    public function __construct()
    {
    $this->client = new MQClient(
    // HTTP接入点
    "${HTTP_ENDPOINT}",
    // TDMQ RocketmQ access key, 从腾讯云控制台创建获取
    "${ACCESS_KEY}",
    // TDMQ RocketmQ 的角色名, 从腾讯云控制台创建获取
    "${SECRET_KEY}"
    );
    
    // 消费的Topic, 从腾讯云TDMQ控制台创建获取, 必传参数
    $topic = "${TOPIC}";
    // 消费组名称, 从腾讯云TDMQ控制台创建获取, 必传参数
    $groupId = "${GROUP_ID}";
    // Topic所属的命名空间, 从腾讯云TDMQ控制台创建获取, 必传参数
    $instanceId = "${INSTANCE_ID}";
    
    $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }
    
    public function run()
    {
    while (True) {
    // 消费消息
    }
    }
    }
    
    
    $instance = new ConsumerTest();
    $instance->run();
    const {
    MQClient
    } = require('@aliyunmq/mq-http-sdk');
    
    // 设置HTTP接入域名
    const endpoint = "{Endpoint}";
    // AccessKey
    const accessKeyId = "{Accesskey}";
    // SecretKey
    const accessKeySecret = "rop";
    
    var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
    
    // 所属的 Topic
    const topic = "TopicA";
    // Topic所属实例ID
    const instanceId = "MQ_INST_xxxxx";
    // 您在控制台创建的 Consumer Group
    const groupId = "GID_xxx";
    
    const consumer = client.getConsumer(instanceId, topic, groupId);
    
    (async function(){
    // 循环消费消息
    while(true) {
    try {
    // 长轮询消费消息
    // 长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
    res = await consumer.consumeMessage(
    3, // 一次最多消费3条(最多可设置为16条)
    3 // 长轮询时间3秒(最多可设置为30秒)
    );
    
    if (res.code == 200) {
    // 消费消息,处理业务逻辑
    console.log("Consume Messages, requestId:%s", res.requestId);
    const handles = res.body.map((message) => {
    console.log("\\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
    ",Props:%j,MessageKey:%s,Prop-A:%s",
    message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
    message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
    return message.ReceiptHandle;
    });
    
    // message.NextConsumeTime前若不确认消息消费成功,则消息会重复消费
    // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样
    res = await consumer.ackMessage(handles);
    if (res.code != 204) {
    // 某些消息的句柄可能超时了会导致确认不成功
    console.log("Ack Message Fail:");
    const failHandles = res.body.map((error)=>{
    console.log("\\tErrorHandle:%s, Code:%s, Reason:%s\\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
    return error.ReceiptHandle;
    });
    handles.forEach((handle)=>{
    if (failHandles.indexOf(handle) < 0) {
    console.log("\\tSucHandle:%s\\n", handle);
    }
    });
    } else {
    // 消息确认消费成功
    console.log("Ack Message suc, RequestId:%s\\n\\t", res.requestId, handles.join(','));
    }
    }
    } catch(e) {
    if (e.Code.indexOf("MessageNotExist") > -1) {
    // 没有消息,则继续长轮询服务器
    console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
    } else {
    console.log(e);
    }
    }
    }
    })();

    

    
    
    
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持