tencent cloud

文档反馈

HTTP 协议接入 Kafka

最后更新时间:2024-01-09 14:56:36

    操作场景

    与任何客户端-服务器应用程序一样,Kafka 通过一组明确定义的 API 提供对其功能的访问,这些 API 通过 Kafka 协议公开,是一种仅限于 Kafka 的 TCP 二进制协议。与 Kafka API 交互的最佳方式是客户端通过使用 Kafka 协议,Apache Kafka 项目仅正式支持 Java 的客户端库,但除此之外,Confluent 还正式支持 C/C++,C#,Go 和 Python 的客户端库。
    一些编程语言缺乏官方支持的 Kafka 生产级客户端,而 HTTP 是一种广泛可用、普遍支持的协议,DataHub 数据接入通过 HTTP 协议公开消息发送 API,以便于简化客户端复杂的配置。
    本文介绍 DataHub 服务的 HTTP 数据接入功能中的发送消息结合实际应用场景提供建议。

    技术架构

    HTTP 数据接入层开启后,公网的 HTTP 客户端可通过云 API 直接向 CKafka 所在的实例发送消息。示意图如下:
    
    
    

    前提条件

    已创建好目标 CKafka 实例和 Topic。

    操作步骤

    创建数据接入任务

    使用 SDK 发送消息

    1. 在 Java 项目通过 Maven、Gradle 等方式引入数据上报 SDK。以下是配置项目的 pom.xml 文件。
    <dependency>
    <groupId>com.tencentcloudapi</groupId>
    <artifactId>tencentcloud-sdk-java</artifactId>
    <version>3.1.430</version>
    </dependency>
    
    2. 单击 数据接入 的任务详情, 复制接入点信息到 SDK 中使用,用于写入数据。
    3. 示例中通过 generateMsgFromUserAccess 将所有要发送的消息组装起来,复制接入点信息。
    List<BatchContent> batchContentList = generateMsgFromUserAccess(userId);
    // 其中 ap-xxx 为对应的云API地域简称
    CkafkaClient client = new CkafkaClient(
    new Credential("yourSecretId", "yourSecretKey"), "ap-xxx");
    
    
    SendMessageRequest messageRequest = new SendMessageRequest();
    // 数据接入任务接入点ID
    messageRequest.setDataHubId("datahub-lzxxxxx6");
    messageRequest.setMessage(batchContentList.toArray(BatchContent[]::new));
    
    try {
    SendMessageResponse sendMessageResponse = client.SendMessage(messageRequest);
    String[] messageId = sendMessageResponse.getMessageId();
    for (String s : messageId) {
    LOGGER.info(s)
    }
    } catch (TencentCloudSDKException e) {
    LOGGER.error(e.getMessage());
    }
    
    4. 通过 HTTP 接入层发送消息的返回值示例如下。
    {
    "Response": {
    "MessageId": [
    "datahub-lxxxxxx6:topicDev:4:2:1648185961342:1648185961398"
    ],
    "RequestId": "3fq3na5r-xxxx-xxxx-xxxx-b2fiv0se7ded"
    }
    }
    
    5. 其中 MessageId 内容由一系列发送至 CKafka 实例后返回的元数据组成。如下分别为:
    "[datahubId]:[topic名称]:[所在的topic分区数]:[所在分区的offset]:[HTTP接入层收到消息的时间]:[消息发送至Kafka的时间]"
    

    查询消息

    通过 CKafka 控制台 查询 HTTP 接入层发送的消息,详细操作参见 消息查询。如下图,示例 topic 名称为 topicDev 的4号分区查询2号位点消息。
    
    
    

    任务暂停

    当您发现数据接入任务影响了正常业务时,可以暂停数据接入。
    1. 数据接入 页面,单击目标任务的操作栏的暂停,可暂停任务。
    2. 出现右上角的提示,则任务暂停成功。
    3. 此时通过 HTTP 接入层发送消息得到示例如下:
    {
    "Response": {
    "Error": {
    "Code": "FailedOperation",
    "Message": "task status suspended [datahub-lxxxxxx6]"
    },
    "RequestId": "5f737a5b-xxxx-xxxx-xxxxx-b2fb703e7ded"
    }
    }
    
    
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持