The flink-connector-jdbc connector component provided by Stream Compute Service has built-in MySQL and PostgreSQL drivers. To connect to Oracle and other databases, upload the corresponding JDBC driver packages as custom packages.
Flink Version | Description |
1.11 | Supported |
1.13 | Supported |
1.14 | Supported |
1.16 | Supported |
CREATE TABLE `jdbc_source_table` (
    `id` INT,
    `name` STRING
) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-table', -- The name of the table to connect.
    'username' = 'admin', -- The username (with the SELECT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'scan.partition.column' = 'id', -- The name of the partitioned scan column.
    'scan.partition.num' = '2', -- The number of partitions.
    'scan.partition.lower-bound' = '0', -- The minimum value of the first partition.
    'scan.partition.upper-bound' = '100', -- The maximum value of the last partition.
    'scan.fetch-size' = '1' -- The number of rows fetched from the database per round trip when reading.
);
CREATE TABLE `jdbc_dim_table` (
    `id` INT,
    `name` STRING
) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-table', -- The name of the table to connect.
    'username' = 'admin', -- The username (with the SELECT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'lookup.cache.max-rows' = '100', -- The maximum number of rows in the Lookup Cache.
    'lookup.cache.ttl' = '5000' -- The maximum time to live for each row in the Lookup Cache.
);
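A dimension table such as `jdbc_dim_table` is typically read through a lookup join. The following is a minimal sketch, not taken from the original page: it assumes a hypothetical driving stream `orders` and a hypothetical output table `print_sink`, both built with the `datagen` and `print` connectors that ship with open-source Flink (their availability in your cluster is an assumption). Only `jdbc_dim_table` comes from the DDL above.

```sql
-- Hypothetical driving stream; 'datagen' produces random rows for testing.
CREATE TABLE `orders` (
    `order_id`  INT,
    `user_id`   INT,
    `proc_time` AS PROCTIME()  -- processing-time attribute used by the lookup join
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

-- Hypothetical output table that prints the joined rows to the task log.
CREATE TABLE `print_sink` (
    `order_id`  INT,
    `user_name` STRING
) WITH (
    'connector' = 'print'
);

-- Each incoming order looks up the matching row in the JDBC dimension table.
INSERT INTO `print_sink`
SELECT o.`order_id`, d.`name`
FROM `orders` AS o
JOIN `jdbc_dim_table` FOR SYSTEM_TIME AS OF o.`proc_time` AS d
    ON o.`user_id` = d.`id`;
```

With the Lookup Cache options set as above, repeated lookups for the same `id` can be served from the cache until the entry expires or is evicted.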
CREATE TABLE `jdbc_sink_table` (
    `id` INT,
    `name` STRING
) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-table', -- The name of JDBC table to connect.
    'username' = 'admin', -- The username (with the INSERT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'sink.buffer-flush.max-rows' = '200', -- The maximum size of buffered records before flush.
    'sink.buffer-flush.interval' = '2s' -- The flush interval.
);
CREATE TABLE `jdbc_upsert_sink_table` (
    `id` INT PRIMARY KEY NOT ENFORCED,
    `name` STRING
) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-upsert-table', -- The name of JDBC table to connect.
    'username' = 'admin', -- The username (with the INSERT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'sink.buffer-flush.max-rows' = '200', -- The maximum size of buffered records before flush.
    'sink.buffer-flush.interval' = '2s' -- The flush interval.
);
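To make the upsert behavior concrete, the sketch below shows what an upsert looks like on the MySQL side. It is an illustration only: the column types of the physical table are assumptions, and the statement the connector actually generates may differ in detail. The key point is that the physical table needs a primary key (or unique key) on `id` for the duplicate-key branch to take effect.

```sql
-- Assumed MySQL-side definition of the target table (types are illustrative).
CREATE TABLE `my-upsert-table` (
    `id`   INT NOT NULL,
    `name` VARCHAR(255),
    PRIMARY KEY (`id`)
);

-- First write for id = 1: no row exists yet, so a new row is inserted.
INSERT INTO `my-upsert-table` (`id`, `name`) VALUES (1, 'alice')
ON DUPLICATE KEY UPDATE `name` = VALUES(`name`);

-- Second write for id = 1: the key already exists, so the row is updated in place.
INSERT INTO `my-upsert-table` (`id`, `name`) VALUES (1, 'bob')
ON DUPLICATE KEY UPDATE `name` = VALUES(`name`);
```

After both statements the table holds a single row for `id` 1 instead of two rows or a duplicate-key error.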
Option | Required | Default Value | Description |
connector | Yes | None | Must be 'jdbc' to connect to a database. |
url | Yes | None | The JDBC database URL. |
table-name | Yes | None | The table name. |
driver | No | None | The class name of the JDBC driver to use to connect to the above-mentioned URL. If no value is set, it will automatically be derived from the URL. |
username | No | None | The JDBC username. 'username' and 'password' must both be specified if either of them is specified. |
password | No | None | The JDBC password. |
scan.partition.column | No | None | The column name used for partitioning the input. The column type must be numeric, date, or timestamp. For details, see Partitioned scan. |
scan.partition.num | No | None | The number of partitions. |
scan.partition.lower-bound | No | None | The smallest value of the first partition. |
scan.partition.upper-bound | No | None | The largest value of the last partition. |
scan.fetch-size | No | 0 | The number of rows fetched from the database per round trip when reading. If the value is 0, the data is read row by row, which results in low throughput. |
lookup.cache.max-rows | No | None | The maximum number of data records cached in the Lookup Cache. |
lookup.cache.ttl | No | None | The maximum cache time of each data record in the Lookup Cache. |
lookup.max-retries | No | 3 | The maximum number of retries upon database lookup failure. |
sink.buffer-flush.max-rows | No | 100 | The maximum size of buffered records before flush. If this is set to 0 , the records will not be buffered. |
sink.buffer-flush.interval | No | 1s | The maximum interval (ms) between flushes. If sink.buffer-flush.max-rows is 0 , and this option is not, buffered actions will be processed asynchronously. |
sink.max-retries | No | 3 | The maximum number of retries if writing records to the database fails. |
sink.ignore-delete | No | false | Whether to ignore the DELETE operation. |
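The `driver` option is mostly relevant for databases whose driver is not built in. The following is a hedged sketch of an Oracle sink, assuming the Oracle JDBC driver has been provided as a custom package. The host, port, service name `ORCLPDB`, and table name `MY_TABLE` are hypothetical placeholders; `oracle.jdbc.OracleDriver` is the driver class name shipped in Oracle's JDBC driver.

```sql
CREATE TABLE `oracle_sink_table` (
    `id` INT,
    `name` STRING
) WITH (
    'connector' = 'jdbc',
    -- Hypothetical Oracle thin-driver URL; replace host, port, and service name.
    'url' = 'jdbc:oracle:thin:@//10.1.28.93:1521/ORCLPDB',
    -- Set the driver class explicitly instead of relying on derivation from the URL.
    'driver' = 'oracle.jdbc.OracleDriver',
    'table-name' = 'MY_TABLE',   -- hypothetical table name
    'username' = 'admin',
    'password' = 'MyPa$$w0rd',
    'sink.max-retries' = '3'     -- same as the default, shown for illustration
);
```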
CREATE TABLE `jdbc_source_table` (
    `id` INT,
    `name` STRING
) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-table', -- The name of the table to connect.
    'username' = 'admin', -- The username (with the SELECT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'scan.partition.column' = 'id', -- The name of the partitioned scan column.
    'scan.partition.num' = '2', -- The number of partitions.
    'scan.partition.lower-bound' = '0', -- The minimum value of the first partition.
    'scan.partition.upper-bound' = '100', -- The maximum value of the last partition.
    'scan.fetch-size' = '1' -- The number of rows fetched from the database per round trip when reading.
);

CREATE TABLE `jdbc_upsert_sink_table` (
    `id` INT PRIMARY KEY NOT ENFORCED,
    `name` STRING
) WITH (
    -- Specify the options for database connection.
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- Replace it with your MySQL database URL.
    'table-name' = 'my-upsert-table', -- The name of JDBC table to connect.
    'username' = 'admin', -- The username (with the INSERT permission required) for database access.
    'password' = 'MyPa$$w0rd', -- The password for database access.
    'sink.buffer-flush.max-rows' = '200', -- The maximum size of buffered records before flush.
    'sink.buffer-flush.interval' = '2s' -- The flush interval.
);

INSERT INTO jdbc_upsert_sink_table SELECT * FROM jdbc_source_table;
To write with upsert semantics, the PRIMARY KEY NOT ENFORCED constraint must also be set for the corresponding column in the CREATE TABLE clause of the DDL statement. Upsert is implemented with a database-specific syntax:
MySQL: the INSERT .. ON DUPLICATE KEY UPDATE .. syntax, which is supported by all common MySQL versions.
PostgreSQL: the INSERT .. ON CONFLICT .. DO UPDATE SET .. syntax, which is supported by PostgreSQL v9.5 or later.
Oracle: the MERGE .. INTO .. USING ON .. WHEN UPDATE .. WHEN INSERT .. syntax, which is supported by Oracle v9i or later.
For a partitioned scan, the scan.partition options must all be specified. Otherwise, errors will be raised.
The maximum value specified by scan.partition.upper-bound and the minimum value specified by scan.partition.lower-bound refer to the maximum and minimum partition steps. They do not affect the number and accuracy of the read data records.
The Lookup Cache of a dimension table is enabled by setting both lookup.cache.max-rows and lookup.cache.ttl.
To improve batch write performance with MySQL, add rewriteBatchedStatements=true following the URL option as shown below.
jdbc:mysql://10.1.28.93:3306/CDB?rewriteBatchedStatements=true
For PostgreSQL, add reWriteBatchedInserts=true following the URL option as shown below.
jdbc:postgresql://10.1.28.93:3306/PG?reWriteBatchedInserts=true&currentSchema=Database schema
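For completeness, here is a sketch of how the PostgreSQL URL parameter fits into a full sink definition. It mirrors the MySQL upsert sink shown earlier; the Flink table name `pg_upsert_sink_table` and the physical table name `my_upsert_table` are hypothetical, and the host, port, and database are copied from the URL above (note that PostgreSQL's default port is 5432, so adjust the port to your instance).

```sql
CREATE TABLE `pg_upsert_sink_table` (
    `id` INT PRIMARY KEY NOT ENFORCED,  -- required for upsert semantics
    `name` STRING
) WITH (
    'connector' = 'jdbc',
    -- reWriteBatchedInserts=true lets the PostgreSQL driver rewrite batched inserts.
    'url' = 'jdbc:postgresql://10.1.28.93:3306/PG?reWriteBatchedInserts=true',
    'table-name' = 'my_upsert_table',   -- hypothetical physical table name
    'username' = 'admin',
    'password' = 'MyPa$$w0rd',
    'sink.buffer-flush.max-rows' = '200',  -- flush after 200 buffered records
    'sink.buffer-flush.interval' = '2s'    -- or after 2 seconds, whichever comes first
);
```

The batching parameter only pays off when records are actually buffered, so it is paired here with non-zero sink.buffer-flush settings.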