MySQL CDC
Last updated: 2023-11-08 16:18:29

Overview

The MySQL CDC connector (as a source) reads snapshot data and incremental data from MySQL databases and guarantees exactly-once semantics. It uses Debezium as its underlying engine to implement change data capture (CDC).

How MySQL CDC 1.x works

1. Acquires a global read lock to prohibit writes by other database clients.
2. Starts a repeatable read transaction so that all subsequent reads see the same consistent snapshot.
3. Reads the current binlog position.
4. Reads the schema of the database and table configured in the connector.
5. Releases the global read lock to allow writes by other database clients.
6. Scans the table and, after all data is read, captures the changes made after the binlog position recorded in step 3.

Flink periodically performs checkpoints to record the binlog position. On failover, the job restarts and restores from the checkpointed binlog position, which guarantees exactly-once semantics.
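
The six steps above can be sketched as a sequence of plain MySQL statements. This is only an illustration of the locking and snapshot order, not the exact statements the connector issues; `YourDatabase` and `YourTable` are the placeholder names used elsewhere on this page.

```sql
-- Step 1: acquire the global read lock (requires the RELOAD privilege).
FLUSH TABLES WITH READ LOCK;

-- Step 2: start a repeatable read transaction with a consistent snapshot.
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;
START TRANSACTION WITH CONSISTENT SNAPSHOT;

-- Step 3: read the current binlog file and position.
SHOW MASTER STATUS;

-- Step 4: read the schema of the monitored table.
SHOW CREATE TABLE `YourDatabase`.`YourTable`;

-- Step 5: release the global read lock; the open transaction is not committed.
UNLOCK TABLES;

-- Step 6: scan the table inside the still-open transaction, then finish.
SELECT * FROM `YourDatabase`.`YourTable`;
COMMIT;
```

Because the lock is held only from step 1 to step 5, writes are blocked while the binlog position and schema are read, not during the table scan itself.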

How MySQL CDC 2.x works

1. The MySQL table must have a primary key. If the primary key is a composite key, its first field is used by default as the partitioning key (splitKey) to divide the data into chunks during the snapshot phase.
2. A lock-free algorithm is used during the snapshot phase, so the table does not need to be locked.
3. The sync process consists of two phases. During the snapshot phase, chunks are read concurrently. After this phase ends, the incremental phase starts. The whole process supports checkpoints to guarantee exactly-once semantics.
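
As a rough illustration of chunk-based reading, the queries below split a table by its partitioning key and read one chunk as an ordinary range scan. The table `orders` and its key `id` are hypothetical, and the connector's real queries may differ; the point is only that each chunk is a bounded, lock-free read that can run in parallel with the others.

```sql
-- Hypothetical source table `orders` with primary key `id`.
-- 8096 is the default of scan.incremental.snapshot.chunk.size.

-- Find the upper boundary of the first chunk.
SELECT MAX(id) AS chunk_end
FROM (SELECT id FROM orders ORDER BY id ASC LIMIT 8096) AS t;

-- Read one chunk as an independent range scan.
-- Replace 1 and 8096 with the boundaries found for that chunk.
SELECT * FROM orders WHERE id >= 1 AND id <= 8096;
```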

Versions

| Flink Version | Description |
| --- | --- |
| 1.11 | MySQL v5.6 supported. |
| 1.13 | MySQL v5.6, v5.7, and v8.x supported. Incremental snapshot reading (MySQL CDC 2.x) is configured by default, so the source must contain a primary key. If the source has no primary key, set 'scan.incremental.snapshot.enabled' to 'false' in the WITH parameters. |
| 1.14 | MySQL v5.6, v5.7, and v8.x supported. Incremental snapshot reading (MySQL CDC 2.x) is configured by default, so the source must contain a primary key. If the source has no primary key, set 'scan.incremental.snapshot.enabled' to 'false' in the WITH parameters. |
| 1.16 | MySQL v5.6, v5.7, and v8.x supported. Incremental snapshot reading (MySQL CDC 2.x) is configured by default, so the source must contain a primary key. If the source has no primary key, set 'scan.incremental.snapshot.enabled' to 'false' in the WITH parameters. |
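
For a source table without a primary key, a minimal sketch of the DDL could look as follows. The table name `mysql_cdc_no_pk_source` is made up for illustration, and the connection values are the placeholders used elsewhere on this page.

```sql
CREATE TABLE `mysql_cdc_no_pk_source` (
  `id` INT,
  `name` STRING
  -- No PRIMARY KEY clause: the monitored MySQL table has none.
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'YourTable',
  'scan.incremental.snapshot.enabled' = 'false' -- fall back to the MySQL CDC 1.x mode
);
```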

Limits

The MySQL CDC connector can be used only as a source.

Defining a table in DDL

CREATE TABLE `mysql_cdc_source_table` (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.
) WITH (
'connector' = 'mysql-cdc', -- Here, it should be 'mysql-cdc'.
'hostname' = '192.168.10.22', -- IP of the MySQL database server.
'port' = '3306', -- Integer port number of the MySQL database server.
'username' = 'debezium', -- Name of the MySQL user to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
'password' = 'hello@world!', -- Password to use when connecting to the MySQL database server.
'database-name' = 'YourDatabase', -- Database name of the MySQL server to monitor.
'table-name' = 'YourTable' -- Table name of the MySQL database to monitor.
);

WITH parameters

| Option | Description | Required | Remarks |
| --- | --- | --- | --- |
| connector | The connector to use. | Yes | Here, it should be 'mysql-cdc'. |
| hostname | IP address or hostname of the MySQL database server. | Yes | - |
| port | Integer port number of the MySQL database server. | No | Default value: 3306. |
| username | Name of the MySQL user to use when connecting to the MySQL database server. | Yes | A MySQL user with the required permissions (including SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT). |
| password | Password to use when connecting to the MySQL database server. | Yes | - |
| database-name | Database name of the MySQL server to monitor. | Yes | The database-name also supports regular expressions to monitor multiple databases matching the regular expression. |
| table-name | Table name of the MySQL database to monitor. | Yes | The table-name also supports regular expressions to monitor multiple tables matching the regular expression. |
| server-id | A numeric ID of this database client. | No | It must be unique across all currently running database processes in the MySQL cluster. We recommend setting a unique ID range for each job in a database, such as 5400-5405. By default, a random number between 6400 and Integer.MAX_VALUE is generated. |
| server-time-zone | Session time zone in the database server. | No | An example is "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING. |
| append-mode | Whether to enable the append mode. | No | This mode is available in Flink v1.13 or later. It allows, for example, syncing the MySQL CDC data to Hive. |
| filter-duplicate-pair-records | Filters out change records that involve only source fields not defined in the Flink DDL statements. | No | For example, a MySQL source contains four fields a, b, c, and d, but only a and b are defined in the table creation SQL statements. If this option is used, change records involving only field c or d are ignored and not delivered to the downstream system, reducing the data to be processed. |
| scan.lastchunk.optimize.enable | Repartitions the last chunk of the snapshot phase. | No | Numerous writes and changes continuously made to the source during the snapshot phase may cause the last chunk to be too large, resulting in the crash and restart of TaskManagers due to OOM. If this option is enabled (set to true), Flink automatically divides an oversized last chunk into smaller ones to make the job more stable. |
| debezium.min.row.count.to.stream.results | When the number of records in the table is greater than this value, the records are read in batches. | No | Default value: 1000. Flink reads data from a source with one of the following methods. Full read: the data of the whole table is read into memory. This is fast but consumes memory proportional to the table size, so an extremely large source poses a risk of OOM. Batch read: a certain number of rows is read each time until all the data is read. This avoids the risk of OOM for a large source, but reading is slower. |
| debezium.snapshot.fetch.size | Specifies the maximum number of rows to read from a MySQL source at a time during the snapshot phase. | No | This option applies only to the batch read mode. |
| debezium.skipped.operations | Specifies the operations to be skipped, separated by commas. Operations include c (insert), u (update), and d (delete). By default, no operations are skipped. | No | - |
| scan.incremental.snapshot.enabled | Whether to enable the incremental snapshot. | No | Default value: true. |
| scan.incremental.snapshot.chunk.size | The chunk size (number of rows) of the table snapshot. Captured tables are split into multiple chunks when the table snapshot is read. | No | Default value: 8096. |
| scan.lazy-calculate-splits.enabled | Whether to enable lazy loading for the JobManager during the snapshot phase, which avoids JobManager OOM caused by too much data and too many chunks. | No | Default value: true. |
| scan.newly-added-table.enabled | Whether to enable dynamic loading. | No | Default value: false. |
| scan.split-key.mode | Specifies how the partitioning key is chosen from the primary key. | No | Two valid values are available: default and specific. default indicates that the first field of the composite key is used as the partitioning key; specific requires setting scan.split-key.specific-column to specify a field in the composite key as the partitioning key. |
| scan.split-key.specific-column | Specifies a field in the composite key as the partitioning key. | No | This option is required when the value of scan.split-key.mode is specific. Its value must be the name of a field in the composite key. |
| connect.timeout | The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. | No | Default value: 30s. |
| connect.max-retries | Maximum number of times that the connector should retry to build a MySQL database server connection. | No | Default value: 3. |
| connection.pool.size | Connection pool size. | No | Default value: 20. |
| jdbc.properties.* | Option to pass custom JDBC URL parameters, such as 'jdbc.properties.useSSL' = 'false'. | No | - |
| heartbeat.interval | Interval of sending heartbeat events for tracing the latest available binlog offsets, generally used to address slow updates of tables. | No | Default value: 20. |
| debezium.* | Debezium properties. | No | Specifies Debezium properties for fine-grained control of the behaviors on the client, such as 'debezium.snapshot.mode' = 'never'. For details, see Debezium's MySQL Connector properties. |
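
To show how several of the optional parameters combine, here is a sketch of a source table that sets a few of them explicitly. The table name `mysql_cdc_tuned_source` is made up, and the values are either the documented defaults or the examples from the table above; they are not tuning recommendations.

```sql
CREATE TABLE `mysql_cdc_tuned_source` (
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'YourTable',
  'server-time-zone' = 'Asia/Shanghai',            -- session time zone of the MySQL server
  'scan.incremental.snapshot.chunk.size' = '8096', -- rows per snapshot chunk (the default)
  'scan.lastchunk.optimize.enable' = 'true',       -- split an oversized last chunk
  'connect.timeout' = '30s',                       -- the default
  'connect.max-retries' = '3',                     -- the default
  'jdbc.properties.useSSL' = 'false',              -- custom JDBC URL parameter
  'debezium.skipped.operations' = 'd'              -- skip delete events
);
```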

Available metadata (Flink v1.13 or later)

Available metadata columns:
| Key | Data Type | Description |
| --- | --- | --- |
| database_name/meta.database_name | STRING NOT NULL | Name of the database that contains the row. |
| table_name/meta.table_name | STRING NOT NULL | Name of the table that contains the row. |
| op_ts/meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | When the change was made in the database. |
| meta.batch_id | BIGINT | The batch ID of the binlog. |
| meta.is_ddl | BOOLEAN | Whether the record is a DDL statement. |
| meta.mysql_type | MAP | Table structure. |
| meta.update_before | ARRAY | Field value before it was modified. |
| meta.pk_names | ARRAY | Name of the primary key field. |
| meta.sql | STRING | Null |
| meta.sql_type | MAP | Maps the fields in sql_type to the Java data type ID. |
| meta.ts | TIMESTAMP_LTZ(3) NOT NULL | When the row was received and processed. |
| meta.op_type | STRING | Operation type, such as INSERT or DELETE. |
| meta.file | STRING | Null for the snapshot phase, and the name of the binlog file to which the data belongs for the incremental phase, such as mysql-bin.000101. |
| meta.pos | BIGINT | 0 for the snapshot phase, and the offset of the binlog file to which the data belongs for the incremental phase, such as 143127802. |
| meta.gtid | STRING | Null for the snapshot phase, and the GTID of the data for the incremental phase, such as 3d3c4464-c320-11e9-8b3a-6c92bf62891a:66486240. |

Example

CREATE TABLE `mysql_cdc_source_table` (
`id` INT,
`name` STRING,
`database_name` STRING METADATA FROM 'database_name',
`table_name` STRING METADATA FROM 'table_name',
`op_ts` TIMESTAMP(3) METADATA FROM 'op_ts',
`op_type` STRING METADATA FROM 'meta.op_type',
`batch_id` BIGINT METADATA FROM 'meta.batch_id',
`is_ddl` BOOLEAN METADATA FROM 'meta.is_ddl',
`update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before',
`mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type',
`pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names',
`sql` STRING METADATA FROM 'meta.sql',
`sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type',
`ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts',
PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.
) WITH (
'connector' = 'mysql-cdc', -- Here, it should be 'mysql-cdc'.
'hostname' = '192.168.10.22', -- IP of the MySQL database server.
'port' = '3306', -- Integer port number of the MySQL database server.
'username' = 'debezium', -- Name of the MySQL user to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
'password' = 'hello@world!', -- Password to use when connecting to the MySQL database server.
'database-name' = 'YourDatabase', -- Database name of the MySQL server to monitor.
'table-name' = 'YourTable' -- Table name of the MySQL database to monitor.
);
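
Once declared, metadata columns can be selected like ordinary columns. The sketch below forwards a few of them from the source table defined above to a print sink; the sink name `audit_print_table` and its column names are made up for illustration.

```sql
-- Hypothetical sink used only for illustration.
CREATE TABLE `audit_print_table` (
  `id` INT,
  `name` STRING,
  `source_table` STRING,
  `op_type` STRING,
  `op_ts` TIMESTAMP(3)
) WITH (
  'connector' = 'print'
);

-- Columns are matched by position, so the SELECT list follows the sink schema.
INSERT INTO `audit_print_table`
SELECT `id`, `name`, `table_name`, `op_type`, `op_ts`
FROM `mysql_cdc_source_table`;
```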

Reading a MySQL database by sharding

Stream Compute Service supports reading MySQL databases by sharding.
If a MySQL database is already sharded into multiple tables such as A_1, A_2, A_3 …, and all of these tables share the same schema, you can set the table-name option to a regex to match and read multiple tables. For example, you can set table-name to A_.* to monitor all tables prefixed with A_. The database-name option supports regexes in the same way.
Note
If database-name or table-name is set to a regex, the regex must be enclosed in parentheses ().
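
A minimal sketch of a sharded source, assuming the shards A_1, A_2, A_3 … all have the columns `id` and `name` and that `id` values are unique across shards. The table name `mysql_cdc_sharded_source` and the column `source_table` are made up; the metadata key `table_name` is the one listed under "Available metadata".

```sql
CREATE TABLE `mysql_cdc_sharded_source` (
  `id` INT,
  `name` STRING,
  `source_table` STRING METADATA FROM 'table_name', -- shard the row came from
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = '(A_.*)' -- regex enclosed in parentheses, matches A_1, A_2, A_3 ...
);
```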

Data type mapping

MySQL CDC and Flink SQL data types are mapped as follows:
| MySQL Type | Flink SQL Type | Note |
| --- | --- | --- |
| TINYINT | TINYINT | - |
| SMALLINT; TINYINT UNSIGNED; TINYINT UNSIGNED ZEROFILL | SMALLINT | - |
| INT; MEDIUMINT; SMALLINT UNSIGNED; SMALLINT UNSIGNED ZEROFILL | INT | - |
| BIGINT; INT UNSIGNED; INT UNSIGNED ZEROFILL; MEDIUMINT UNSIGNED; MEDIUMINT UNSIGNED ZEROFILL | BIGINT | - |
| BIGINT UNSIGNED; BIGINT UNSIGNED ZEROFILL; SERIAL | DECIMAL(20, 0) | - |
| FLOAT; FLOAT UNSIGNED; FLOAT UNSIGNED ZEROFILL | FLOAT | - |
| REAL; REAL UNSIGNED; REAL UNSIGNED ZEROFILL; DOUBLE; DOUBLE UNSIGNED; DOUBLE UNSIGNED ZEROFILL; DOUBLE PRECISION; DOUBLE PRECISION UNSIGNED; DOUBLE PRECISION UNSIGNED ZEROFILL | DOUBLE | - |
| NUMERIC(p, s); NUMERIC(p, s) UNSIGNED; NUMERIC(p, s) UNSIGNED ZEROFILL; DECIMAL(p, s); DECIMAL(p, s) UNSIGNED; DECIMAL(p, s) UNSIGNED ZEROFILL; FIXED(p, s); FIXED(p, s) UNSIGNED; FIXED(p, s) UNSIGNED ZEROFILL, where p <= 38 | DECIMAL(p, s) | - |
| NUMERIC(p, s); NUMERIC(p, s) UNSIGNED; NUMERIC(p, s) UNSIGNED ZEROFILL; DECIMAL(p, s); DECIMAL(p, s) UNSIGNED; DECIMAL(p, s) UNSIGNED ZEROFILL; FIXED(p, s); FIXED(p, s) UNSIGNED; FIXED(p, s) UNSIGNED ZEROFILL, where 38 < p <= 65 | STRING | The precision of the DECIMAL data type is up to 65 in MySQL but is limited to 38 in Flink. If you define a decimal column whose precision is greater than 38, map it to STRING to avoid precision loss. |
| BOOLEAN; TINYINT(1); BIT(1) | BOOLEAN | - |
| DATE | DATE | - |
| TIME [(p)] | TIME [(p)] | - |
| TIMESTAMP [(p)]; DATETIME [(p)] | TIMESTAMP [(p)] | - |
| CHAR(n) | CHAR(n) | - |
| VARCHAR(n) | VARCHAR(n) | - |
| BIT(n) | BINARY(⌈n/8⌉) | - |
| BINARY(n) | BINARY(n) | - |
| VARBINARY(N) | VARBINARY(N) | - |
| TINYTEXT; TEXT; MEDIUMTEXT; LONGTEXT | STRING | - |
| TINYBLOB; BLOB; MEDIUMBLOB; LONGBLOB | BYTES | Currently, for the BLOB data type in MySQL, only blobs whose length is not greater than 2,147,483,647 (2 ** 31 - 1) are supported. |
| YEAR | INT | - |
| ENUM | STRING | - |
| JSON | STRING | The JSON data type is converted into a STRING in JSON format in Flink. |
| SET | ARRAY<STRING> | As the SET data type in MySQL is a string object that can have zero or more values, it should always be mapped to an array of strings. |
| GEOMETRY; POINT; LINESTRING; POLYGON; MULTIPOINT; MULTILINESTRING; MULTIPOLYGON; GEOMETRYCOLLECTION | STRING | The spatial data types in MySQL are converted into a STRING in a fixed JSON format. |
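
As a worked example of the mapping, the sketch below pairs a hypothetical MySQL table with a Flink SQL source table that follows the rules above. The names `type_demo` and `type_demo_source` are made up, and the JSON column assumes a MySQL version that supports the JSON type.

```sql
-- Hypothetical MySQL table (run on the MySQL server).
CREATE TABLE `type_demo` (
  `id` BIGINT UNSIGNED NOT NULL PRIMARY KEY,
  `amount` DECIMAL(50, 10),
  `active` TINYINT(1),
  `created_at` DATETIME(3),
  `tags` SET('a', 'b', 'c'),
  `payload` JSON
);

-- Matching Flink SQL source table (run in the Flink job).
CREATE TABLE `type_demo_source` (
  `id` DECIMAL(20, 0),       -- BIGINT UNSIGNED
  `amount` STRING,           -- DECIMAL with precision > 38
  `active` BOOLEAN,          -- TINYINT(1)
  `created_at` TIMESTAMP(3), -- DATETIME(3)
  `tags` ARRAY<STRING>,      -- SET
  `payload` STRING,          -- JSON
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'type_demo'
);
```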

Example

CREATE TABLE `mysql_cdc_source_table` (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.
) WITH (
'connector' = 'mysql-cdc', -- Here, it should be 'mysql-cdc'.
'hostname' = '192.168.10.22', -- IP of the MySQL database server.
'port' = '3306', -- Integer port number of the MySQL database server.
'username' = 'debezium', -- Name of the MySQL user to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
'password' = 'hello@world!', -- Password to use when connecting to the MySQL database server.
'database-name' = 'YourDatabase', -- Database name of the MySQL server to monitor.
'table-name' = 'YourTable' -- Table name of the MySQL database to monitor.
);

CREATE TABLE `print_table` (
`id` INT,
`name` STRING
) WITH (
'connector' = 'print'
);
insert into print_table select * from mysql_cdc_source_table;

Notes

Checkpoints

For MySQL CDC 1.0 ('scan.incremental.snapshot.enabled' = 'false'), additional configuration is needed to use checkpoints. MySQL CDC 1.0 cannot take a checkpoint while it reads snapshot data. If many tables holding a large volume of data need to be synced, checkpoints may fail repeatedly and cause the job to fail. You can raise the number of tolerated checkpoint failures (execution.checkpointing.tolerable-failed-checkpoints: 100) as instructed in Advanced Job Parameters.
If you use MySQL CDC 2.0 and the default job parallelism is greater than 1, checkpointing must be enabled.
After reading the snapshot data, the CDC connector will wait for the completion of a checkpoint before starting to read the incremental data.

Risks of using MySQL CDC 1.0

When a table has no primary key, you can only enable the CDC 1.0 mode using 'scan.incremental.snapshot.enabled' = 'false', which poses the following risks:
1. The FLUSH TABLES WITH READ LOCK (FTWRL) statement will be executed by default.
2. Although FTWRL only lasts for a short period of time, its mechanism may cause the database to be locked.
3. FTWRL may also cause the following:
   - It waits for running UPDATE/SELECT operations to complete.
   - During this period, the database is unavailable and new SELECT queries are blocked. This is caused by the Query Cache mechanism in MySQL.
If multiple MySQL CDC 1.0 sources run in parallel, the situations above are likely to occur.

User permissions

The user of the source database must have the following permissions: SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD.
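CREATE USER 'user' IDENTIFIED BY 'password'; -- GRANT ... IDENTIFIED BY is no longer accepted in MySQL 8.0, so create the user first.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user';
FLUSH PRIVILEGES;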

Global read lock

In the "How MySQL CDC 1.x works" section above, the first step is to acquire a global read lock to get the schema and binlog position. This will block writes by other database clients and may affect the online business. If the At Least Once semantic is acceptable, set 'debezium.snapshot.locking.mode' to 'none' to skip this step.
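
A sketch of a MySQL CDC 1.x source that skips the global read lock, assuming at-least-once semantics is acceptable for the job. The table name `mysql_cdc_no_lock_source` is made up; both options are the ones named on this page.

```sql
CREATE TABLE `mysql_cdc_no_lock_source` (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'YourTable',
  'scan.incremental.snapshot.enabled' = 'false', -- MySQL CDC 1.x mode
  'debezium.snapshot.locking.mode' = 'none'      -- skip the global read lock (at-least-once)
);
```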

Setting a composite key

The following DDL statement declares index1, index2, index3, and index4 as the fields of a composite key. They must be declared in PRIMARY KEY as they are defined in the source table, and their order does not affect the sync.
CREATE TABLE db_order_dim (
`index1` STRING,
`index2` STRING,
`index3` STRING,
`index4` STRING,
`field5` STRING,
`field6` STRING,
PRIMARY KEY(`index1`, `index2`, `index3`, `index4`) NOT ENFORCED
) WITH (
...
);
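
By default, the first field of the composite key is the partitioning key. To pick another field, the two scan.split-key options from the WITH parameters can be combined as sketched below. The table name `db_order_dim_split` is made up, the choice of `index2` is arbitrary, and the connection values are the placeholders used elsewhere on this page.

```sql
CREATE TABLE `db_order_dim_split` (
  `index1` STRING,
  `index2` STRING,
  `index3` STRING,
  `index4` STRING,
  `field5` STRING,
  `field6` STRING,
  PRIMARY KEY (`index1`, `index2`, `index3`, `index4`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'YourTable',
  'scan.split-key.mode' = 'specific',
  'scan.split-key.specific-column' = 'index2' -- must be a field of the composite key
);
```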

Defining a server-id

We do not recommend you specify an explicit server-id, because the Stream Compute Service system automatically generates a random server-id in the range of 6400 - 2147483647 to avoid a server-id conflict that may occur when several jobs read the same database.
If you do need to specify a server-id in MySQL CDC 2.x, set it to a range such as 5400-5405. Each parallel reader must have a unique server ID, so the range must be larger than the parallelism. For MySQL CDC 1.x, only a single server-id value can be specified, not a range.
Two methods are available for specifying server-id:
1. Use the WITH parameters in the mysql-cdc DDL statements.
2. Use SQL hints.
SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
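
For the first method, the range goes into the WITH clause of the source table. The sketch below assumes a MySQL CDC 2.x job whose parallelism is smaller than the size of the range; the table name `mysql_cdc_server_id_source` is made up.

```sql
CREATE TABLE `mysql_cdc_server_id_source` (
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'YourTable',
  'server-id' = '5400-5405' -- one unique ID per parallel reader
);
```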

Setting MySQL session timeouts

When an initial consistent snapshot is made for large databases, your established connection could time out while the tables are being read. You can prevent this behavior by configuring interactive_timeout and wait_timeout in your MySQL configuration file.
interactive_timeout indicates the number of seconds the server waits for activity on an interactive connection before closing it. For details, see MySQL documentation.
wait_timeout indicates the number of seconds the server waits for activity on a noninteractive connection before closing it. For details, see MySQL documentation.
During the reading of incremental data, heartbeats may fail because of an extremely high load on the TaskManagers, and the server then closes the connection (EOFException). In this case, you can run the following SQL statements on the MySQL server to increase the timeouts:
SET GLOBAL slave_net_timeout = 120;
SET GLOBAL thread_pool_idle_timeout = 120;
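
The two session timeouts described above can also be inspected and changed at runtime instead of in the configuration file, as sketched below. The value 28800 is only an example. A SET GLOBAL change applies to connections opened afterwards and is lost when the server restarts, so the configuration file is still the place for a permanent setting.

```sql
-- Inspect the current values (in seconds).
SHOW VARIABLES LIKE 'interactive_timeout';
SHOW VARIABLES LIKE 'wait_timeout';

-- Change both for new connections; 28800 is only an example value.
SET GLOBAL interactive_timeout = 28800;
SET GLOBAL wait_timeout = 28800;
```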

Critical logs of the JobManager

For MySQL CDC 2.x, the sync of each table consists of the following stages: chunk splitting, snapshot reading, incremental correction, and incremental reading. Since more resources are occupied for a longer period at the first three stages, Stream Compute Service has made improvements in logs and metrics to help you gain insight into and analyze the running of your jobs.

1. Splitting and assigning chunks

Splitting start: Search for the keywords "into chunks", as in "Start splitting table cdc_basic_source.random_source_1 into chunks" or "Start lazily splitting table cdc_basic_source.random_source_1 into chunks".
Splitting end: Search for the keywords "chunks, time cost", as in "Split table cdc_basic_source.random_source_1 into 14 chunks, time cost: 994ms".

2. Snapshot reading

Assignment of chunks from snapshot: Enable DEBUG-level logging and search for the keywords "Current assigned splits for" to view the total number of chunks of each table and their assignment.
Chunk assignment end: Search for the keywords "finished. Total split number", as in "Split assignment for cdc_basic_source.random_source_1 finished. Total split number: 14".
Snapshot reading end: Search for the log "Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED".

3. Incremental correction

Incremental correction start: Search for the log "Initial assigning finished as there are no more splits. Creating binlog split" or "Newly added assigning finished as there are no more splits. Waking up binlog reader".

4. Incremental reading

Refer to the following section, "Critical logs of the TaskManagers".

Critical logs of the TaskManagers

Incremental reading start: Search for the keywords "has entered pure binlog phase", as in "Table cdc_basic_source.random_source_2 has entered pure binlog phase".
Table schema change: Search for the keywords "Received schema change event".
EOFException: If a job restarts and the prompted exception is EOFException, adjust the timeout in the MySQL server as prompted. Alternatively, you can set a smaller total parallelism and increase the spec of each TaskManager to reduce the probability of timeout due to high memory and CPU pressure.

Monitoring metrics

Stream Compute Service adds many practical metrics in the MySQL CDC connector. You can go to the Flink UI of a job as instructed in Flink UI, and click the MySQL CDC source operator in the execution graph to search for and view these metrics.
logpos: Get the current binlog position, which can help identify stalled consumption and other issues.
numberOfInsertRecords: Get the number of output +I messages.
numberOfDeleteRecords: Get the number of output -D messages.
numberOfUpdateBeforeRecords: Get the number of output -U messages.
numberOfUpdateAfterRecords: Get the number of output +U messages.