Flink Version | Description |
1.11 | Supported |
1.13 | Supported |
1.14 | Unsupported |
1.16 | Supported |
CREATE TABLE postgres_cdc_source_table (
  id INT,
  name STRING,
  PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to be synced defines one.
) WITH (
  'connector' = 'postgres-cdc', -- Must be 'postgres-cdc'.
  'hostname' = 'yourHostname', -- IP address or hostname of the database server.
  'port' = '5432', -- Integer port number of the database server.
  'username' = 'yourUserName', -- Name of the PostgreSQL user to use when connecting to the PostgreSQL database server (REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT permissions are required).
  'password' = 'psw', -- Password to use when connecting to the PostgreSQL database server.
  'database-name' = 'yourDatabaseName', -- Database name of the PostgreSQL server to monitor.
  'schema-name' = 'yourSchemaName', -- Schema name of the PostgreSQL database to monitor (a regex is supported).
  'table-name' = 'yourTableName', -- Table name of the PostgreSQL database to monitor (a regex is supported).
  'debezium.slot.name' = 'customslotname' -- Define a unique slot name. Lowercase letters, digits, and underscores are allowed.
);
Option | Description | Required | Remarks |
connector | The connector to use. | Yes | Set this to postgres-cdc. |
hostname | IP address or hostname of the PostgreSQL database server. | Yes | - |
username | Name of the PostgreSQL user to use when connecting to the PostgreSQL database server. | Yes | The user must have the REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT permissions. |
password | Password to use when connecting to the PostgreSQL database server. | Yes | - |
database-name | Database name of the PostgreSQL server to monitor. | Yes | - |
schema-name | Schema name of the PostgreSQL database to monitor. | Yes | Accepts a regular expression; every schema that matches it is monitored. |
table-name | Table name of the PostgreSQL database to monitor. | Yes | Accepts a regular expression; every table that matches it is monitored. |
port | Integer port number of the PostgreSQL database server. | No | Default value: 5432. |
decoding.plugin.name | The name of the Postgres logical decoding plugin. | No | The value depends on which logical decoding plugin is installed on the server. Supported values: decoderbufs (default), wal2json, wal2json_rds, wal2json_streaming, wal2json_rds_streaming, pgoutput. |
debezium.* | Debezium properties. | No | Passes Debezium properties through for fine-grained control of client behavior. For example, setting 'debezium.slot.name' = 'xxxx' gives the job its own replication slot, which prevents errors such as PSQLException: ERROR: replication slot "dl_test" is active for PID 19997. For details, see Connector configuration properties. |
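The optional settings in the table can be combined in one WITH clause. The following is a minimal sketch, not a definitive configuration: the table name orders_cdc_source, the schema public, the pattern orders_.*, and the slot name orders_job_slot are hypothetical placeholders, and it assumes the server offers the pgoutput plugin (built into PostgreSQL 10 and later).

```sql
-- Sketch only: host, credentials, database, schema, table pattern,
-- and slot name are placeholders to replace with real values.
CREATE TABLE orders_cdc_source (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'yourHostname',
  'port' = '5432',
  'username' = 'yourUserName',
  'password' = 'psw',
  'database-name' = 'yourDatabaseName',
  'schema-name' = 'public',
  'table-name' = 'orders_.*',               -- regular expression; intended to match tables such as orders_2023
  'decoding.plugin.name' = 'pgoutput',      -- overrides the decoderbufs default
  'debezium.slot.name' = 'orders_job_slot'  -- a slot name that no other job uses
);
```

If the "replication slot ... is active" error still appears, running SELECT slot_name, active, active_pid FROM pg_replication_slots; on the PostgreSQL server shows which slots exist and which process holds each one.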
Postgres CDC Type | Flink Type |
SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
INTEGER, SERIAL | INT |
BIGINT, BIGSERIAL | BIGINT |
REAL, FLOAT4 | FLOAT |
FLOAT8, DOUBLE PRECISION | DOUBLE |
NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
BYTEA | BYTES |
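To illustrate how the mapping is applied, the sketch below declares a Flink source table for a hypothetical PostgreSQL table public.orders. The table, its columns, and the slot name are assumptions made for this example only; each Flink column type is chosen from the mapping table above.

```sql
-- Hypothetical PostgreSQL table, shown as a comment for reference:
--   CREATE TABLE public.orders (
--     order_id   BIGSERIAL PRIMARY KEY,
--     quantity   SMALLINT,
--     amount     NUMERIC(10, 2),
--     paid       BOOLEAN,
--     note       TEXT,
--     created_at TIMESTAMP(6),
--     payload    BYTEA
--   );

-- Matching Flink SQL declaration.
CREATE TABLE orders_source (
  order_id   BIGINT,          -- BIGSERIAL      -> BIGINT
  quantity   SMALLINT,        -- SMALLINT       -> SMALLINT
  amount     DECIMAL(10, 2),  -- NUMERIC(10, 2) -> DECIMAL(10, 2)
  paid       BOOLEAN,         -- BOOLEAN        -> BOOLEAN
  note       STRING,          -- TEXT           -> STRING
  created_at TIMESTAMP(6),    -- TIMESTAMP(6)   -> TIMESTAMP(6)
  payload    BYTES,           -- BYTEA          -> BYTES
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'yourHostname',
  'port' = '5432',
  'username' = 'yourUserName',
  'password' = 'psw',
  'database-name' = 'yourDatabaseName',
  'schema-name' = 'public',
  'table-name' = 'orders',
  'debezium.slot.name' = 'orders_types_slot'  -- placeholder slot name
);
```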
CREATE TABLE postgres_cdc_source_table (
  id INT,
  name STRING,
  PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to be synced defines one.
) WITH (
  'connector' = 'postgres-cdc', -- Must be 'postgres-cdc'.
  'hostname' = 'yourHostname', -- IP address or hostname of the database server.
  'port' = '5432', -- Integer port number of the database server.
  'username' = 'yourUserName', -- Name of the PostgreSQL user to use when connecting to the PostgreSQL database server (REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT permissions are required).
  'password' = 'psw', -- Password to use when connecting to the PostgreSQL database server.
  'database-name' = 'yourDatabaseName', -- Database name of the PostgreSQL server to monitor.
  'schema-name' = 'yourSchemaName', -- Schema name of the PostgreSQL database to monitor (a regex is supported).
  'table-name' = 'yourTableName', -- Table name of the PostgreSQL database to monitor (a regex is supported).
  'debezium.slot.name' = 'customslotname' -- Define a unique slot name. Lowercase letters, digits, and underscores are allowed.
);

CREATE TABLE `print_table` (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO print_table SELECT * FROM postgres_cdc_source_table;
CREATE ROLE debezium_user REPLICATION LOGIN;
GRANT USAGE ON SCHEMA schema_name TO debezium_user;
GRANT CONNECT ON DATABASE database_name TO debezium_user; -- PostgreSQL databases accept CONNECT, not USAGE.
GRANT SELECT ON schema_name.table_name TO debezium_user; -- To grant on several tables, list them separated by commas.
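After the role is created, its attributes and privileges can be checked before the job is started. The queries below are a sketch that uses PostgreSQL's built-in catalog and privilege functions; debezium_user, database_name, schema_name, and table_name are the same placeholders as above and must be replaced with real names.

```sql
-- Confirm that the role can log in and open replication connections.
SELECT rolname, rolreplication, rolcanlogin
FROM pg_roles
WHERE rolname = 'debezium_user';

-- Confirm database, schema, and table privileges; each column should be true.
SELECT
  has_database_privilege('debezium_user', 'database_name', 'CONNECT')      AS db_connect,
  has_schema_privilege('debezium_user', 'schema_name', 'USAGE')            AS schema_usage,
  has_table_privilege('debezium_user', 'schema_name.table_name', 'SELECT') AS table_select;
```

A false value in any column points to the grant that is still missing.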