Flink Version | Description |
1.11 | Unsupported |
1.13 | Supported |
1.14 | Unsupported |
1.16 | Supported |
CREATE TABLE `jdbc_sink_table` (`id` INT,`name` STRING) WITH (-- Specify the options for database connection.'connector' = 'jdbcPG','url' = 'jdbc:postgresql://10.1.28.93:3306/CDB?reWriteBatchedInserts=true&serverTimezone=Asia/Shanghai', -- Replace it with your PostreSQL database URL.'table-name' = 'my-table', -- The name of JDBC table to connect.'username' = 'admin', -- The username (with the INSERT permission required) for database access.'password' = 'MyPa$$w0rd', -- The password for database access.'sink.buffer-flush.max-rows' = '200', -- The max size of buffered records before flush.'sink.buffer-flush.interval' = '2s' -- The flush interval.);
CREATE TABLE `jdbc_upsert_sink_table` (`id` INT PRIMARY KEY NOT ENFORCED,`name` STRING) WITH (-- Specify the options for database connection.'connector' = 'jdbcPG','url' = 'jdbc:postgresql://10.1.28.93:3306/CDB?reWriteBatchedInserts=true&serverTimezone=Asia/Shanghai', -- Replace it with your PostreSQL connection URL.'table-name' = 'my-upsert-table', -- The name of JDBC table to connect.'username' = 'admin', -- The username (with the INSERT permission required) for database access.'password' = 'MyPa$$w0rd', -- The password for database access.'sink.buffer-flush.max-rows' = '200', -- The max size of buffered records before flush.'sink.buffer-flush.interval' = '2s' -- The flush interval.);
Option | Required | Default Value | Description |
connector | Yes | None | For connection to a database, it should be 'jdbcPG' . |
url | Yes | None | The JDBC database URL. |
table-name | Yes | None | The name of JDBC table to connect. |
driver | No | None | The class name of the JDBC driver to use to connect to the above-mentioned URL. If no value is set, it will automatically be derived from the URL. |
username | No | None | The JDBC username. 'username' and 'password' must both be specified if any of them is specified. |
password | No | None | The JDBC password. |
scan.partition.column | No | None | The column name used for partitioning the input. The column type must be numeric, date, or timestamp. For details, see Partitioned scan. |
scan.partition.num | No | None | The number of partitions. |
scan.partition.lower-bound | No | None | The smallest value of the first partition. |
scan.partition.upper-bound | No | None | The largest value of the last partition. |
scan.fetch-size | No | 0 | The number of rows that should be fetched from the database when reading per round trip. If the value specified is 0, the data is read row by row, indicating low efficiency (low throughput). |
lookup.cache.max-rows | No | None | The maximum number of data records cached in the Lookup Cache. |
lookup.cache.ttl | No | None | The maximum cache time of each data record in the Lookup Cache. |
lookup.max-retries | No | 3 | The maximum number of retries upon database lookup failure. |
sink.buffer-flush.max-rows | No | 100 | The maximum size of buffered records before flush. If this is set to 0 , the records will not be buffered. |
sink.buffer-flush.interval | No | 1s | The maximum interval (ms) between flushes. If sink.buffer-flush.max-rows is 0 , and this parameter is not, buffered actions will be processed asynchronously. |
sink.max-retries | No | 3 | The maximum retry times if writing records to database failed. |
write-mode | No | insert | Write in the copy mode. Valid values: insert and copy . |
copy-delimiter | No | Ι | The field delimiter in the copy mode. |
CREATE TABLE jdbc_upsert_sink_table (id INT ,age INT,name STRING,PRIMARY KEY (id) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.) WITH (-- Specify the options for database connection.'connector' = 'jdbcPG','url' = 'jdbc:postgresql://10.0.0.2:5436/postgres','table-name' = 'my-upsert-table', -- The name of JDBC table to connect.'username' = 'admin', -- The username (with the INSERT permission required) for database access.'password' = 'password', -- The password for database access.'sink.buffer-flush.max-rows' = '300', -- The max size of buffered records before flush.'sink.buffer-flush.interval' = '100s' -- The flush interval.);-- MySQL CDC connector as the source. Use "flink-connector-mysql-cdc" and this connector together. The MySQL version must be v5.7 or later.For more information, see: https://www.tencentcloud.com/document/product/849/52698?from_cn_redirect=1.CREATE TABLE mysql_cdc_source_table (id INT,age INT,name STRING,PRIMARY KEY (id) NOT ENFORCED -- Define the primary key here if the database table to monitor has it defined.) WITH ('connector' = 'mysql-cdc', -- Here, it should be 'mysql-cdc'.'hostname' = '10.0.0.6', -- IP of the MySQL database server.'port' = '3360', -- Integer port number of the MySQL database server.'username' = 'root', -- Name of the database to use when connecting to the MySQL database server (SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, and RELOAD permissions are required).'password' = 'password', -- Password to use when connecting to the MySQL database server.-- 'scan.incremental.snapshot.enabled' = 'false' -- This option is required if the source table has no primary key.'database-name' = 'database', -- Database name of the MySQL server to monitor.'table-name' = 'table' -- Table name of the MySQL database to monitor.);INSERT INTO jdbc_upsert_sink_table select * from mysql_cdc_source_table;
PRIMARY KEY NOT ENFORCED
constraint shall also be set for the corresponding column in the CREATE TABLE clause of the DDL statements.INSERT .. ON CONFLICT .. DO UPDATE SET ..
syntax, but on upsert +delete
in batches. This syntax applies to PostgreSQL v9.5 or earlier.reWriteBatchedInserts=true
following the URL option as shown below.jdbc:postgresql://10.1.28.93:3306/PG?reWriteBatchedInserts=true¤tSchema=Database schema
Was this page helpful?