tencent cloud

All product documents
Stream Compute Service
PostgreSQL CDC
Last updated: 2023-11-08 14:51:44
PostgreSQL CDC
Last updated: 2023-11-08 14:51:44

Overview

The Postgres CDC source connector allows for reading snapshot data and incremental data from PostgreSQL databases. It can read the data with exactly-once processing even failures occur.

Versions

Flink Version
Description
1.11
Supported
1.13
Supported
1.14
Unsupported
1.16
Supported

Limits

The Postgres CDC connector can be used only as a source. It supports PostgreSQL v9.6 or later.

Defining a table in DDL

CREATE TABLE postgres_cdc_source_table (
id INT,
name STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to be synced has it defined.
) WITH (
'connector' = 'postgres-cdc', -- Here, it should be'postgres-cdc'.
'hostname' = 'yourHostname', -- IP of the database server.
'port' = '5432', -- Integer port number of the database server.
'username' = 'yourUserName', Name of the PostgreSQL database to use when connecting to the PostgreSQL database server (REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT permissions are required).
'password' = 'psw' -- Password to use when connecting to the PostgreSQL database server.
'database-name' = 'yourDatabaseName', -- Database name of the PostgreSQL server to monitor.
'schema-name' = 'yourSchemaName', -- Schema name of the PostgreSQL database to monitor (a regex is supported).
'table-name' = 'yourTableName', -- Table name of the PostgreSQL database to monitor (a regex is supported).
'debezium.slot.name' = 'customslotname' -- Define a unique slot name, which supports lowercase letters, digits, and underscores.
);

WITH parameters

Option
Description
Required
Remarks
connector
The connector to use.
Yes
Here, it should bepostgres-cdc.
hostname
IP address or hostname of the ‌PostgreSQL database server.
Yes
-
username
Name of the PostgreSQL database to use when connecting to the PostgreSQL database server.
Yes
The user must have specific permissions (REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT).
password
Password to use when connecting to the PostgreSQL database server.
Yes
-
database-name
Database name of the PostgreSQL server to monitor.
Yes
-
schema-name
Schema name of the PostgreSQL database to monitor.
Yes
The schema-name supports regular expressions to monitor multiple schemas matching the regular expression.
table-name
Table name of the PostgreSQL database to monitor.
Yes
The table-name supports regular expressions to monitor multiple tables matching the regular expression.
port
Integer port number of the PostgreSQL database server.
No
Default value: 5432.
decoding.plugin.name
The name of the Postgres logical decoding plugin.
No
It depends on the Postgres logical decoding plugin installed on the server. Supported values are as follows:
decoderbufs (default)
wal2json
wal2json_rds
wal2json_streaming
wal2json_rds_streaming
pgoutput
debezium.*
Debezium properties.
No
Specifies Debezium properties for fine-grained control of the behaviors on the client, such as 'debezium.slot.name' = 'xxxx', to prevent PSQLException: ERROR: replication slot "dl_test" is active for PID 19997. For details, see Connector configuration properties.

Data type mapping

Postgres CDC and Flink data types are mapped as follows:
Postgres CDC Type
Flink Type
SMALLINT
SMALLINT
INT2
SMALLSERIAL
SERIAL2
INTEGER
INT
SERIAL
BIGINT
BIGINT
BIGSERIAL
REAL
FLOAT
FLOAT4
FLOAT8
DOUBLE
DOUBLE PRECISION
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
BOOLEAN
DATE
DATE
TIME [(p)] [WITHOUT TIMEZONE]
TIME [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n)
STRING
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
BYTEA
BYTES

Example

CREATE TABLE postgres_cdc_source_table (
id INT,
name STRING,
PRIMARY KEY (`id`) NOT ENFORCED -- Define the primary key here if the database table to be synced has it defined.
) WITH (
'connector' = 'postgres-cdc', -- Here, it should be'postgres-cdc'.
'hostname' = 'yourHostname', -- IP of the database server.
'port' = '5432', -- Integer port number of the database server.
'username' = 'yourUserName', Name of the PostgreSQL database to use when connecting to the PostgreSQL database server (REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT permissions are required).
'password' = 'psw' -- Password to use when connecting to the PostgreSQL database server.
'database-name' = 'yourDatabaseName', -- Database name of the PostgreSQL server to monitor.
'schema-name' = 'yourSchemaName', -- Schema name of the PostgreSQL database to monitor (a regex is supported).
'table-name' = 'yourTableName', -- Table name of the PostgreSQL database to monitor (a regex is supported).
'debezium.slot.name' = 'customslotname' -- Define a unique slot name, which supports lowercase letters, digits, and underscores.
);

CREATE TABLE `print_table` (
`id` INT,
`name` STRING
) WITH (
'connector' = 'print'
);
insert into print_table select * from postgres_cdc_source_table;

Notes

User permissions

The user must at least have the following permissions: REPLICATION, LOGIN, SCHEMA, DATABASE, and SELECT.
CREATE ROLE debezium_user REPLICATION LOGIN;
GRANT USAGE ON SCHEMA schema_name TO debezium_user;
GRANT USAGE ON DATABASE schema_name TO debezium_user;
GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;

Was this page helpful?
You can also Contact Sales or Submit a Ticket for help.
Yes
No

Feedback

Contact Us

Contact our sales team or business advisors to help your business.

Technical Support

Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

7x24 Phone Support
Hong Kong, China
+852 800 906 020 (Toll Free)
United States
+1 844 606 0804 (Toll Free)
United Kingdom
+44 808 196 4551 (Toll Free)
Canada
+1 888 605 7930 (Toll Free)
Australia
+61 1300 986 386 (Toll Free)
EdgeOne hotline
+852 300 80699
More local hotlines coming soon