Flink Version | Description |
1.11 | MySQL v5.6 supported |
1.13 | MySQL v5.6, v5.7, and v8.x are supported. Incremental snapshot is enabled by default and requires the source to contain a primary key. If the source has no primary key, set 'scan.incremental.snapshot.enabled' to 'false' in the WITH parameters. |
1.14 | MySQL v5.6, v5.7, and v8.x are supported. Incremental snapshot is enabled by default and requires the source to contain a primary key. If the source has no primary key, set 'scan.incremental.snapshot.enabled' to 'false' in the WITH parameters. |
1.16 | MySQL v5.6, v5.7, and v8.x are supported. Incremental snapshot is enabled by default and requires the source to contain a primary key. If the source has no primary key, set 'scan.incremental.snapshot.enabled' to 'false' in the WITH parameters. |
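As a minimal sketch of the no-primary-key case described in the table above, the following DDL disables incremental snapshot in the WITH parameters. The table name `mysql_cdc_no_pk_source` is hypothetical, and the connection values are the same placeholders used elsewhere in this document.

```sql
-- Hypothetical source table without a primary key.
CREATE TABLE `mysql_cdc_no_pk_source` (
  `id` INT,
  `name` STRING
  -- No PRIMARY KEY clause, because the monitored MySQL table has none.
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'YourTable',
  -- Required when the source has no primary key.
  'scan.incremental.snapshot.enabled' = 'false'
);
```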
CREATE TABLE `mysql_cdc_source_table` (
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the monitored database table has one.
) WITH (
  'connector' = 'mysql-cdc',        -- Must be 'mysql-cdc'.
  'hostname' = '192.168.10.22',     -- IP address of the MySQL database server.
  'port' = '3306',                  -- Integer port number of the MySQL database server.
  'username' = 'debezium',          -- Name of the MySQL user to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
  'password' = 'hello@world!',      -- Password to use when connecting to the MySQL database server.
  'database-name' = 'YourDatabase', -- Database name of the MySQL server to monitor.
  'table-name' = 'YourTable'        -- Table name of the MySQL database to monitor.
);
Option | Description | Required | Remarks |
connector | The connector to use. | Yes | Here, it should be 'mysql-cdc'. |
hostname | IP address or hostname of the MySQL database server. | Yes | - |
port | Integer port number of the MySQL database server. | No | Default value: 3306 |
username | Name of the MySQL user to use when connecting to the MySQL database server. | Yes | A MySQL user with the required permissions (including SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT). |
password | Password to use when connecting to the MySQL database server. | Yes | - |
database-name | Database name of the MySQL server to monitor. | Yes | The database-name also supports regular expressions to monitor multiple databases matching the regular expression. |
table-name | Table name of the MySQL database to monitor. | Yes | The table-name also supports regular expressions to monitor multiple tables matching the regular expression. |
server-id | A numeric ID of this database client. | No | It must be unique across all currently running database processes in the MySQL cluster. We recommend setting a unique ID range for each job in a database, such as 5400-5405. By default, a random number between 6400 and Integer.MAX_VALUE is generated. |
server-time-zone | Session time zone in the database server. | No | An example is "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING. |
append-mode | Whether to enable the append mode. | No | This mode is available to Flink v1.13 or later. It allows, for example, syncing the MySQL CDC data to Hive. |
filter-duplicate-pair-records | Filters source field change records that are not defined in the Flink DDL statements. | No | For example, a MySQL source contains four fields a, b, c, and d, but only a and b are defined in the table creation SQL statements. If this option is used, change records involving only field c or d will be ignored and not be delivered to the downstream system, reducing the data to be manipulated. |
scan.lastchunk.optimize.enable | Repartitions the last chunk of the snapshot phase. | No | Numerous writes and changes continuously made to the source during the snapshot phase may cause the last chunk to be too large, resulting in the crash and restart of TaskManagers due to OOM. If this option is enabled (set to true), Flink automatically divides an oversized last chunk into smaller ones to make the job more stable. |
debezium.min.row.count.to.stream.results | When the number of records in the table is greater than this value, the records are read in batches. | No | Default value: 1000. Flink reads data from a source with one of the following methods. Full read: the data in the whole table is read into memory. This method is fast but consumes a corresponding amount of memory, so an extremely large source poses the risk of OOM. Batch read: a certain number of rows is read each time until all the data is read. This method avoids the risk of OOM when the source is large, but reading is slower. |
debezium.snapshot.fetch.size | Specifies the maximum number of rows read from a MySQL source at a time during the snapshot phase. | No | This option applies only to the batch read mode. |
debezium.skipped.operations | Specifies the operations to be skipped, separated by commas. Operations include c (insert), u (update), and d (delete). By default, no operations are skipped. | No | - |
scan.incremental.snapshot.enabled | Whether to enable incremental snapshot. | No | Default value: true. |
scan.incremental.snapshot.chunk.size | The chunk size (number of rows) of the table snapshot. Captured tables are split into multiple chunks when the table snapshot is read. | No | Default value: 8096. |
scan.lazy-calculate-splits.enabled | Whether to enable lazy loading for the JobManager during the snapshot phase, to avoid JobManager OOM caused by too much data and too many chunks. | No | Default value: true. |
scan.newly-added-table.enabled | Whether to enable dynamic loading of newly added tables. | No | Default value: false. |
scan.split-key.mode | Specifies which primary key field is used as the partitioning key. | No | Two valid values are available: default and specific. default indicates that the first field of the composite key is used as the partitioning key; specific requires setting scan.split-key.specific-column to specify a field in the composite key as the partitioning key. |
scan.split-key.specific-column | Specifies a field in the composite key as the partitioning key. | No | This option is required when the value of scan.split-key.mode is specific. Its value must be the name of a field in the composite key. |
connect.timeout | The maximum time that the connector waits after trying to connect to the MySQL database server before timing out. | No | Default value: 30s. |
connect.max-retries | The maximum number of times the connector retries building a MySQL database server connection. | No | Default value: 3. |
connection.pool.size | Connection pool size. | No | Default value: 20. |
jdbc.properties.* | Option to pass custom JDBC URL parameters, such as 'jdbc.properties.useSSL' = 'false'. | No | - |
heartbeat.interval | Interval at which heartbeat events are sent to track the latest available binlog offsets. It is generally used for tables that are updated slowly. | No | Default value: 20. |
debezium.* | Debezium properties. | No | Specifies Debezium properties for fine-grained control of the client's behavior, such as 'debezium.snapshot.mode' = 'never'. For details, see Debezium's MySQL Connector properties. |
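The following sketch combines several of the optional parameters listed above in one DDL statement. It only uses option names and example values that appear in the table; the table name `mysql_cdc_tuned_source` is hypothetical, and whether these particular values suit a given job is an assumption to verify against your own workload.

```sql
-- Hypothetical source table that sets a few optional parameters.
CREATE TABLE `mysql_cdc_tuned_source` (
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'YourTable',
  'server-time-zone' = 'Asia/Shanghai',             -- Session time zone of the database server.
  'scan.incremental.snapshot.chunk.size' = '8096',  -- Rows per snapshot chunk (the documented default).
  'connect.timeout' = '30s',                        -- Documented default.
  'connect.max-retries' = '3',                      -- Documented default.
  'debezium.skipped.operations' = 'd',              -- Skip delete events.
  'jdbc.properties.useSSL' = 'false'                -- Custom JDBC URL parameter.
);
```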
Key | Data Type | Description |
database_name/meta.database_name | STRING NOT NULL | Name of the database that contains the row. |
table_name/meta.table_name | STRING NOT NULL | Name of the table that contains the row. |
op_ts/meta.op_ts | TIMESTAMP_LTZ(3) NOT NULL | When the change was made in the database. |
meta.batch_id | BIGINT | The batch ID of the binlog. |
meta.is_ddl | BOOLEAN | Whether the metadata consists of DDL statements. |
meta.mysql_type | MAP | Table structure. |
meta.update_before | ARRAY | Field value before it was modified. |
meta.pk_names | ARRAY | Name of the primary key field. |
meta.sql | STRING | Null |
meta.sql_type | MAP | Maps the fields in sql_type to the Java data type ID. |
meta.ts | TIMESTAMP_LTZ(3) NOT NULL | When the row was received and processed. |
meta.op_type | STRING | Operation type, such as INSERT or DELETE. |
meta.file | STRING | Null for the snapshot phase, and the name of the binlog file to which the data belongs for the incremental phase, such as mysql-bin.000101. |
meta.pos | BIGINT | 0 for the snapshot phase, and the offset of the binlog file to which the data belongs for the incremental phase, such as 143127802. |
meta.gtid | STRING | Null for the snapshot phase, and the GTID of the data for the incremental phase, such as 3d3c4464-c320-11e9-8b3a-6c92bf62891a:66486240. |
CREATE TABLE `mysql_cdc_source_table` (
  `id` INT,
  `name` STRING,
  `database_name` STRING METADATA FROM 'database_name',
  `table_name` STRING METADATA FROM 'table_name',
  `op_ts` TIMESTAMP(3) METADATA FROM 'op_ts',
  `op_type` STRING METADATA FROM 'meta.op_type',
  `batch_id` BIGINT METADATA FROM 'meta.batch_id',
  `is_ddl` BOOLEAN METADATA FROM 'meta.is_ddl',
  `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before',
  `mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type',
  `pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names',
  `sql` STRING METADATA FROM 'meta.sql',
  `sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type',
  `ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts',
  PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the monitored database table has one.
) WITH (
  'connector' = 'mysql-cdc',        -- Must be 'mysql-cdc'.
  'hostname' = '192.168.10.22',     -- IP address of the MySQL database server.
  'port' = '3306',                  -- Integer port number of the MySQL database server.
  'username' = 'debezium',          -- Name of the MySQL user to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
  'password' = 'hello@world!',      -- Password to use when connecting to the MySQL database server.
  'database-name' = 'YourDatabase', -- Database name of the MySQL server to monitor.
  'table-name' = 'YourTable'        -- Table name of the MySQL database to monitor.
);
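To illustrate how the metadata columns can be consumed downstream, here is a small sketch that forwards a few of them to a print sink. It assumes the source table is declared with the metadata columns shown above; the sink name `print_audit_table` is hypothetical.

```sql
-- Hypothetical print sink that keeps origin and operation information per row.
CREATE TABLE `print_audit_table` (
  `database_name` STRING,
  `table_name` STRING,
  `op_type` STRING,
  `op_ts` TIMESTAMP(3),
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'print'
);

-- Forward each change record together with its metadata.
INSERT INTO `print_audit_table`
SELECT `database_name`, `table_name`, `op_type`, `op_ts`, `id`, `name`
FROM `mysql_cdc_source_table`;
```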
You can set the table-name option to a regex to match and read multiple tables. For example, you can set table-name to A_.* to monitor all tables prefixed with A_. database-name can also implement this feature.
If database-name or table-name is set to a regex, the regex needs to be placed in ().
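A minimal sketch of the regex matching described above: the DDL below reads every table whose name starts with A_ and encloses the regex in parentheses as required. The table name `mysql_cdc_multi_table_source` is hypothetical, and the sketch assumes all matched tables share the same `id` and `name` columns.

```sql
-- Hypothetical source that reads all tables prefixed with A_.
CREATE TABLE `mysql_cdc_multi_table_source` (
  `id` INT,
  `name` STRING,
  `table_name` STRING METADATA FROM 'table_name', -- Tells which matched table each row came from.
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = '(A_.*)' -- Regex placed in parentheses.
);
```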
MySQL Type | Flink SQL Type | Note |
TINYINT | TINYINT | - |
SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL | SMALLINT | - |
INT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL | INT | - |
BIGINT, INT UNSIGNED, INT UNSIGNED ZEROFILL, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL | BIGINT | - |
BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL | DECIMAL(20, 0) | - |
FLOAT, FLOAT UNSIGNED, FLOAT UNSIGNED ZEROFILL | FLOAT | - |
REAL, REAL UNSIGNED, REAL UNSIGNED ZEROFILL, DOUBLE, DOUBLE UNSIGNED, DOUBLE UNSIGNED ZEROFILL, DOUBLE PRECISION, DOUBLE PRECISION UNSIGNED, DOUBLE PRECISION UNSIGNED ZEROFILL | DOUBLE | - |
NUMERIC(p, s), NUMERIC(p, s) UNSIGNED, NUMERIC(p, s) UNSIGNED ZEROFILL, DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, DECIMAL(p, s) UNSIGNED ZEROFILL, FIXED(p, s), FIXED(p, s) UNSIGNED, FIXED(p, s) UNSIGNED ZEROFILL, where p <= 38 | DECIMAL(p, s) | - |
NUMERIC(p, s), NUMERIC(p, s) UNSIGNED, NUMERIC(p, s) UNSIGNED ZEROFILL, DECIMAL(p, s), DECIMAL(p, s) UNSIGNED, DECIMAL(p, s) UNSIGNED ZEROFILL, FIXED(p, s), FIXED(p, s) UNSIGNED, FIXED(p, s) UNSIGNED ZEROFILL, where 38 < p <= 65 | STRING | The precision of the DECIMAL data type is up to 65 in MySQL but is limited to 38 in Flink. If you define a decimal column whose precision is greater than 38, map it to STRING to avoid precision loss. |
BOOLEAN, TINYINT(1), BIT(1) | BOOLEAN | - |
DATE | DATE | - |
TIME [(p)] | TIME [(p)] | - |
TIMESTAMP [(p)], DATETIME [(p)] | TIMESTAMP [(p)] | - |
CHAR(n) | CHAR(n) | - |
VARCHAR(n) | VARCHAR(n) | - |
BIT(n) | BINARY(⌈n/8⌉) | - |
BINARY(n) | BINARY(n) | - |
VARBINARY(N) | VARBINARY(N) | - |
TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | STRING | - |
TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | Currently, for the BLOB data type in MySQL, only values whose length does not exceed 2,147,483,647 (2 ** 31 - 1) are supported. |
YEAR | INT | - |
ENUM | STRING | - |
JSON | STRING | The JSON data type is converted into a STRING in JSON format in Flink. |
SET | ARRAY<STRING> | As the SET data type in MySQL is a string object that can have zero or more values, it should always be mapped to an array of strings. |
GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION | STRING | The spatial data types in MySQL are converted into a STRING with a fixed JSON format. |
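The mapping table above can be applied as in the following sketch. Both the MySQL table `orders` (shown only in comments) and the Flink table `orders_cdc_source` are hypothetical; each Flink column type is taken from the corresponding row of the mapping table.

```sql
-- Hypothetical MySQL table, for reference only:
--   CREATE TABLE orders (
--     id         BIGINT UNSIGNED PRIMARY KEY,
--     paid       TINYINT(1),
--     price      DECIMAL(10, 2),
--     big_amount DECIMAL(50, 10),
--     created_at DATETIME(3),
--     status     ENUM('new', 'done'),
--     tags       SET('a', 'b', 'c'),
--     payload    JSON
--   );

CREATE TABLE `orders_cdc_source` (
  `id` DECIMAL(20, 0),        -- BIGINT UNSIGNED
  `paid` BOOLEAN,             -- TINYINT(1)
  `price` DECIMAL(10, 2),     -- DECIMAL(p, s) with p <= 38
  `big_amount` STRING,        -- DECIMAL(p, s) with 38 < p <= 65, kept as STRING to avoid precision loss
  `created_at` TIMESTAMP(3),  -- DATETIME(3)
  `status` STRING,            -- ENUM
  `tags` ARRAY<STRING>,       -- SET
  `payload` STRING,           -- JSON
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'orders'
);
```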
CREATE TABLE `mysql_cdc_source_table` (
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the monitored database table has one.
) WITH (
  'connector' = 'mysql-cdc',        -- Must be 'mysql-cdc'.
  'hostname' = '192.168.10.22',     -- IP address of the MySQL database server.
  'port' = '3306',                  -- Integer port number of the MySQL database server.
  'username' = 'debezium',          -- Name of the MySQL user to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).
  'password' = 'hello@world!',      -- Password to use when connecting to the MySQL database server.
  'database-name' = 'YourDatabase', -- Database name of the MySQL server to monitor.
  'table-name' = 'YourTable'        -- Table name of the MySQL database to monitor.
);

CREATE TABLE `print_table` (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO print_table SELECT * FROM mysql_cdc_source_table;
Set the number of tolerable checkpoint failures (execution.checkpointing.tolerable-failed-checkpoints: 100) as instructed in Advanced Job Parameters.
Incremental snapshot is disabled when 'scan.incremental.snapshot.enabled' = 'false' is set, which poses the following risks:
The FLUSH TABLES WITH READ LOCK (FTWRL) statement will be executed by default.
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
FLUSH PRIVILEGES;
Set 'debezium.snapshot.locking.mode' to 'none' to skip this step.
The following example uses index1, index2, index3, and index4 as indices of the composite key. They must be defined in the same way in PRIMARY KEY, and their order will not affect the sync.
CREATE TABLE db_order_dim (
  `index1` STRING,
  `index2` STRING,
  `index3` STRING,
  `index4` STRING,
  `field5` STRING,
  `field6` STRING,
  PRIMARY KEY (`index1`, `index2`, `index3`, `index4`) NOT ENFORCED
) WITH (...);
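For a source table with a composite key like the one above, the scan.split-key.mode and scan.split-key.specific-column options from the option table can select which key field is used for partitioning. The sketch below is an assumption-laden illustration: the table name `db_order_dim_source` is hypothetical, the connection values are placeholders, and choosing `index2` as the partitioning key is arbitrary.

```sql
-- Hypothetical CDC source with a composite primary key.
CREATE TABLE `db_order_dim_source` (
  `index1` STRING,
  `index2` STRING,
  `index3` STRING,
  `index4` STRING,
  `field5` STRING,
  `field6` STRING,
  PRIMARY KEY (`index1`, `index2`, `index3`, `index4`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'YourTable',
  'scan.split-key.mode' = 'specific',            -- Use a named field instead of the first key field.
  'scan.split-key.specific-column' = 'index2'    -- Must be a field of the composite key.
);
```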
We do not recommend specifying server-id explicitly, because the Stream Compute Service system automatically generates a random server-id in the range of 6400 - 2147483647 to avoid a server-id conflict that may occur when several jobs read the same database.
If you need to specify server-id in MySQL CDC 2.x, we recommend you set it to a range, such as 5400-5405. Since each parallel reader must have a unique server ID, server-id must be a range like 5400-5405, and the range must be larger than the parallelism. For MySQL CDC 1.x, only a single server-id value but not a range can be specified.
server-id can be specified in the following ways:
In the WITH parameters of the mysql-cdc DDL statements.
With a SQL hint: SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ;
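As a sketch of the DDL variant, the statement below sets a server-id range in the WITH parameters. The range 5400-5405 is the example value used in this document and covers six IDs, so it assumes a source parallelism smaller than that (for example, 4); the column list and connection values are placeholders.

```sql
-- server-id given as a range in the DDL (MySQL CDC 2.x).
CREATE TABLE `source_table` (
  `id` INT,
  `name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.10.22',
  'port' = '3306',
  'username' = 'debezium',
  'password' = 'hello@world!',
  'database-name' = 'YourDatabase',
  'table-name' = 'YourTable',
  'server-id' = '5400-5405' -- One unique ID per parallel reader; the range must exceed the parallelism.
);
```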
Adjust interactive_timeout and wait_timeout in your MySQL configuration file.
interactive_timeout indicates the number of seconds the server waits for activity on an interactive connection before closing it. For details, see MySQL documentation.
wait_timeout indicates the number of seconds the server waits for activity on a noninteractive connection before closing it. For details, see MySQL documentation.
SET GLOBAL slave_net_timeout = 120;
SET GLOBAL thread_pool_idle_timeout = 120;
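Before changing the timeouts, it can help to check their current values on the MySQL server. The statements below are standard MySQL, run in a MySQL client rather than in Flink; the value 28800 is only an illustrative assumption (it is MySQL's stock default), not a recommendation from this document.

```sql
-- Inspect the current timeout settings on the MySQL server.
SHOW VARIABLES LIKE 'interactive_timeout';
SHOW VARIABLES LIKE 'wait_timeout';

-- Change them at runtime (illustrative value, in seconds).
SET GLOBAL interactive_timeout = 28800;
SET GLOBAL wait_timeout = 28800;
```

Values set with SET GLOBAL apply to new connections only and are lost when the server restarts, which is why the configuration file is the place for a permanent change.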
Search by the "into chunks" keywords, such as "Start splitting table cdc_basic_source.random_source_1 into chunks", or "Start lazily splitting table cdc_basic_source.random_source_1 into chunks".
Search by the "chunks, time cost" keywords, such as "Split table cdc_basic_source.random_source_1 into 14 chunks, time cost: 994ms".
Set the log level to DEBUG, and search by the "Current assigned splits for" keywords to view the total number of chunks of each table and their assignment.
Search by the "finished. Total split number" keywords, such as "Split assignment for cdc_basic_source.random_source_1 finished. Total split number: 14".
Search for the "Assigner status changes from INITIAL_ASSIGNING to INITIAL_ASSIGNING_FINISHED" log.
Search for the "Initial assigning finished as there are no more splits. Creating binlog split" or "Newly added assigning finished as there are no more splits. Waking up binlog reader" log.
Search by the "has entered pure binlog phase" keywords, such as "Table cdc_basic_source.random_source_2 has entered pure binlog phase".
Search by the "Received schema change event" keywords.
If an EOFException is reported, adjust the timeout in the MySQL server as prompted. Alternatively, you can set a smaller total parallelism and increase the spec of each TaskManager to reduce the probability of timeout due to high memory and CPU pressure.