tencent cloud


PostgreSQL Data Subscription

Last updated: 2024-09-09 21:40:39

    Overview

    The Debezium PostgreSQL connector captures row-level changes in a PostgreSQL database and generates corresponding change events. The first time the connector connects to a PostgreSQL server, it takes a consistent snapshot of all schemas in the database. After that, it continuously captures row-level insert, update, and delete operations and generates data change events, which it submits as messages to the corresponding Kafka topics. Client applications consume these topic messages to process the database change events, which makes it possible to monitor specific databases.
    This document summarizes the official Debezium documentation. For details, see Debezium connector for PostgreSQL.

    Event Format

    The Debezium PostgreSQL connector generates a data change event for each row-level insert, update, or delete operation and submits each event as a message to a Kafka topic. Each message consists of a key and a value, and each of them contains two fields, schema and payload, in the following format:
    {
      "schema": {
        ...
      },
      "payload": {
        ...
      }
    }
    Key field description:
    | Item | Field Name | Description |
    | --- | --- | --- |
    | 1 | schema | Describes the structure of the key's payload field, that is, the primary key structure of the modified table. If the table has no primary key, it describes the structure of the table's unique key. |
    | 2 | payload | Has the structure described in schema and contains the key values of the modified row. |
    Value field description:
    | Item | Field Name | Description |
    | --- | --- | --- |
    | 1 | schema | Describes the structure of the value's payload field, that is, the structure of the modified row's fields. This field is usually a nested structure. |
    | 2 | payload | Has the structure described in schema and contains the actual data of the modified row. |
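    Because the key and the value wrap their content in the same schema/payload envelope, a consumer can unpack both with one helper. The following Python sketch assumes the messages are serialized by Kafka Connect's JSON converter with schemas enabled (which is what produces this envelope) and that the raw key or value bytes have already been read from Kafka. The function name split_envelope is hypothetical.

```python
import json
from typing import Any, Optional, Tuple


def split_envelope(raw: Optional[bytes]) -> Tuple[Optional[dict], Any]:
    """Return (schema, payload) for one Debezium JSON key or value.

    Hypothetical helper. A None input (for example, the null key of a
    truncate event) yields (None, None).
    """
    if raw is None:
        return None, None
    doc = json.loads(raw)  # json.loads accepts bytes as well as str
    return doc.get("schema"), doc.get("payload")


# Usage with a trimmed-down key message.
key_bytes = b'{"schema": {"type": "struct", "fields": []}, "payload": {"id": 1}}'
schema, payload = split_envelope(key_bytes)
print(payload)  # {'id': 1}
```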

    Event Message Key

    Events of all types share the same key structure. The key of a change event contains the primary key structure of the modified table and the actual primary key value of the affected row. For example, consider the following table:
    CREATE TABLE customers (
    id SERIAL,
    first_name VARCHAR(255) NOT NULL,
    last_name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL,
    PRIMARY KEY(id)
    );
    The key of a change event for a row in this table looks like this (JSON representation):
    {
      "schema": {
        "type": "struct",
        "name": "PostgreSQL_server.public.customers.Key",
        "optional": false,
        "fields": [
          {
            "name": "id",
            "index": 0,
            "schema": {
              "type": "INT32",
              "optional": false
            }
          }
        ]
      },
      "payload": {
        "id": 1
      }
    }
    | Item | Field Name | Description |
    | --- | --- | --- |
    | 1 | schema | Describes the structure of the key's payload. |
    | 2 | PostgreSQL_server.public.customers.Key | The name of the key schema, in the format connector-name.schema-name.table-name.Key. In this example, PostgreSQL_server is the name of the connector that generated the event, public is the schema that contains the table, and customers is the table that was changed. |
    | 3 | optional | Indicates whether the field is optional. |
    | 4 | fields | Lists every field contained in the payload, with its name, type, and whether it is optional. |
    | 5 | payload | Contains the primary key of the modified row. In this example, it contains a single primary key field, id, whose value is 1. |
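    Because fields lists the key columns in order, a consumer can turn a key message into a tuple that identifies the row, which is convenient for tables with composite primary keys. This Python sketch is illustrative only: primary_key is a hypothetical helper, and it accepts either name (as in the example key) or field (as in the value examples later in this document) as the attribute that holds the column name.

```python
import json
from typing import Any, Tuple


def primary_key(key_json: str) -> Tuple[Any, ...]:
    """Return the primary key values of the changed row, in schema order."""
    doc = json.loads(key_json)
    # The column name is under "name" in the key example and under
    # "field" in the value examples, so accept either.
    names = [f.get("field", f.get("name")) for f in doc["schema"]["fields"]]
    return tuple(doc["payload"][n] for n in names)


key_json = """
{"schema": {"type": "struct",
            "name": "PostgreSQL_server.public.customers.Key",
            "optional": false,
            "fields": [{"name": "id", "index": 0,
                        "schema": {"type": "INT32", "optional": false}}]},
 "payload": {"id": 1}}
"""
print(primary_key(key_json))  # (1,)
```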

    Event List

    The previous section introduces the structure of an event message key. The key structures for different types of events are the same. This section lists different event types and describes the value structures for each of these event types.

    Create Events

    Below is a message generated by the Debezium PostgreSQL connector for an INSERT operation:
    {
      "schema": {
        "type": "struct",
        "fields": [
          {
            "type": "struct",
            "fields": [
              { "type": "int32", "optional": false, "field": "id" },
              { "type": "string", "optional": false, "field": "first_name" },
              { "type": "string", "optional": false, "field": "last_name" },
              { "type": "string", "optional": false, "field": "email" }
            ],
            "optional": true,
            "name": "PostgreSQL_server.inventory.customers.Value",
            "field": "before"
          },
          {
            "type": "struct",
            "fields": [
              { "type": "int32", "optional": false, "field": "id" },
              { "type": "string", "optional": false, "field": "first_name" },
              { "type": "string", "optional": false, "field": "last_name" },
              { "type": "string", "optional": false, "field": "email" }
            ],
            "optional": true,
            "name": "PostgreSQL_server.inventory.customers.Value",
            "field": "after"
          },
          {
            "type": "struct",
            "fields": [
              { "type": "string", "optional": false, "field": "version" },
              { "type": "string", "optional": false, "field": "connector" },
              { "type": "string", "optional": false, "field": "name" },
              { "type": "int64", "optional": false, "field": "ts_ms" },
              { "type": "boolean", "optional": true, "default": false, "field": "snapshot" },
              { "type": "string", "optional": false, "field": "db" },
              { "type": "string", "optional": true, "field": "sequence" },
              { "type": "string", "optional": false, "field": "schema" },
              { "type": "string", "optional": false, "field": "table" },
              { "type": "int64", "optional": true, "field": "txId" },
              { "type": "int64", "optional": true, "field": "lsn" },
              { "type": "int64", "optional": true, "field": "xmin" }
            ],
            "optional": false,
            "name": "io.debezium.connector.postgresql.Source",
            "field": "source"
          },
          { "type": "string", "optional": false, "field": "op" },
          { "type": "int64", "optional": true, "field": "ts_ms" }
        ],
        "optional": false,
        "name": "PostgreSQL_server.inventory.customers.Envelope"
      },
      "payload": {
        "before": null,
        "after": {
          "id": 1,
          "first_name": "Anne",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "1.9.3.Final",
          "connector": "postgresql",
          "name": "PostgreSQL_server",
          "ts_ms": 1559033904863,
          "snapshot": true,
          "db": "postgres",
          "sequence": "[\"24023119\",\"24023128\"]",
          "schema": "public",
          "table": "customers",
          "txId": 555,
          "lsn": 24023128,
          "xmin": null
        },
        "op": "c",
        "ts_ms": 1559033904863
      }
    }
    | Item | Field Name | Description |
    | --- | --- | --- |
    | 1 | schema | Describes the structure of the payload. Its fields array lists the fields contained in the payload, and each element of the array describes the structure of one of those fields. |
    | 2 | field | Each element of fields has a field attribute that gives the name of the corresponding payload field, such as before, after, and source in this example. |
    | 3 | type | The data type of the field, such as int32, int64, string, boolean, or struct. |
    | 4 | PostgreSQL_server.inventory.customers.Value | The schema name of the before and after fields. It indicates that these fields hold row data of the customers table in inventory, captured by the PostgreSQL_server connector; the name follows the format connector-name.schema-name.table-name.Value. |
    | 5 | io.debezium.connector.postgresql.Source | The schema name of the source field. It is specific to the PostgreSQL connector, and all events generated by the connector use this same name. |
    | 6 | payload | Contains the actual data of the change event: the row data before the change (before field), the row data after the change (after field), and connector metadata (source field). |
    | 7 | op | The type of operation that generated the event. In this example, c indicates that a row was created. Possible values: c = create; u = update; d = delete; r = read (snapshots only); t = truncate; m = message. |
    | 8 | source | Metadata describing the event. Its fields can be used to compare events, for example to determine the order in which they were generated or whether they belong to the same transaction. It contains: the Debezium version; the connector type and name; the database, schema, and table that contain the new row; a stringified JSON array of additional offset information (sequence), whose first value is always the last committed LSN and whose second value is the current LSN (either value may be null); whether the event was part of a snapshot; the ID of the transaction in which the operation was performed (txId); the offset of the operation in the database log (lsn); and the time at which the change was made in the database (ts_ms). |
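    The op code and the sequence field are usually the first things a consumer inspects. The following Python sketch maps op to a readable operation name and splits source.sequence into its two LSN values. The helper names describe and parse_sequence are hypothetical, and the LSNs are assumed to arrive as decimal strings inside the stringified array.

```python
import json
from typing import Optional, Tuple

# op codes as listed in the table above.
OPS = {
    "c": "create",
    "u": "update",
    "d": "delete",
    "r": "read (snapshot)",
    "t": "truncate",
    "m": "message",
}


def describe(payload: dict) -> str:
    """Summarize a change event payload in one line."""
    src = payload["source"]
    name = OPS.get(payload["op"], "unknown")
    return f"{name} on {src['schema']}.{src['table']} (txId={src.get('txId')})"


def _to_int(value: Optional[str]) -> Optional[int]:
    return None if value is None else int(value)


def parse_sequence(source: dict) -> Tuple[Optional[int], Optional[int]]:
    """Split source.sequence into (last committed LSN, current LSN).

    sequence is a JSON array serialized as a string; either element may
    be null, and the field itself may be missing.
    """
    raw = source.get("sequence")
    if raw is None:
        return None, None
    last_committed, current = json.loads(raw)
    return _to_int(last_committed), _to_int(current)


payload = {
    "op": "c",
    "source": {"schema": "public", "table": "customers", "txId": 555,
               "sequence": "[\"24023119\",\"24023128\"]"},
}
print(describe(payload))                  # create on public.customers (txId=555)
print(parse_sequence(payload["source"]))  # (24023119, 24023128)
```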

    Update Events

    Below is a message generated by the Debezium PostgreSQL connector for an UPDATE operation:
    {
      "schema": { ... },
      "payload": {
        "before": {
          "id": 1
        },
        "after": {
          "id": 1,
          "first_name": "Anne Marie",
          "last_name": "Kretchmar",
          "email": "annek@noanswer.org"
        },
        "source": {
          "version": "1.9.3.Final",
          "connector": "postgresql",
          "name": "PostgreSQL_server",
          "ts_ms": 1559033904863,
          "snapshot": false,
          "db": "postgres",
          "schema": "public",
          "table": "customers",
          "txId": 556,
          "lsn": 24023128,
          "xmin": null
        },
        "op": "u",
        "ts_ms": 1465584025523
      }
    }
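    The schema field is the same as in the create event, but the payload differs. In the create event, the before field is null because the row did not exist previously. In the update event, both before and after are present: after contains the full row after the update, while in this example before contains only the primary key column. This is because the table uses the default REPLICA IDENTITY setting; for before to contain the previous values of all columns, the table's REPLICA IDENTITY must be set to FULL.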
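    Because before may hold only the primary key, a consumer that wants to know which columns changed must separate columns that actually changed from columns whose old value is simply absent from the event. A minimal Python sketch, where diff_update is a hypothetical name:

```python
from typing import Any, Dict, List, Optional, Tuple


def diff_update(
    before: Optional[dict], after: dict
) -> Tuple[Dict[str, Tuple[Any, Any]], List[str]]:
    """Compare the before and after images of an update event.

    Returns (changed, unknown): changed maps column -> (old, new) for
    columns present in both images with different values; unknown lists
    columns absent from before, whose old value the event does not carry
    (the usual case with the default REPLICA IDENTITY).
    """
    before = before or {}
    changed = {
        col: (before[col], new)
        for col, new in after.items()
        if col in before and before[col] != new
    }
    unknown = sorted(col for col in after if col not in before)
    return changed, unknown


# The update event above: before holds only the primary key.
print(diff_update({"id": 1}, {"id": 1, "first_name": "Anne Marie",
                              "last_name": "Kretchmar",
                              "email": "annek@noanswer.org"}))
# ({}, ['email', 'first_name', 'last_name'])
```

    With REPLICA IDENTITY FULL, before carries every column, so unknown would be empty and changed would report first_name as ('Anne', 'Anne Marie').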

    Truncate Events

    For a truncate event, the key of the event message is null. An example of the message value is shown below:
    {
      "schema": { ... },
      "payload": {
        "source": {
          "version": "1.9.3.Final",
          "connector": "postgresql",
          "name": "PostgreSQL_server",
          "ts_ms": 1559033904863,
          "snapshot": false,
          "db": "postgres",
          "schema": "public",
          "table": "customers",
          "txId": 556,
          "lsn": 46523128,
          "xmin": null
        },
        "op": "t",
        "ts_ms": 1559033904961
      }
    }
    If a TRUNCATE statement affects multiple tables, the connector will generate a separate truncate event message for each affected table.
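    Because a truncate event has a null key and carries no before or after data, a consumer can identify the affected table only through source. The following Python sketch assumes a hypothetical in-memory replica that maps "schema.table" to a dictionary of rows keyed by primary key. Since a multi-table TRUNCATE yields one event per table, the handler only ever clears one table.

```python
from typing import Any, Dict


def handle_truncate(replica: Dict[str, Dict[Any, dict]], payload: dict) -> None:
    """Empty the local copy of the table named in a truncate event."""
    if payload.get("op") != "t":
        raise ValueError("not a truncate event")
    src = payload["source"]
    table = replica.get(f"{src['schema']}.{src['table']}")
    if table is not None:
        table.clear()


replica = {
    "public.customers": {(1,): {"id": 1, "first_name": "Anne"}},
    "public.orders": {(7,): {"id": 7}},
}
handle_truncate(replica, {"op": "t", "source": {"schema": "public", "table": "customers"}})
print(replica)  # {'public.customers': {}, 'public.orders': {(7,): {'id': 7}}}
```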

    Message Events

    This event type is supported only by the pgoutput plugin on PostgreSQL 14 and later. An example of a transactional message event is shown below:
    {
      "schema": { ... },
      "payload": {
        "source": {
          "version": "1.9.3.Final",
          "connector": "postgresql",
          "name": "PostgreSQL_server",
          "ts_ms": 1559033904863,
          "snapshot": false,
          "db": "postgres",
          "schema": "",
          "table": "",
          "txId": 556,
          "lsn": 46523128,
          "xmin": null
        },
        "op": "m",
        "ts_ms": 1559033904961,
        "message": {
          "prefix": "foo",
          "content": "Ymfy"
        }
      }
    }
    An example of a non-transactional message event is shown below:
    {
      "schema": { ... },
      "payload": {
        "source": {
          "version": "1.9.3.Final",
          "connector": "postgresql",
          "name": "PostgreSQL_server",
          "ts_ms": 1559033904863,
          "snapshot": false,
          "db": "postgres",
          "schema": "",
          "table": "",
          "lsn": 46523128,
          "xmin": null
        },
        "op": "m",
        "ts_ms": 1559033904961,
        "message": {
          "prefix": "foo",
          "content": "Ymfy"
        }
      }
    }
    A transactional message event includes a txId field containing the transaction ID; a non-transactional message event does not. Every message event also contains a message field, described as follows:
    | Field Name | Description |
    | --- | --- |
    | message | Contains the message metadata: prefix (text) and content (a byte array encoded according to the binary handling mode setting). |
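    A consumer can tell the two kinds of message event apart by whether source.txId is present, and it must decode content itself. This Python sketch assumes content arrives as a base64 string, which is how a byte array appears in JSON when binary.handling.mode is base64, or bytes (because Kafka Connect's JSON converter base64-encodes byte arrays); hex mode would need bytes.fromhex instead. read_message and LogicalMessage are hypothetical names.

```python
import base64
from typing import NamedTuple


class LogicalMessage(NamedTuple):
    prefix: str
    content: bytes
    transactional: bool


def read_message(payload: dict) -> LogicalMessage:
    """Decode the message field of an op == "m" event."""
    if payload.get("op") != "m":
        raise ValueError("not a message event")
    msg = payload["message"]
    return LogicalMessage(
        prefix=msg["prefix"],
        content=base64.b64decode(msg["content"]),
        # Only transactional message events carry a transaction ID.
        transactional=payload["source"].get("txId") is not None,
    )


event = {
    "op": "m",
    "source": {"db": "postgres", "lsn": 46523128, "xmin": None},
    "message": {"prefix": "foo", "content": "YmFy"},
}
print(read_message(event))
# LogicalMessage(prefix='foo', content=b'bar', transactional=False)
```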

    Delete Events

    Below is a message generated by the Debezium PostgreSQL connector for a DELETE operation:
    {
      "schema": { ... },
      "payload": {
        "before": {
          "id": 1
        },
        "after": null,
        "source": {
          "version": "1.9.3.Final",
          "connector": "postgresql",
          "name": "PostgreSQL_server",
          "ts_ms": 1559033904863,
          "snapshot": false,
          "db": "postgres",
          "schema": "public",
          "table": "customers",
          "txId": 556,
          "lsn": 46523128,
          "xmin": null
        },
        "op": "d",
        "ts_ms": 1465581902461
      }
    }
    The schema field is the same as in the create event, but the payload differs. In a delete event, before contains the row data prior to the deletion (in this example, only the primary key column, because the table uses the default REPLICA IDENTITY setting), and after is null, indicating that the row has been deleted.
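    On the consumer side, a delete event removes the row identified by the message key. In the following Python sketch, the table is a hypothetical in-memory map from primary key tuples to rows, and apply_delete is a hypothetical name. It also tolerates a tombstone (a message with the same key and a null value), which Debezium emits after a delete event by default (controlled by tombstones.on.delete) so that Kafka log compaction can discard the key.

```python
from typing import Any, Dict, Optional, Tuple


def apply_delete(
    table: Dict[Tuple[Any, ...], dict],
    key_payload: dict,
    value_payload: Optional[dict],
) -> None:
    """Remove a deleted row from a local {primary key tuple: row} map."""
    if value_payload is None:
        return  # tombstone: nothing left to apply
    if value_payload.get("op") != "d":
        raise ValueError("not a delete event")
    # The key payload holds the primary key columns, e.g. {"id": 1}.
    table.pop(tuple(key_payload.values()), None)


customers = {(1,): {"id": 1, "first_name": "Anne Marie"}}
apply_delete(customers, {"id": 1}, {"op": "d", "before": {"id": 1}, "after": None})
apply_delete(customers, {"id": 1}, None)  # tombstone: no-op
print(customers)  # {}
```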

    Primary Key Events

    If an UPDATE operation changes the primary key of a row, the connector does not emit an update event. Instead, it emits a delete event indicating that the row with the original primary key has been deleted, followed by a create event representing the row with the new primary key. Each of these messages carries a header that references the other key. The official Debezium documentation describes these headers as follows:
    The DELETE event record has __debezium.newkey as a message header. The value of this header is the new primary key for the updated row.
    The CREATE event record has __debezium.oldkey as a message header. The value of this header is the previous (old) primary key that the updated row had.
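    A consumer that wants to treat such a delete/create pair as a single key change can read these headers. The following Python sketch assumes headers are available as a list of (name, bytes) pairs, which is the shape the kafka-python and confluent-kafka clients return, and that the header values are JSON-serialized keys; the actual encoding depends on the configured header converter. key_change_headers is a hypothetical name.

```python
import json
from typing import Dict, List, Optional, Tuple

KEY_HEADERS = ("__debezium.newkey", "__debezium.oldkey")


def key_change_headers(
    headers: Optional[List[Tuple[str, Optional[bytes]]]]
) -> Dict[str, object]:
    """Return the decoded __debezium.newkey / __debezium.oldkey headers.

    Assumes header values are JSON; adjust the decoding to match the
    header converter actually configured.
    """
    found: Dict[str, object] = {}
    for name, value in headers or []:
        if name in KEY_HEADERS and value is not None:
            found[name] = json.loads(value)
    return found


# A DELETE record produced by a primary key change (header value assumed).
print(key_change_headers([("__debezium.newkey", b'{"id": 2}')]))
# {'__debezium.newkey': {'id': 2}}
```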
    