PostgreSQL Data Subscription
Last updated: 2024-09-09 21:40:39

Overview

The Debezium PostgreSQL connector captures row-level changes in a PostgreSQL database and generates corresponding change events. When the connector connects to the PostgreSQL server for the first time, it takes a consistent snapshot of all schemas. After that, it continuously captures row-level insert, update, and delete operations and generates data change events, which are submitted as messages to the corresponding Kafka topics. Client applications can consume these topics to process the change events, for example to monitor specific databases.
This document is organized and summarized based on the official Debezium documentation. For details, see Debezium connector for PostgreSQL.

Event Format

The Debezium PostgreSQL connector generates a data change event for each row-level insert, update, or delete operation. Each event is submitted as a message to a Kafka topic, and each message consists of a key and a value.

Each Kafka message's key and value contain two fields: schema and payload. The format is as follows:
{
  "schema": {
    ...
  },
  "payload": {
    ...
  }
}
Key field description:
| Item | Field Name | Description |
| --- | --- | --- |
| 1 | schema | Describes the structure of the key's payload field, that is, the primary key structure of the modified table. If the table has no primary key, it describes the structure of its unique key. |
| 2 | payload | Has the structure described in schema and contains the key values of the modified row. |

Value field description:
| Item | Field Name | Description |
| --- | --- | --- |
| 1 | schema | Describes the structure of the value's payload field, that is, the structure of the modified row's fields. This field is usually a nested structure. |
| 2 | payload | Has the structure described in schema and contains the actual data of the modified row. |
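
As a minimal sketch of how a consumer might take this envelope apart, the Python snippet below parses a serialized key or value and returns its schema and payload parts. It assumes the message is JSON with an embedded schema, as in the examples in this document. The function name split_envelope and the sample bytes are illustrative and are not part of Debezium or CKafka.

```python
import json
from typing import Any, Optional, Tuple


def split_envelope(raw: Optional[bytes]) -> Tuple[Optional[dict], Any]:
    """Split a serialized message key or value into (schema, payload).

    Assumes JSON serialization with an embedded schema. A missing key or
    value (None) yields (None, None).
    """
    if raw is None:
        return None, None
    doc = json.loads(raw.decode("utf-8"))
    return doc.get("schema"), doc.get("payload")


# Hypothetical key bytes, shaped like the structure described above.
sample_key = b'{"schema": {"type": "struct"}, "payload": {"id": 1}}'
schema, payload = split_envelope(sample_key)
print(schema["type"], payload["id"])
```

The same function can be applied to the key and to the value of a message, since both share the schema/payload layout.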

Event Message Key

Messages for all event types share the same key structure: the key of a change event contains the primary key structure of the modified table and the actual primary key value of the corresponding row. The following table definition is used as an example:
CREATE TABLE customers (
  id SERIAL,
  first_name VARCHAR(255) NOT NULL,
  last_name VARCHAR(255) NOT NULL,
  email VARCHAR(255) NOT NULL,
  PRIMARY KEY(id)
);
The key of a change event for a row in this table is shown as follows (JSON representation):
{
  "schema": {
    "type": "struct",
    "name": "PostgreSQL_server.inventory.customers.Key",
    "optional": false,
    "fields": [
      {
        "name": "id",
        "index": "0",
        "schema": {
          "type": "INT32",
          "optional": "false"
        }
      }
    ]
  },
  "payload": {
    "id": "1"
  }
}
| Item | Field Name | Description |
| --- | --- | --- |
| 1 | schema | Describes the structure of the payload. |
| 2 | PostgreSQL_server.inventory.customers.Key | The schema name follows the format connector-name.database-name.table-name.Key. In this example, PostgreSQL_server is the name of the connector that generated the event, inventory is the name of the database, and customers is the name of the table. |
| 3 | optional | Indicates whether the field is optional. |
| 4 | fields | Lists all fields contained in the payload and their structure, including the field name, field type, and whether the field is optional. |
| 5 | payload | Contains the primary key of the modified row. In the example, the key consists of a single field, id, whose value is 1. |
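
Because the key payload identifies the modified row, a consumer can turn it into a stable row identifier. The following Python sketch shows one way to do that. It assumes a JSON-serialized key as shown above; the helper name primary_key and the sample string are illustrative only.

```python
import json


def primary_key(key_json: str) -> tuple:
    """Return the key payload as a hashable tuple of (column, value) pairs."""
    payload = json.loads(key_json)["payload"]
    # Sort by column name so composite keys always compare the same way.
    return tuple(sorted(payload.items()))


# Hypothetical key for the customers table; the schema part is abbreviated.
sample = '{"schema": {"type": "struct"}, "payload": {"id": 1}}'
print(primary_key(sample))  # (('id', 1),)
```

A tuple is used so that the result can serve as a dictionary key, including for tables with composite primary keys.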

Event List

The previous section introduces the structure of an event message key. The key structures for different types of events are the same. This section lists different event types and describes the value structures for each of these event types.

Create Events

Below is a message generated by the Debezium PostgreSQL connector for a database insert operation:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "PostgreSQL_server.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "PostgreSQL_server.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "PostgreSQL_server.inventory.customers.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.9.3.Final",
      "connector": "postgresql",
      "name": "PostgreSQL_server",
      "ts_ms": 1559033904863,
      "snapshot": true,
      "db": "postgres",
      "sequence": "[\"24023119\",\"24023128\"]",
      "schema": "public",
      "table": "customers",
      "txId": 555,
      "lsn": 24023128,
      "xmin": null
    },
    "op": "c",
    "ts_ms": 1559033904863
  }
}
| Item | Field Name | Description |
| --- | --- | --- |
| 1 | schema | Describes the structure of the payload. The fields entry in the schema is an array because the payload contains multiple fields; each element of the array describes the structure of one field in the payload. |
| 2 | field | Each element of fields has a field entry that gives the name of the corresponding field in the payload. In the example, these are before, after, source, and so on. |
| 3 | type | Indicates the type of the field, such as integer (int32, int64) or string (string). |
| 4 | PostgreSQL_server.inventory.customers.Value | Indicates that this field is part of the value information of the customers table in the inventory database, generated by the PostgreSQL_server connector. |
| 5 | io.debezium.connector.postgresql.Source | The schema name of the source field. It is bound to a specific connector, and all events generated by that connector share the same name. |
| 6 | payload | Contains the data modified by the change event, including the data before (before field) and after (after field) the modification, as well as connector metadata (source field). |
| 7 | op | Indicates the type of operation that generated the event. In the example, c indicates that a new row was created. Valid values: c = create; u = update; d = delete; r = read (snapshots only); t = truncate; m = message. |
| 8 | source | Describes the event metadata. Its fields can be used to compare the event with other events, for example to determine the order in which events were generated and whether they belong to the same transaction. The source field includes the following metadata: Debezium version; connector type and name; database and table that contain the new row; stringified JSON array of additional offset information (the first value is always the last committed LSN, the second value is the current LSN; either value may be null); schema name; whether the event was part of a snapshot; ID of the transaction in which the operation was performed; offset of the operation in the database log; timestamp of when the change was made in the database. |
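
To illustrate how the op and source fields described above might be read, here is a minimal Python sketch. It maps the op code to a readable name and unpacks the stringified sequence array into its two LSN values. The function name describe_event and the shape of the returned dictionary are assumptions made for this example, not part of Debezium.

```python
import json

# Operation codes as listed in the table above.
OP_NAMES = {
    "c": "create",
    "u": "update",
    "d": "delete",
    "r": "read",
    "t": "truncate",
    "m": "message",
}


def describe_event(payload: dict) -> dict:
    """Summarize the op and source metadata of a change event payload."""
    source = payload["source"]
    # "sequence" is a JSON array stored as a string; it may be absent.
    seq = source.get("sequence")
    last_committed_lsn, current_lsn = json.loads(seq) if seq else (None, None)
    return {
        "operation": OP_NAMES.get(payload["op"], "unknown"),
        "table": f'{source["schema"]}.{source["table"]}',
        "snapshot": source.get("snapshot"),
        "last_committed_lsn": last_committed_lsn,
        "current_lsn": current_lsn,
    }


# Trimmed-down payload based on the create event shown earlier.
event = {
    "op": "c",
    "source": {
        "schema": "public",
        "table": "customers",
        "snapshot": True,
        "sequence": '["24023119","24023128"]',
    },
}
print(describe_event(event))
```

Note that the LSN values inside sequence are strings, so they need an explicit conversion before any numeric comparison.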

Update Events

Below is a message generated by the Debezium PostgreSQL connector for a database update operation:
{
  "schema": { ... },
  "payload": {
    "before": {
      "id": 1
    },
    "after": {
      "id": 1,
      "first_name": "Anne Marie",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.9.3.Final",
      "connector": "postgresql",
      "name": "PostgreSQL_server",
      "ts_ms": 1559033904863,
      "snapshot": false,
      "db": "postgres",
      "schema": "public",
      "table": "customers",
      "txId": 556,
      "lsn": 24023128,
      "xmin": null
    },
    "op": "u",
    "ts_ms": 1465584025523
  }
}
The schema field is the same as in the create event, but the payload differs. In the create event, the before field is null because there is no original data. In the update event, both the before and after fields are present and show the data before and after the update. In this example, before contains only the primary key column; which columns appear in before depends on the table's REPLICA IDENTITY setting.
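
A consumer often wants to know which columns an update actually changed. The Python sketch below compares the before and after images of an update payload. The function name changed_columns is illustrative, and the sample assumes that before carries all columns (for instance with REPLICA IDENTITY FULL); for a before image that holds only id, the result would be empty.

```python
def changed_columns(payload: dict) -> dict:
    """Map column name -> (old, new) for columns whose value differs.

    Only columns present in both the before and after images are compared.
    """
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        col: (before[col], after[col])
        for col in after
        if col in before and before[col] != after[col]
    }


# Hypothetical update payload with a full before image.
update = {
    "op": "u",
    "before": {"id": 1, "first_name": "Anne", "last_name": "Kretchmar"},
    "after": {"id": 1, "first_name": "Anne Marie", "last_name": "Kretchmar"},
}
print(changed_columns(update))  # {'first_name': ('Anne', 'Anne Marie')}
```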

Truncate Events

When a truncate table event occurs, the key of the event message is null. An example of such a message is shown below:
{
  "schema": { ... },
  "payload": {
    "source": {
      "version": "1.9.3.Final",
      "connector": "postgresql",
      "name": "PostgreSQL_server",
      "ts_ms": 1559033904863,
      "snapshot": false,
      "db": "postgres",
      "schema": "public",
      "table": "customers",
      "txId": 556,
      "lsn": 46523128,
      "xmin": null
    },
    "op": "t",
    "ts_ms": 1559033904961
  }
}
If a TRUNCATE statement affects multiple tables, the connector will generate a separate truncate event message for each affected table.

Message Events

Message events are supported only with the pgoutput plugin on PostgreSQL 14 or later. An example of a transactional message event is shown below:
{
  "schema": { ... },
  "payload": {
    "source": {
      "version": "1.9.3.Final",
      "connector": "postgresql",
      "name": "PostgreSQL_server",
      "ts_ms": 1559033904863,
      "snapshot": false,
      "db": "postgres",
      "schema": "",
      "table": "",
      "txId": 556,
      "lsn": 46523128,
      "xmin": null
    },
    "op": "m",
    "ts_ms": 1559033904961,
    "message": {
      "prefix": "foo",
      "content": "Ymfy"
    }
  }
}
An example of a non-transactional message event is shown below:
{
  "schema": { ... },
  "payload": {
    "source": {
      "version": "1.9.3.Final",
      "connector": "postgresql",
      "name": "PostgreSQL_server",
      "ts_ms": 1559033904863,
      "snapshot": false,
      "db": "postgres",
      "schema": "",
      "table": "",
      "lsn": 46523128,
      "xmin": null
    },
    "op": "m",
    "ts_ms": 1559033904961,
    "message": {
      "prefix": "foo",
      "content": "Ymfy"
    }
  }
}
The transactional message event includes a txId field that holds the transaction ID; the non-transactional event omits it. In addition, every message event contains a message field, described as follows:
| Field Name | Description |
| --- | --- |
| message | Contains the metadata of the message: prefix (text); content (byte array, encoded according to the binary handling mode setting). |
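
The following Python sketch shows how a consumer might read a message event. It assumes that content arrives as a Base64 string, which depends on the binary handling mode in use and should be verified for your connector configuration. The function name decode_message and the sample payload (including the content value YmFy) are illustrative only.

```python
import base64


def decode_message(payload: dict) -> tuple:
    """Return (prefix, content_bytes, is_transactional) for a message event.

    Assumes "content" is Base64-encoded; other binary handling modes
    would need a different decoder.
    """
    message = payload["message"]
    content = base64.b64decode(message["content"])
    # Transactional message events carry a txId in the source block.
    transactional = payload["source"].get("txId") is not None
    return message["prefix"], content, transactional


# Hypothetical message event payload; "YmFy" is Base64 for b"bar".
sample = {
    "op": "m",
    "source": {"txId": 556, "lsn": 46523128},
    "message": {"prefix": "foo", "content": "YmFy"},
}
print(decode_message(sample))  # ('foo', b'bar', True)
```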

Delete Events

Below is a message generated by the Debezium PostgreSQL connector for a database delete operation:
{
  "schema": { ... },
  "payload": {
    "before": {
      "id": 1
    },
    "after": null,
    "source": {
      "version": "1.9.3.Final",
      "connector": "postgresql",
      "name": "PostgreSQL_server",
      "ts_ms": 1559033904863,
      "snapshot": false,
      "db": "postgres",
      "schema": "public",
      "table": "customers",
      "txId": 556,
      "lsn": 46523128,
      "xmin": null
    },
    "op": "d",
    "ts_ms": 1465581902461
  }
}
The schema field is the same as in the create event, but the payload differs. A delete event contains the data before the deletion in the before field, while the after field is null, indicating that the row has been deleted.
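
Taken together, the create, update, truncate, and delete events can be replayed to keep a local copy of a table in sync. The Python sketch below applies one event at a time to an in-memory dictionary. It is a simplified illustration that assumes a single table and already-parsed key and value payloads; the function name apply_event is hypothetical.

```python
from typing import Optional


def apply_event(table: dict, key: Optional[dict], value: Optional[dict]) -> None:
    """Apply one change event to `table`, a dict keyed by primary key tuple.

    `key` and `value` are the payload parts of the message key and value.
    """
    if value is None:
        return  # nothing to apply (for example, a message without a value)
    op = value["op"]
    if op == "t":
        table.clear()  # truncate events carry no key and no row data
        return
    if op == "m" or key is None:
        return  # message events do not describe row changes
    pk = tuple(sorted(key.items()))
    if op in ("c", "r", "u"):
        table[pk] = value["after"]  # row image after the change
    elif op == "d":
        table.pop(pk, None)  # remove the row if it is present


customers = {}
apply_event(customers, {"id": 1}, {"op": "c", "before": None,
                                   "after": {"id": 1, "first_name": "Anne"}})
apply_event(customers, {"id": 1}, {"op": "d", "before": {"id": 1}, "after": None})
print(customers)  # {}
```

Snapshot reads (r) are treated like creates here, because both deliver a complete row in the after field.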

Primary Key Events

If an operation modifies the primary key of a row, the connector generates two events instead of an update event: a delete event for the row with the original primary key, and a create event for the row with the new primary key. Each of the two messages carries a header that refers to the other key. The official description is as follows:
The DELETE event record has __debezium.newkey as a message header. The value of this header is the new primary key for the updated row.
The CREATE event record has __debezium.oldkey as a message header. The value of this header is the previous (old) primary key that the updated row had.
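
A consumer can use these headers to pair the two events of a primary key change. The Python sketch below looks up the relevant header for a given operation. It assumes that headers are available as (name, bytes) pairs, which is how several Python Kafka clients expose them, and it returns the raw header value because its serialization depends on the converter configuration. The function name linked_key and the sample header value are hypothetical.

```python
from typing import Iterable, Optional, Tuple

NEW_KEY_HEADER = "__debezium.newkey"
OLD_KEY_HEADER = "__debezium.oldkey"


def linked_key(op: str, headers: Optional[Iterable[Tuple[str, bytes]]]) -> Optional[bytes]:
    """Return the raw value of the header that links a delete or create
    event to its counterpart, or None if no such header is present."""
    wanted = {"d": NEW_KEY_HEADER, "c": OLD_KEY_HEADER}.get(op)
    for name, value in headers or ():
        if name == wanted:
            return value
    return None


# Hypothetical headers of a delete event produced by a key change.
headers = [("__debezium.newkey", b'{"id": 2}')]
print(linked_key("d", headers))
```

Ordinary delete and create events, which are not caused by a primary key change, simply yield None.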
