tencent cloud

文档反馈

数据消费操作指导

最后更新时间:2024-07-08 18:59:27

    操作场景

    数据同步到 Kafka 后,您可以通过0.11版本及以上的 Kafka 客户端 进行消费订阅数据,本文为您提供了 Java、Go、Python 语言的客户端消费 Demo 示例,方便您快速测试消费数据的流程,了解数据格式解析的方法。

    注意事项

    1. Demo 并不包含消费数据的用法演示,仅对数据做了打印处理,您需要在此基础上自行编写数据处理逻辑,您也可以使用其他语言的 Kafka 客户端消费并解析数据。
    2. 写入到 Kafka 中的数据支持兼容开源工具 Canal 的格式,采用 Protobuf 或者 JSON 的序列化协议,您可以在配置同步任务的过程中选择数据格式 Canal ProtoBuf 或者 Canal JSON。
    3. 目标 Ckafka 中消息大小设置的上限需要大于源库表中单行数据的最大值,以便数据可以正常同步到目标端。
    4. 为了保证数据可重入,DTS 引入 CHECKPOINT 机制,在事务之间插入 CHECKPOINT 消息用来标识数据同步的位点,任务中断重启后可实现断点续传。同时,消费端遇到 CHECKPOINT 消息会做一次 Kafka 消费位点提交,以便及时更新消费位点。
    如果用户对 CHECKPOINT 比较在意,不希望在消费的消息中出现 CHECKPOINT 消息,可以在配置任务过程中,设置过滤 CHECKPOINT 消息。过滤后 DTS 仍可以实现任务重启后的断点续传,仅在写入目标端 Kafka 时不插入 CHECKPOINT 消息。
    5. 源库中 Timestamp 类型的字段同步到目标端会转换为0时区(如"2021-05-17 07:22:42 +00:00"),在消费数据时请根据您的场景评估,是否要对该字段进行格式转换。

    消费 Demo 下载

    在配置同步任务中,您可以选择不同的数据格式,具体如下。配置任务时选择了哪种格式,这里的 Demo 下载也需要下载对应格式的 Demo。
    Avro:二进制格式,消费效率更高。
    JSON:为轻量级的文本格式,更加简单易用。
    Canal ProtBuf:适配 Canal 的 ProtoBuf 数据格式。
    Canal JSON:适配 Canal 的 JSON 数据格式。
    如下提供的 Demo 示例,均已包含对应的 Avro/JSON/ProtoBuf 协议文件,无需另外下载。Demo 中的逻辑讲解及关键参数说明,请参考 Demo 说明
    Demo 语言
    Avro 格式
    JSON 格式
    Canal ProtoBuf/Canal JSON 格式
    Go
    地址
    地址
    地址
    Java
    地址
    地址
    地址
    Python
    地址
    地址
    地址

    Java Demo 使用说明

    编译环境:Maven 或者 Gradle 包管理工具,JDK8。用户可自行选择打包工具,如下以 Maven 为例进行介绍。 操作步骤:
    1. 下载 Java Demo,然后解压该文件。
    2. 进入解压后的目录,为方便使用,目录下分别放置了 Maven 模型文件、pom.xml 文件,用户根据需要选用。 使用 Maven 进行打包:mvn clean package。
    3. 运行 Demo。 使用 Maven 打包后,进入目标文件夹 target。
    数据格式选择 Avro 格式,运行如下代码:
    java -jar consumerDemo.jar --brokers xxx --topic xxx --group xxx --trans2sql
    数据格式选择 JSON 格式,运行如下代码:
    java -jar consumerDemo.jar --brokers xxx --topic xxx --group xxx --trans2sql
    数据格式选择 Canal ProtoBuf 格式,运行如下代码:
    java -jar consumerDemo.jar --brokers xxx --topic xxx --group xxx
    数据格式选择 Canal JSON 格式,运行如下代码:
    java -jar consumerDemo.jar --brokers xxx --topic xxx --group xxx --flatMessage
    各参数详细说明如下:
    参数
    说明
    broker
    CKafka 的访问地址。
    topic
    同步任务中设置的 topic 名称。
    group
    消费组名称,用户可提前在 Ckafka 中创建消费组,也可以在此处自定义输入。
    trans2sql
    表示是否转换为 SQL 语句,携带该参数表示转换为 SQL 语句,不携带则不转换。
    选择 Avro、JSON 格式时需要配置。
    flatMessage
    仅选择 Canal JSON 数据格式时需要携带。
    4. 观察消费情况。
    
    

    Golang Demo 使用说明

    编译环境:Golang 1.12 及以上版本,配置好 Go Module 环境。 操作步骤:
    1. 下载 Golang Demo,然后解压该文件。
    2. 进入解压后的目录,运行 go build -o consumer ./main/main.go,生成可执行文件 consumer。
    3. 运行命令。
    数据格式选择 Avro 格式,运行如下代码:
    ./consumer --brokers=xxx --topic=xxx --group=xxx --trans2sql=true
    数据格式选择 JSON 格式,运行如下代码:
    ./consumer --brokers=xxx --topic=xxx --group=xxx --trans2sql=true
    数据格式选择 Canal ProtoBuf 格式,运行如下代码:
    ./consumer --brokers=xxx --topic=xxx --group=xxx
    数据格式选择 Canal JSON 格式,运行如下代码:
    ./consumer --brokers=xxx --topic=xxx --group=xxx --flatMessage=true
    各参数详细说明如下:
    参数
    说明
    broker
    CKafka 的访问地址。
    topic
    同步任务中设置的 topic 名称。
    group
    消费组名称,用户可提前在 Ckafka 中创建消费组,也可以在此处自定义输入。
    trans2sql
    表示是否转换为 SQL 语句。选择 Avro、JSON 格式时需要配置。
    flatMessage
    仅选择 Canal JSON 数据格式时需要携带。
    4. 观察消费情况。

    Python3 Demo 使用说明

    编译运行环境:安装 Python3,pip3(用于依赖包安装)。 使用 pip3 安装依赖包:
    pip install flag
    pip install kafka-python
    pip install avro
    操作步骤:
    1. 下载 Python3 Demo ,然后解压该文件。
    2. 运行main.py
    数据格式选择 Avro 格式,运行如下代码:
    python main.py --brokers=xxx --topic=xxx --group=xxx --trans2sql=1
    数据格式选择 JSON 格式,运行如下代码:
    python main.py --brokers=xxx --topic=xxx --group=xxx --trans2sql=1
    数据格式选择 Canal ProtoBuf 格式,运行如下代码:
    python main.py --brokers=xxx --topic=xxx --group=xxx
    数据格式选择 Canal JSON 格式,运行如下代码:
    python main.py --brokers=xxx --topic=xxx --group=xxx --flatMessage=1
    各参数详细说明如下:
    参数
    说明
    broker
    CKafka 的访问地址。
    topic
    同步任务中设置的 topic 名称。
    group
    消费组名称,用户可提前在 Ckafka 中创建消费组,也可以在此处自定义输入。
    trans2sql
    表示是否转换为 SQL 语句,设置为1表示转换为 SQL 语句,设置为0则不转换。选择 Avro、JSON 格式时需要配置。
    flatMessage
    仅选择 Canal JSON 数据格式时需要携带。
    3. 观察消费情况。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持