tencent cloud

文档反馈

数据库 TDSQL-MySQL

最后更新时间:2023-11-08 14:53:51

    介绍

    tdsql-subscribe connector 是针对腾讯云 TDSQL-MySQL 数据订阅的专有 connector,通过 数据订阅 功能接入 TDSQL-MySQL 的增量 binlog 数据,使用前请确保数据订阅任务已经配置成功。
    注意
    数据库 TDSQL Connector 目前处于 Beta 版本,如有需求请您 工单 联系我们。

    版本说明

    Flink 版本
    说明
    1.11
    不支持
    1.13
    支持
    1.14
    不支持
    1.16
    不支持

    使用范围

    tdsql-subscribe connector 支持用作数据源表(Source),不可以作为数据流的目的表(Sink)。

    DDL 定义

    当 tdsql-subscribe connector 作为 source 时,with 参数大部分与 Kafka connector 参数类似,连接参数都可以在订阅任务中找到。 值得注意的是,在使用 tdsql-subscribe connector 时,format 必须指定为 protobuf格式,因为数据订阅中,发送到 Kafka 的消息格式为 protobuf; 相较于通常使用的 Kafka connector,tdsql-subscribe connector 会多了一些认证信息,认证信息也是源于订阅任务。

    用作数据源(Source)

    protobuf 格式输入

    CREATE TABLE `DataInput` (
    `id` INT,
    `name` VARCHAR,
    `age` INT
    ) WITH (
    'connector' = 'tdsql-subscribe', -- 注意选择对应的内置 Connector
    'tdsql.database.name' = 'test_case_2022_06_0*', -- 对订阅消息进行过滤,消费数据库名满足 test_case_2022_06_0* 正则的订阅数据
    'tdsql.table.name' = 'test_0*', -- 对订阅消息进行过滤,消费数据表满足 test_0* 正则的订阅数据
    'topic' = 'topic-subs-5xop97nffk-tdsqlshard-xxx', -- 替换为订阅任务消费的 Topic
    'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种
    'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:3212', -- 替换为您的订阅任务 Kafka 连接地址
    'properties.group.id' = 'consumer-grp-subs-xxx-kk',
    'format' = 'protobuf', -- 只能是protobuf格式
    'properties.security.protocol'='SASL_PLAINTEXT', -- 认证协议
    'properties.sasl.mechanism'='SCRAM-SHA-512', -- 认证方式
    'properties.sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="account-subs-xxx-username" password="psw";' --用户名和密码
    );
    CREATE TABLE `jdbc_upsert_sink_table` (
    id INT PRIMARY KEY NOT ENFORCED,
    name STRING,
    age INT
    ) WITH (
    -- 指定数据库连接参数
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://172.28.28.138:3306/testdb', -- 请替换为您的实际 MySQL 连接参数
    'table-name' = 'sink', -- 需要写入的数据表
    'username' = 'user', -- 数据库访问的用户名(需要提供 INSERT 权限)
    'password' = 'psw' -- 数据库访问的密码
    );
    INSERT INTO jdbc_upsert_sink_table SELECT * FROM DataInput;

    WITH 参数

    参数值
    必填
    默认值
    描述
    connector
    固定值为 'tdsql-subscribe'
    topic
    要读的 Kafka Topic 名
    properties.bootstrap.servers
    逗号分隔的 Kafka Bootstrap 地址
    properties.group.id
    Kafka 消费时的 Group ID
    format
    Kafka 消息的输入格式。目前只支持 protobuf
    scan.startup.mode
    group-offsets
    Kafka consumer 的启动模式。
    可以是 latest-offsetearliest-offsetspecific-offsetsgroup-offsetstimestamp 的任何一种'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',使用 'specific-offsets' 启动模式时需要指定每个 partition 对应的 offsets。
    'scan.startup.timestamp-miles' = '1631588815000',使用 'timestamp' 启动模式时需要指定启动的时间戳(单位毫秒)
    scan.startup.specific-offsets
    如果 scan.startup.mode 的值为'specific-offsets',则必须使用本参数指定具体起始读取的偏移量。例如 'partition:0,offset:42;partition:1,offset:300'
    scan.startup.timestamp-millis
    如果scan.startup.mode 的值为'timestamp',则必须使用本参数来指定开始读取的时间点(毫秒为单位的 Unix 时间戳)
    tdsql.database.name
    tdsql 数据库名称,配置此参数,可以消费 tdsql 指定数据库的 binlog,前提是订阅任务包含了此数据库的 binlog。参数支持正则,例如 test_case_2022_06_0*
    tdsql.table.name
    tdsql 数据表名称,配置此参数,可以消费 tdsql 指定数据表的 binlog,前提是订阅任务包含了此数据表的 binlog。参数支持正则,例如 test_0*test_1,test_2
    说明
    如果需要配置 tdsql.database.nametdsql.table.name 参数,订阅任务建议配置 订阅全实例 ,如果有多个 Oceanus 任务消费不同 tdsql 数据库表时,多个 Oceanus 任务需要使用订阅任务的不同消费组,可在订阅任务中创建不同消费组。

    注意事项

    1. 如果订阅任务配置了多库多表或同库多表,需要保证表结构是相同的,才能正确接入订阅任务的数据。
    2. 源表中文编码目前仅支持 utf8gbk 两种。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持