tencent cloud

文档反馈

数据库 MySQL CDC

最后更新时间:2023-11-08 16:18:42

    介绍

    MySQL 的 CDC 源表,支持对 MySQL 数据库的全量和增量读取,并保证 Exactly Once 语义。MySQL CDC 底层使用了 Debezium 来实现对变更数据流的实时抓取(Change Data Capture)。

    MySQL CDC 1.x 工作机制

    1. 获取一个全局读锁,从而阻塞住其他数据库客户端的写操作。
    2. 开启一个可重复读语义的事务,来保证后续在同一个事务内读操作都是在一个一致性快照中完成的。
    3. 读取 Binlog 的当前位置。
    4. 读取连接器中配置的数据库和表的模式(schema)信息。
    5. 释放全局读锁,允许其他的数据库客户端对数据库进行写操作。
    6. 扫描全表,当全表数据读取完后,会从第3步中得到的 Binlog 位置获取增量的变更记录。
    Flink 作业运行期间会周期性执行快照,记录下 Binlog 位置,当作业崩溃恢复时,便会从之前记录的 Binlog 点继续处理,从而保证 Exactly once 语义。

    MySQL CDC 2.x 工作机制

    1. MySQL 表需要有主键,如果是联合主键则会选择数据表中的第一个主键作为全量阶段的 splitKey,其用来将数据分为多个分片(Chunk)。
    2. 全量阶段使用无锁算法,无需给表加锁。
    3. 整个同步过程分为两个阶段,全量阶段并发读取分片数据,全量阶段结束之后进入增量阶段,整个过程都支持 Checkpoint 从而保证 Exactly once 语义。

    版本说明

    Flink 版本
    说明
    1.11
    支持 MySQL 版本为 5.6
    1.13
    支持 MySQL 版本为 5.6, 5.7, 8.x
    默认配置,需要 source 表有主键。如果 source 表没有主键,需要 with 参数需要设置 'scan.incremental.snapshot.enabled' = 'false'
    1.14
    支持 MySQL 版本为 5.6, 5.7, 8.x
    默认配置,需要 source 表有主键。如果 source 表没有主键,需要 with 参数需要设置 'scan.incremental.snapshot.enabled' = 'false'
    1.16
    支持 MySQL 版本为 5.6, 5.7, 8.x
    默认配置,需要 source 表有主键。如果 source 表没有主键,需要 with 参数需要设置 'scan.incremental.snapshot.enabled' = 'false'

    使用范围

    MySQL CDC 只支持作为源表。

    DDL 定义

    CREATE TABLE `mysql_cdc_source_table` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
    ) WITH (
    'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
    'hostname' = '192.168.10.22', -- 数据库的 IP
    'port' = '3306', -- 数据库的访问端口
    'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
    'password' = 'hello@world!', -- 数据库访问的密码
    'database-name' = 'YourDatabase', -- 需要同步的数据库
    'table-name' = 'YourTable' -- 需要同步的数据表名
    );

    WITH 参数

    参数
    说明
    是否必填
    备注
    connector
    源表类型
    固定值为 mysql-cdc
    hostname
    MySQL 数据库的 IP 地址或者 Hostname
    -
    port
    MySQL 数据库服务的端口号
    默认值为3306
    username
    MySQL 数据库服务的用户名
    有特定权限(包括 SELECT、RELOAD、SHOW DATABASES、REPLICATION SLAVE 和 REPLICATION CLIENT)的 MySQL 用户
    password
    MySQL 数据库服务的密码
    -
    database-name
    MySQL 数据库名称
    数据库名称支持正则表达式以读取多个数据库的数据
    table-name
    MySQL 表名
    表名支持正则表达式以读取多个表的数据
    server-id
    数据库客户端的一个 ID
    该 ID 必须是 MySQL 集群中全局唯一的。建议针对同一个数据库的每个作业都设置不同的 ID 范围值,例如5400-5405。默认会随机生成一个6400 - Integer.MAX_VALUE 的值
    server-time-zone
    数据库在使用的会话时区
    例如 Asia/Shanghai,该参数控制了 MySQL 中的 TIMESTAMP 类型如何转成 STRING 类型
    append-mode
    开启 append 流模式
    Flink1.13及以上版本支持, 例如:将 mysql-cdc 数据以 append 的方式同步到 hive
    filter-duplicate-pair-records
    过滤未在 Flink DDL 语句中定义的源表字段变更记录
    例如 MySQL 源表有 a, b, c, d 四个字段,而用户在 Flink SQL 建表时只定义了 a, b 两个字段;开启该参数后,仅涉及 c 或 d 字段的变更记录会被忽略,不会输出到下游,可减少计算量和处理压力
    scan.lastchunk.optimize.enable
    对全量阶段的最后一个分片做重划分
    如果全量同步期间,源表持续有大量写入和变更,则可能导致最后一个分片过大,引起 TaskManager OOM 崩溃重启。 开启本功能后(值设置为 true),Flink 会自动将过大的最后一个分片分成若干的小分片,提升作业的稳定性
    debezium.min.row.count.to.stream.results
    当表的条数大于该值时,会使用分批读取模式
    默认值为1000。Flink 采用以下方式读取 MySQL 源表数据:
    全量读取:直接将整个表的数据读取到内存里。优点是速度快,缺点是会消耗对应大小的内存,如果源表数据量非常大,可能会有 OOM 风险
    分批读取:分多次读取,每次读取一定数量的行数,直到读取完所有数据。优点是读取数据量比较大的表没有 OOM 风险,缺点是读取速度相对较慢
    debezium.snapshot.fetch.size
    在 Snapshot 阶段,每次读取 MySQL 源表数据行数的最大值
    仅当分批读取模式时,该参数生效
    debezium.skipped.operations
    需要过滤的 oplog 操作。操作包括 c 表示插入,u 表示更新,d 表示删除。默认情况下,不跳过任何操作,以逗号分隔
    -
    scan.incremental.snapshot.enabled
    增量快照
    默认为 true
    scan.incremental.snapshot.chunk.size
    当读取表的快照时,表快照捕获的表的块大小(行数)
    默认为 8096
    scan.lazy-calculate-splits.enabled
    全量阶段JM中数据分片懒加载避免数据量太大,分片数据太多导致JM OOM
    默认为 true
    scan.newly-added-table.enabled
    动态加表
    默认为 false
    scan.split-key.mode
    联合主键作为 splitkey 的模式
    取值为 default / specific;其中 default 为默认逻辑,采用联合主键的第一个字段作为 split key;设置为 specific 需要设置 scan.split-key.specific-column 指定联合主键中的某个字段
    scan.split-key.specific-column
    指定联合主键中某个字段作为 splitkey
    当 scan.split-key.mode 为 specific 时必填。取值为联合主键中某个字段名
    connect.timeout
    尝试连接到 MySQL 数据库服务器后在超时之前等待的最长时间
    默认 30s
    connect.max-retries
    建立MySQL连接尝试最大的次数
    默认 3
    connection.pool.size
    连接池大小
    默认 20
    jdbc.properties.*
    自定义JDBC URL参数,例如:'jdbc.properties.useSSL' = 'false'
    默认 20
    heartbeat.interval
    发送心跳事件的时间间隔,用于跟踪最新可用的binlog偏移量, 一般用于解决慢表的问题(更新缓慢的数据表)
    默认 20
    debezium.*
    Debezium 属性参数
    从更细粒度控制 Debezium 客户端的行为。例如'debezium.snapshot.mode' = 'never',详情请参见 配置属性

    可用元数据(Flink1.13 及以上版本可使用)

    支持的元数据列:
    数据类型
    描述
    database_name/meta.database_name
    STRING NOT NULL
    包含该 Row 的数据库名称
    table_name/meta.table_name
    STRING NOT NULL
    包含该 Row 的表名称
    op_ts/meta.op_ts
    TIMESTAMP_LTZ(3) NOT NULL
    Row 在数据库中进行更改的时间
    meta.batch_id
    BIGINT
    binlog 的批 id
    meta.is_ddl
    BOOLEAN
    是否 DDL 语句
    meta.mysql_type
    MAP
    数据表结构
    meta.update_before
    ARRAY
    未修改前字段的值
    meta.pk_names
    ARRAY
    主键字段名
    meta.sql
    STRING
    暂时为空
    meta.sql_type
    MAP
    sql_type 表的字段到 Java 数据类型 ID 的映射
    meta.ts
    TIMESTAMP_LTZ(3) NOT NULL
    收到该 ROW 并处理的当前时间
    meta.op_type
    STRING
    数据库操作类型,例如 INSERT/DELETE 等
    meta.file
    STRING
    全量阶段时为空。增量阶段时为数据来自的 binlog 文件名,例如 mysql-bin.000101
    meta.pos
    BIGINT
    全量阶段时为0。增量阶段时为数据来自的 binlog 文件偏移,例如 143127802
    meta.gtid
    STRING
    全量阶段时为 null。增量阶段时为数据对应的 gtid 值,例如 3d3c4464-c320-11e9-8b3a-6c92bf62891a:66486240

    使用示例

    CREATE TABLE `mysql_cdc_source_table` (
    `id` INT,
    `name` STRING,
    `database_name` string METADATA FROM 'database_name',
    `table_name` string METADATA FROM 'table_name',
    `op_ts` timestamp(3) METADATA FROM 'op_ts',
    `op_type` string METADATA FROM 'meta.op_type',
    `batch_id` bigint METADATA FROM 'meta.batch_id',
    `is_ddl` boolean METADATA FROM 'meta.is_ddl',
    `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before',
    `mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type',
    `pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names',
    `sql` STRING METADATA FROM 'meta.sql',
    `sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type',
    `ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts',
    PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
    ) WITH (
    'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
    'hostname' = '192.168.10.22', -- 数据库的 IP
    'port' = '3306', -- 数据库的访问端口
    'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
    'password' = 'hello@world!', -- 数据库访问的密码
    'database-name' = 'YourDatabase', -- 需要同步的数据库
    'table-name' = 'YourTable' -- 需要同步的数据表名
    );

    MySQL 分库分表读取方式

    目前 Oceanus 已支持 MySQL 分库分表的读取。
    如果 MySQL 是一个分库分表的数据库,分成了 A_1、 A_2、A_3 ... 等多个表,且所有表的 schema 一致,则可以通过 table-name 选项,指定一个正则表达式来匹配读取多张表,例如设置 table-name 为 A_.* ,监控所有 A_ 前缀的表。database-name 选项也支持该功能
    说明
    如果 database-name 和 table-name 设置为正则匹配的话,需要使用()把正则式包围起来。

    类型映射

    MySQL 的 CDC 和 Flink 字段类型对应关系如下:
    MySQL type
    Flink SQL type
    NOTE
    TINYINT
    TINYINT
    -
    SMALLINT TINYINT UNSIGNED TINYINT UNSIGNED ZEROFILL
    SMALLINT
    -
    INT
    MEDIUMINT
    SMALLINT UNSIGNED
    SMALLINT UNSIGNED ZEROFILL
    INT
    -
    BIGINT
    INT UNSIGNED
    INT UNSIGNED ZEROFILL
    MEDIUMINT UNSIGNED
    MEDIUMINT UNSIGNED ZEROFILL
    BIGINT
    -
    BIGINT UNSIGNED
    BIGINT UNSIGNED ZEROFILL
    SERIAL
    DECIMAL(20, 0)
    -
    FLOAT
    FLOAT UNSIGNED
    FLOAT UNSIGNED ZEROFILL
    FLOAT
    -
    REAL
    REAL UNSIGNED
    REAL UNSIGNED ZEROFILL
    DOUBLE
    DOUBLE UNSIGNED
    DOUBLE UNSIGNED ZEROFILL
    DOUBLE PRECISION
    DOUBLE PRECISION UNSIGNED
    DOUBLE PRECISION UNSIGNED ZEROFILL
    DOUBLE
    -
    NUMERIC(p, s)
    NUMERIC(p, s) UNSIGNED
    NUMERIC(p, s) UNSIGNED ZEROFILL
    DECIMAL(p, s)
    DECIMAL(p, s) UNSIGNED
    DECIMAL(p, s) UNSIGNED ZEROFILL
    FIXED(p, s)
    FIXED(p, s) UNSIGNED
    FIXED(p, s) UNSIGNED ZEROFILL
    where p <= 38
    DECIMAL(p, s)
    -
    NUMERIC(p, s)
    NUMERIC(p, s) UNSIGNED
    NUMERIC(p, s) UNSIGNED ZEROFILL
    DECIMAL(p, s)
    DECIMAL(p, s) UNSIGNED
    DECIMAL(p, s) UNSIGNED ZEROFILL
    FIXED(p, s)
    FIXED(p, s) UNSIGNED
    FIXED(p, s) UNSIGNED ZEROFILL
    where 38 < p <= 65
    STRING
    MySQL 中 DECIMAL 数据类型的精度最高为 65,而 Flink 中 DECIMAL 的精度限制为 38。 所以如果您定义一个精度大于38的十进制列,您应该把它映射到STRING,以避免精度损失
    BOOLEAN
    TINYINT(1)
    BIT(1)
    BOOLEAN
    -
    DATE
    DATE
    -
    TIME [(p)]
    TIME [(p)]
    -
    TIMESTAMP [(p)]
    DATETIME [(p)]
    TIMESTAMP [(p)]
    -
    CHAR(n)
    CHAR(n)
    -
    VARCHAR(n)
    VARCHAR(n)
    -
    BIT(n)
    BINARY(⌈n/8⌉)
    -
    BINARY(n)
    BINARY(n)
    -
    VARBINARY(N)
    VARBINARY(N)
    -
    TINYTEXT
    TEXT
    MEDIUMTEXT
    LONGTEXT
    STRING
    -
    TINYBLOB
    BLOB
    MEDIUMBLOB
    LONGBLOB
    BYTES
    对于 MySQL 中的 BLOB 数据类型,仅支持长度不大于 2,147,483,647(2 ** 31 - 1) 的 blob
    YEAR
    INT
    -
    ENUM
    STRING
    -
    JSON
    STRING
    JSON 数据类型会在 Flink 中转换为 JSON 格式的 STRING
    SET
    ARRAY<STRING>
    由于 MySQL 中的 SET 数据类型是一个可以有零个或多个值的字符串对象,所以它应该总是映射到一个字符串数组
    GEOMETRY
    POINT
    LINESTRING
    POLYGON
    MULTIPOINT
    MULTILINESTRING
    MULTIPOLYGON
    GEOMETRYCOLLECTION
    STRING
    MySQL 中的空间数据类型会被转换成固定 Json 格式的 STRING

    代码示例

    CREATE TABLE `mysql_cdc_source_table` (
    `id` INT,
    `name` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
    ) WITH (
    'connector' = 'mysql-cdc', -- 固定值 'mysql-cdc'
    'hostname' = '192.168.10.22', -- 数据库的 IP
    'port' = '3306', -- 数据库的访问端口
    'username' = 'debezium', -- 数据库访问的用户名(需要提供 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD 权限)
    'password' = 'hello@world!', -- 数据库访问的密码
    'database-name' = 'YourDatabase', -- 需要同步的数据库
    'table-name' = 'YourTable' -- 需要同步的数据表名
    );
    
    CREATE TABLE `print_table` (
    `id` INT,
    `name` STRING
    ) WITH (
    'connector' = 'print'
    );
    insert into print_table select * from mysql_cdc_source_table;

    注意事项

    Checkpoint 相关

    使用 CDC 1.0 ('scan.incremental.snapshot.enabled' = 'false') 时,需要做的额外参数配置。 由于 CDC 1.0 读取全量数据阶段无法做 checkpoint,当需要同步的表较多、数据较大时,可能会导致多次快照失败,从而引发作业失败。可以通过作业高级参数 execution.checkpointing.tolerable-failed-checkpoints: 100 调整 checkpoint 失败的容忍次数。
    使用 CDC 2.0,且作业的默认并行度大于 1 时,必须开启 checkpoint。
    CDC 读取完全量数据后,需要等待一个 checkpoint 完成后才能进入增量阶段。

    关于使用 CDC 1.0 的风险告知

    当表没有主键时,只能通过使用 WITH 参数 'scan.incremental.snapshot.enabled' = 'false' 开启 CDC 1.0 模式,会存在以下风险:
    1. 默认会使用 FTWRL (flush table with read lock)。
    2. 虽然 FTWRL 只会持有短暂的时间,但由于 FTWRL 的机制,可能会导致锁库
    3. FTWRL 可能会出现的情况如下:
    会等待正在执行的 update/select 操作完成。
    在等待 update/select 完成的期间,会导致数据库不可用,即阻塞新加入的 SELECT 查询,这是 MySQL Query Cache 机制导致的。
    如果同时启动多个不同的 MySQL CDC 1.0 的 source,大概率会碰到上述情况。

    用户权限

    用于同步的源数据库的用户必须拥有以下权限 SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT、SELECT 和 RELOAD。
    GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
    FLUSH PRIVILEGES;

    全局读锁

    上述的工作原理中,MySQL CDC 1.x 中可以看到第一步就会获取一个全局读锁,用于获取 schema 和 Binlog 位置。这里会阻塞其他客户端的写入,因此仍可能对线上业务造成影响。 若可以接受 At Least Once 语义,可通过设置 WITH 参数 'debezium.snapshot.locking.mode' = 'none' 跳过这个阶段。

    联合主键设置

    例如,下面的 DDL 设置了index1index2index3index4 4个字段为联合主键索引,要和 PRIMARY KEY 定义保持一致,顺序不会影响正常的同步。
    CREATE TABLE db_order_dim (
    `index1` STRING,
    `index2` STRING,
    `index3` STRING,
    `index4` STRING,
    `field5` STRING,
    `field6` STRING,
    PRIMARY KEY(`index1`, `index2`, `index3`, `index4`) NOT enforced
    ) WITH (
    ...
    );

    server-id 定义

    不建议显式指定server-id,因为 Oceanus 平台会为每个表自动生成随机 server-id 值(范围是 6400 - 2147483647),以避免不同作业读取同一个库可能出现的 server-id 冲突问题。
    如果必须要手动指定 server-id 值,对于 CDC 2.x 版本,建议设置为范围值,例如 5400-5405,因为每个并行读取器应该有一个唯一的服务器 ID,所以 server-id 必须是 5400-5405 这样的范围,且范围必须大于并行度。但对于 CDC 1.x 版本,只能设置单个 server-id 值,不支持范围设定。
    指定server-id 有以下两种方式:
    1. mysql-cdc DDL 的 WITH 参数中指定。
    2. 使用 SQL Hints 来指定 server-id
    SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;

    设置 MySQL 会话超时

    全量阶段读取大型数据库的时候可能会超时,您可以对 MySQL 做一些配置来避免这个问题。
    interactive_timeout 服务器在关闭交互式连接之前等待其活动的秒数。请参见 MySQL 文档
    wait_timeout 服务器在关闭非交互式连接之前等待其活动的秒数。请参见 MySQL 文档
    增量阶段也有可能因为 TaskManager 负载过高导致心跳失效,服务端主动断开连接(EOFException)。可以在 MySQL 服务端执行下面的 SQL 语句,调大超时时间:
    SET GLOBAL slave_net_timeout = 120;
    SET GLOBAL thread_pool_idle_timeout = 120;

    JobManager 关键日志说明

    在 CDC 2.x 模式下,每个表的同步都要经历分片划分全量快照同步增量修正纯增量同步等阶段。由于前三个阶段资源占用大、耗时久,Oceanus 在日志和指标方面做了加强,协助用户洞察和分析作业的运行情况。

    1. 分片划分与分配

    开始划分分片:搜索 into chunks 关键字,例如 Start splitting table cdc_basic_source.random_source_1 into chunks 或者 Start lazily splitting table cdc_basic_source.random_source_1 into chunks
    结束划分分片: 搜索 chunks, time cost 关键字,例如 Split table cdc_basic_source.random_source_1 into 14 chunks, time cost: 994ms.

    2. 全量快照同步

    查看全量分片分配进度:开启 DEBUG 级别日志,搜索 Current assigned splits for 关键字,即可查看每个表的总分片数和分配进度。
    结束全量分片分配:搜索 finished. Total split number 关键字,例如 Split assignment for cdc_basic_source.random_source_1 finished. Total split number: 14
    结束全量阶段:搜索 Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED 日志。

    3. 增量修正

    开始增量修正阶段:搜索 Initial assigning finished as there are no more splits. Creating binlog split 或者 Newly added assigning finished as there are no more splits. Waking up binlog reader 日志。

    4. 纯增量同步

    参见下一节 “TaskManager 关键日志说明”。

    TaskManager 关键日志说明

    进入纯增量同步阶段:搜索 has entered pure binlog phase 日志。例如 Table cdc_basic_source.random_source_2 has entered pure binlog phase.
    表的 Schema 变更:搜索 Received schema change event 日志。
    EOFException 异常:如果作业重启,且提示异常是 EOFException,则可根据提示语,调整 MySQL 服务端的超时参数。同时也可以减少总并行度,升级每个 TaskManager 的规格,以减少内存和 CPU 压力过高导致的超时问题的发生概率。

    监控指标说明

    Oceanus 为 MySQL CDC Connector 也增加了很多实用的统计指标。单击 Flink UI 的运行图中的 MySQL CDC 源算子,即可搜索并查看指标:
    logpos:获取当前消费到的 Binlog 位点,可以协助定位消费卡顿等问题。
    numberOfInsertRecords:获取输出的 +I 消息数。
    numberOfDeleteRecords:获取输出的 -D 消息数。
    numberOfUpdateBeforeRecords:获取输出的 -U 消息数。
    numberOfUpdateAfterRecords:获取输出的 +U 消息数。
    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持