Flink 版本 | 说明 |
1.11 | 支持 MySQL 版本为 5.6 |
1.13 | 支持 MySQL 版本为 5.6, 5.7, 8.x 默认配置,需要 source 表有主键。如果 source 表没有主键,需要 with 参数需要设置 'scan.incremental.snapshot.enabled' = 'false' |
1.14 | 支持 MySQL 版本为 5.6, 5.7, 8.x 默认配置,需要 source 表有主键。如果 source 表没有主键,需要 with 参数需要设置 'scan.incremental.snapshot.enabled' = 'false' |
1.16 | 支持 MySQL 版本为 5.6, 5.7, 8.x 默认配置,需要 source 表有主键。如果 source 表没有主键,需要 with 参数需要设置
'scan.incremental.snapshot.enabled' = 'false' |
CREATE TABLE `mysql_cdc_source_table` (`id` INT,`name` STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = '192.168.10.22', -- 数据库的 IP'port' = '3306', -- 数据库的访问端口'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'hello@world!', -- 数据库访问的密码'database-name' = 'YourDatabase', -- 需要同步的数据库'table-name' = 'YourTable' -- 需要同步的数据表名);
参数 | 说明 | 是否必填 | 备注 |
connector | 源表类型 | 是 | 固定值为 mysql-cdc |
hostname | MySQL 数据库的 IP 地址或者 Hostname | 是 | - |
port | MySQL 数据库服务的端口号 | 否 | 默认值为3306 |
username | MySQL 数据库服务的用户名 | 是 | 有特定权限(包括 SELECT、RELOAD、SHOW DATABASES、REPLICATION SLAVE 和 REPLICATION CLIENT)的 MySQL 用户 |
password | MySQL 数据库服务的密码 | 是 | - |
database-name | MySQL 数据库名称 | 是 | 数据库名称支持正则表达式以读取多个数据库的数据 |
table-name | MySQL 表名 | 是 | 表名支持正则表达式以读取多个表的数据 |
server-id | 数据库客户端的一个 ID | 否 | 该 ID 必须是 MySQL 集群中全局唯一的。建议针对同一个数据库的每个作业都设置不同的 ID 范围值,例如 5400-5405 。默认会随机生成一个6400 - Integer.MAX_VALUE 的值 |
server-time-zone | 数据库在使用的会话时区 | 否 | 例如 Asia/Shanghai,该参数控制了 MySQL 中的 TIMESTAMP 类型如何转成 STRING 类型 |
append-mode | 开启 append 流模式 | 否 | Flink1.13及以上版本支持, 例如:将 mysql-cdc 数据以 append 的方式同步到 hive |
filter-duplicate-pair-records | 过滤未在 Flink DDL 语句中定义的源表字段变更记录 | 否 | 例如 MySQL 源表有 a, b, c, d 四个字段,而用户在 Flink SQL 建表时只定义了 a, b 两个字段;开启该参数后,仅涉及 c 或 d 字段的变更记录会被忽略,不会输出到下游,可减少计算量和处理压力 |
scan.lastchunk.optimize.enable | 对全量阶段的最后一个分片做重划分 | 否 | 如果全量同步期间,源表持续有大量写入和变更,则可能导致最后一个分片过大,引起 TaskManager OOM 崩溃重启。
开启本功能后(值设置为 true),Flink 会自动将过大的最后一个分片分成若干的小分片,提升作业的稳定性 |
debezium.min.row.count.to.stream.results | 当表的条数大于该值时,会使用分批读取模式 | 否 | 默认值为1000。Flink 采用以下方式读取 MySQL 源表数据: 全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有 OOM 风险 分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有 OOM 风险,缺点是读取速度相对较慢 |
debezium.snapshot.fetch.size | 在 Snapshot 阶段,每次读取 MySQL 源表数据行数的最大值 | 否 | 仅当分批读取模式时,该参数生效 |
debezium.skipped.operations | 需要过滤的 oplog 操作。操作包括 c 表示插入,u 表示更新,d 表示删除。默认情况下,不跳过任何操作,以逗号分隔 | 否 | - |
scan.incremental.snapshot.enabled | 增量快照 | 否 | 默认为 true |
scan.incremental.snapshot.chunk.size | 当读取表的快照时,表快照捕获的表的块大小(行数) | 否 | 默认为 8096 |
scan.lazy-calculate-splits.enabled | 全量阶段JM中数据分片懒加载避免数据量太大,分片数据太多导致JM OOM | 否 | 默认为 true |
scan.newly-added-table.enabled | 动态加表 | 否 | 默认为 false |
scan.split-key.mode | 联合主键作为 splitkey 的模式 | 否 | 取值为 default / specific;其中 default 为默认逻辑,采用联合主键的第一个字段作为 split key;设置为 specific 需要设置 scan.split-key.specific-column 指定联合主键中的某个字段 |
scan.split-key.specific-column | 指定联合主键中某个字段作为 splitkey | 否 | 当 scan.split-key.mode 为 specific 时必填。取值为联合主键中某个字段名 |
connect.timeout | 尝试连接到 MySQL 数据库服务器后在超时之前等待的最长时间 | 否 | 默认 30s |
connect.max-retries | 建立MySQL连接尝试最大的次数 | 否 | 默认 3 |
connection.pool.size | 连接池大小 | 否 | 默认 20 |
jdbc.properties.* | 自定义JDBC URL参数,例如: 'jdbc.properties.useSSL' = 'false' | 否 | 默认 20 |
heartbeat.interval | 发送心跳事件的时间间隔,用于跟踪最新可用的binlog偏移量, 一般用于解决慢表的问题(更新缓慢的数据表) | 否 | 默认 20 |
debezium.* | Debezium 属性参数 | 否 |
列 | 数据类型 | 描述 |
database_name/meta.database_name | STRING NOT NULL | 包含该 Row 的数据库名称 |
table_name/meta.table_name | STRING NOT NULL | 包含该 Row 的表名称 |
op_ts/meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | Row 在数据库中进行更改的时间 |
meta.batch_id | BIGINT | binlog 的批 id |
meta.is_ddl | BOOLEAN | 是否 DDL 语句 |
meta.mysql_type | MAP | 数据表结构 |
meta.update_before | ARRAY | 未修改前字段的值 |
meta.pk_names | ARRAY | 主键字段名 |
meta.sql | STRING | 暂时为空 |
meta.sql_type | MAP | sql_type 表的字段到 Java 数据类型 ID 的映射 |
meta.ts | TIMESTAMP_LTZ(3) NOT NULL | 收到该 ROW 并处理的当前时间 |
meta.op_type | STRING | 数据库操作类型,例如 INSERT/DELETE 等 |
meta.file | STRING | 全量阶段时为空。增量阶段时为数据来自的 binlog 文件名,例如 mysql-bin.000101 |
meta.pos | BIGINT | 全量阶段时为0。增量阶段时为数据来自的 binlog 文件偏移,例如 143127802 |
meta.gtid | STRING | 全量阶段时为 null。增量阶段时为数据对应的 gtid 值,例如 3d3c4464-c320-11e9-8b3a-6c92bf62891a:66486240 |
CREATE TABLE `mysql_cdc_source_table` (`id` INT,`name` STRING,`database_name` string METADATA FROM 'database_name',`table_name` string METADATA FROM 'table_name',`op_ts` timestamp(3) METADATA FROM 'op_ts',`op_type` string METADATA FROM 'meta.op_type',`batch_id` bigint METADATA FROM 'meta.batch_id',`is_ddl` boolean METADATA FROM 'meta.is_ddl',`update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before',`mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type',`pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names',`sql` STRING METADATA FROM 'meta.sql',`sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type',`ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts',PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = '192.168.10.22', -- 数据库的 IP'port' = '3306', -- 数据库的访问端口'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'hello@world!', -- 数据库访问的密码'database-name' = 'YourDatabase', -- 需要同步的数据库'table-name' = 'YourTable' -- 需要同步的数据表名);
()
把正则式包围起来。 MySQL type | Flink SQL type | NOTE |
TINYINT | TINYINT | - |
SMALLINT
TINYINT UNSIGNED
TINYINT UNSIGNED ZEROFILL | SMALLINT | - |
INT MEDIUMINT SMALLINT UNSIGNED SMALLINT UNSIGNED ZEROFILL | INT | - |
BIGINT INT UNSIGNED INT UNSIGNED ZEROFILL MEDIUMINT UNSIGNED MEDIUMINT UNSIGNED ZEROFILL | BIGINT | - |
BIGINT UNSIGNED BIGINT UNSIGNED ZEROFILL SERIAL | DECIMAL(20, 0) | - |
FLOAT FLOAT UNSIGNED FLOAT UNSIGNED ZEROFILL | FLOAT | - |
REAL REAL UNSIGNED REAL UNSIGNED ZEROFILL DOUBLE DOUBLE UNSIGNED DOUBLE UNSIGNED ZEROFILL DOUBLE PRECISION DOUBLE PRECISION UNSIGNED DOUBLE PRECISION UNSIGNED ZEROFILL | DOUBLE | - |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where p <= 38 | DECIMAL(p, s) | - |
NUMERIC(p, s) NUMERIC(p, s) UNSIGNED NUMERIC(p, s) UNSIGNED ZEROFILL DECIMAL(p, s) DECIMAL(p, s) UNSIGNED DECIMAL(p, s) UNSIGNED ZEROFILL FIXED(p, s) FIXED(p, s) UNSIGNED FIXED(p, s) UNSIGNED ZEROFILL where 38 < p <= 65 | STRING | MySQL 中 DECIMAL 数据类型的精度最高为 65,而 Flink 中 DECIMAL 的精度限制为 38。 所以如果您定义一个精度大于38的十进制列,您应该把它映射到STRING,以避免精度损失 |
BOOLEAN TINYINT(1) BIT(1) | BOOLEAN | - |
DATE | DATE | - |
TIME [(p)] | TIME [(p)] | - |
TIMESTAMP [(p)] DATETIME [(p)] | TIMESTAMP [(p)] | - |
CHAR(n) | CHAR(n) | - |
VARCHAR(n) | VARCHAR(n) | - |
BIT(n) | BINARY(⌈n/8⌉) | - |
BINARY(n) | BINARY(n) | - |
VARBINARY(N) | VARBINARY(N) | - |
TINYTEXT TEXT MEDIUMTEXT LONGTEXT | STRING | - |
TINYBLOB BLOB MEDIUMBLOB LONGBLOB | BYTES | 对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2,147,483,647(2 ** 31 - 1) 的 blob |
YEAR | INT | - |
ENUM | STRING | - |
JSON | STRING | JSON 数据类型会在 Flink 中转换为 JSON 格式的 STRING |
SET | ARRAY<STRING> | 由于 MySQL 中的 SET 数据类型是一个可以有零个或多个值的字符串对象,所以它应该总是映射到一个字符串数组 |
GEOMETRY POINT LINESTRING POLYGON MULTIPOINT MULTILINESTRING MULTIPOLYGON GEOMETRYCOLLECTION | STRING | MySQL 中的空间数据类型会被转换成固定 Json 格式的 STRING |
CREATE TABLE `mysql_cdc_source_table` (`id` INT,`name` STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH ('connector' = 'mysql-cdc', -- 固定值 'mysql-cdc''hostname' = '192.168.10.22', -- 数据库的 IP'port' = '3306', -- 数据库的访问端口'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)'password' = 'hello@world!', -- 数据库访问的密码'database-name' = 'YourDatabase', -- 需要同步的数据库'table-name' = 'YourTable' -- 需要同步的数据表名);CREATE TABLE `print_table` (`id` INT,`name` STRING) WITH ('connector' = 'print');insert into print_table select * from mysql_cdc_source_table;
execution.checkpointing.tolerable-failed-checkpoints: 100
调整 checkpoint 失败的容忍次数。'scan.incremental.snapshot.enabled' = 'false'
开启 CDC 1.0 模式,会存在以下风险:GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';FLUSH PRIVILEGES;
'debezium.snapshot.locking.mode' = 'none'
跳过这个阶段。index1
、index2
、index3
、index4
4个字段为联合主键索引,要和 PRIMARY KEY
定义保持一致,顺序不会影响正常的同步。CREATE TABLE db_order_dim (`index1` STRING,`index2` STRING,`index3` STRING,`index4` STRING,`field5` STRING,`field6` STRING,PRIMARY KEY(`index1`, `index2`, `index3`, `index4`) NOT enforced) WITH (...);
server-id
,因为 Oceanus 平台会为每个表自动生成随机 server-id
值(范围是 6400 - 2147483647
),以避免不同作业读取同一个库可能出现的 server-id
冲突问题。server-id
值,对于 CDC 2.x 版本,建议设置为范围值,例如 5400-5405
,因为每个并行读取器应该有一个唯一的服务器 ID,所以 server-id
必须是 5400-5405
这样的范围,且范围必须大于并行度。但对于 CDC 1.x 版本,只能设置单个 server-id
值,不支持范围设定。server-id
有以下两种方式:mysql-cdc
DDL 的 WITH 参数中指定。server-id
。SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
interactive_timeout
服务器在关闭交互式连接之前等待其活动的秒数。请参见 MySQL 文档。wait_timeout
服务器在关闭非交互式连接之前等待其活动的秒数。请参见 MySQL 文档。SET GLOBAL slave_net_timeout = 120;SET GLOBAL thread_pool_idle_timeout = 120;
into chunks
关键字,例如 Start splitting table cdc_basic_source.random_source_1 into chunks
或者 Start lazily splitting table cdc_basic_source.random_source_1 into chunks
chunks, time cost
关键字,例如 Split table cdc_basic_source.random_source_1 into 14 chunks, time cost: 994ms.
DEBUG
级别日志,搜索 Current assigned splits for
关键字,即可查看每个表的总分片数和分配进度。finished. Total split number
关键字,例如 Split assignment for cdc_basic_source.random_source_1 finished. Total split number: 14
Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED
日志。Initial assigning finished as there are no more splits. Creating binlog split
或者 Newly added assigning finished as there are no more splits. Waking up binlog reader
日志。has entered pure binlog phase
日志。例如 Table cdc_basic_source.random_source_2 has entered pure binlog phase.
Received schema change event
日志。EOFException
,则可根据提示语,调整 MySQL 服务端的超时参数。同时也可以减少总并行度,升级每个 TaskManager 的规格,以减少内存和 CPU 压力过高导致的超时问题的发生概率。
本页内容是否解决了您的问题?