Flink 版本 | 说明 |
1.11 | 不支持 |
1.13 | 支持 |
1.14 | 不支持 |
1.16 | 不支持 |
format
必须指定为 protobuf
格式,因为数据订阅中,发送到 Kafka 的消息格式为 protobuf
; 相较于通常使用的 Kafka connector,tdsql-subscribe connector 会多了一些认证信息,认证信息也是源于订阅任务。CREATE TABLE `DataInput` (`id` INT,`name` VARCHAR,`age` INT) WITH ('connector' = 'tdsql-subscribe', -- 注意选择对应的内置 Connector'tdsql.database.name' = 'test_case_2022_06_0*', -- 对订阅消息进行过滤,消费数据库名满足 test_case_2022_06_0* 正则的订阅数据'tdsql.table.name' = 'test_0*', -- 对订阅消息进行过滤,消费数据表满足 test_0* 正则的订阅数据'topic' = 'topic-subs-5xop97nffk-tdsqlshard-xxx', -- 替换为订阅任务消费的 Topic'scan.startup.mode' = 'latest-offset', -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets 的任何一种'properties.bootstrap.servers' = 'guangzhou-kafka-2.cdb-dts.tencentcs.com.cn:3212', -- 替换为您的订阅任务 Kafka 连接地址'properties.group.id' = 'consumer-grp-subs-xxx-kk','format' = 'protobuf', -- 只能是protobuf格式'properties.security.protocol'='SASL_PLAINTEXT', -- 认证协议'properties.sasl.mechanism'='SCRAM-SHA-512', -- 认证方式'properties.sasl.jaas.config'='org.apache.kafka.common.security.scram.ScramLoginModule required username="account-subs-xxx-username" password="psw";' --用户名和密码);CREATE TABLE `jdbc_upsert_sink_table` (id INT PRIMARY KEY NOT ENFORCED,name STRING,age INT) WITH (-- 指定数据库连接参数'connector' = 'jdbc','url' = 'jdbc:mysql://172.28.28.138:3306/testdb', -- 请替换为您的实际 MySQL 连接参数'table-name' = 'sink', -- 需要写入的数据表'username' = 'user', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'psw' -- 数据库访问的密码);INSERT INTO jdbc_upsert_sink_table SELECT * FROM DataInput;
参数值 | 必填 | 默认值 | 描述 |
connector | 是 | 无 | 固定值为 'tdsql-subscribe' |
topic | 是 | 无 | 要读的 Kafka Topic 名 |
properties.bootstrap.servers | 是 | 无 | 逗号分隔的 Kafka Bootstrap 地址 |
properties.group.id | 是 | 无 | Kafka 消费时的 Group ID |
format | 是 | 无 | Kafka 消息的输入格式。目前只支持 protobuf |
scan.startup.mode | 否 | group-offsets | Kafka consumer 的启动模式。 可以是 latest-offset 、earliest-offset 、specific-offsets 、group-offsets 、timestamp 的任何一种'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300' ,使用 'specific-offsets' 启动模式时需要指定每个 partition 对应的 offsets。'scan.startup.timestamp-miles' = '1631588815000' ,使用 'timestamp' 启动模式时需要指定启动的时间戳(单位毫秒) |
scan.startup.specific-offsets | 否 | 无 | 如果 scan.startup.mode 的值为'specific-offsets' ,则必须使用本参数指定具体起始读取的偏移量。例如 'partition:0,offset:42;partition:1,offset:300' |
scan.startup.timestamp-millis | 否 | 无 | 如果 scan.startup.mode 的值为'timestamp' ,则必须使用本参数来指定开始读取的时间点(毫秒为单位的 Unix 时间戳) |
tdsql.database.name | 否 | 无 | tdsql 数据库名称,配置此参数,可以消费 tdsql 指定数据库的 binlog,前提是订阅任务包含了此数据库的 binlog。参数支持正则,例如 test_case_2022_06_0* |
tdsql.table.name | 否 | 无 | tdsql 数据表名称,配置此参数,可以消费 tdsql 指定数据表的 binlog,前提是订阅任务包含了此数据表的 binlog。参数支持正则,例如 test_0* ,test_1,test_2 |
tdsql.database.name
或 tdsql.table.name
参数,订阅任务建议配置 订阅全实例 ,如果有多个 Oceanus 任务消费不同 tdsql 数据库表时,多个 Oceanus 任务需要使用订阅任务的不同消费组,可在订阅任务中创建不同消费组。utf8
和 gbk
两种。
本页内容是否解决了您的问题?