tencent cloud

Feedback

MySQL Data Subscription

Last updated: 2024-09-09 21:39:35

    Overview

    MySQL uses a binary log (binlog) to sequentially record all operations submitted to the database, including modifications to table structures and data within the tables. MySQL uses the binlog for backup or data recovery.
    The Debezium MySQL connector generates row-level database change events by reading binlog, including INSERT, UPDATE and DELETE, and sends these events to corresponding Kafka topics. Client applications can process database change events by consuming messages from the corresponding topics to monitor specific databases.
    Supported SQL operations for subscription:
    Operation Type
    Supported SQL Operations
    DML
    INSERT, UPDATE, and DELETE
    DDL
    CREATE DATABASE, DROP DATABASE, CREATE TABLE, ALTER TABLE, DROP TABLE, and RENAME TABLE
    
    This document is organized and summarized based on the official Debezium documentation. For details, see Debezium connector for MySQL.

    Event Format

    The Debezium MySQL connector generates data modifications events for each insert, update, and delete operation. Each event is submitted as a message to the Kafka topic. Each message in the topic contains a key and a value. An example is shown below:
    
    Each Kafka message's key and value contain two fields: schema and payload. The format is as follows:
    {
    "schema": {
    ...
    },
    "payload": {
    ...
    }
    }
    Key field description:
    Item
    Field Name
    Description
    1
    schema
    The schema field describes the structure of the payload field of the key, i.e., it describes the structure of the primary key of the modified table. If the table does not have a primary key, it describes the structure of its unique key.
    2
    payload
    The structure of the payload field is the same as that described in the first schema and includes the key values of the modified row.
    
    Value field description:
    Item
    Field Name
    Description
    1
    schema
    The schema field describes the structure of the payload field of the value, i.e., it describes the structure of the modified row's fields. This field is usually a nested structure.
    2
    payload
    The structure of the payload field is the same as that described in the second schema and it includes the actual data of the modified row.

    Event Message Key

    The messages for different types of events all have the same key structure. Below is an example: The key of a change event contains the primary key structure of the modified table and the actual primary key value of the corresponding row.
    CREATE TABLE customers (
    id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    first_name VARCHAR(255) NOT NULL,
    last_name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL UNIQUE KEY
    ) AUTO_INCREMENT=1001;
    Each event key capturing modifications to the customers table has the same schema. The key of the event message corresponding to this operation is shown as follows (JSON representation):
    {
    "schema": {
    "type": "struct",
    "name": "mysql-server-1.inventory.customers.Key",
    "optional": false,
    "fields": [
    {
    "field": "id",
    "type": "int32",
    "optional": false
    }
    ]
    },
    "payload": {
    "id": 1001
    }
    }
    Item
    Field Name
    Description
    1
    schema
    The schema describes the structure in the payload.
    2
    mysql-server-1.inventory.customers.Key
    The naming format of the schema is connector-name.database-name.table-name.Key. In this example: The mysql-server-1 is the name of the connector generating the event; the inventory is the name of the corresponding database; the customers is the name of the table.
    3
    optional
    It indicates whether the field is optional.
    4
    fields
    It lists all the fields and their structure contained in the payload, including the field name, field type, and whether it is optional.
    5
    payload
    It contains the primary key of the modified row. In the example, it includes only one primary key value with the field name id: 1001.

    DML Events

    The previous section introduces the key structure of an event message. The key structures for different types of events are the same. This section lists different event types and describes the value structures for each of these event types.

    Create Events

    The following example shows the value part of the event message generated by the connector when new data are added to the table:
    {
    "schema": {
    "type": "struct",
    "fields": [
    {
    "type": "struct",
    "fields": [
    {
    "type": "int32",
    "optional": false,
    "field": "id"
    },
    {
    "type": "string",
    "optional": false,
    "field": "first_name"
    },
    {
    "type": "string",
    "optional": false,
    "field": "last_name"
    },
    {
    "type": "string",
    "optional": false,
    "field": "email"
    }
    ],
    "optional": true,
    "name": "mysql-server-1.inventory.customers.Value",
    "field": "before"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "int32",
    "optional": false,
    "field": "id"
    },
    {
    "type": "string",
    "optional": false,
    "field": "first_name"
    },
    {
    "type": "string",
    "optional": false,
    "field": "last_name"
    },
    {
    "type": "string",
    "optional": false,
    "field": "email"
    }
    ],
    "optional": true,
    "name": "mysql-server-1.inventory.customers.Value",
    "field": "after"
    },
    {
    "type": "struct",
    "fields": [
    {
    "type": "string",
    "optional": false,
    "field": "version"
    },
    {
    "type": "string",
    "optional": false,
    "field": "connector"
    },
    {
    "type": "string",
    "optional": false,
    "field": "name"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "ts_ms"
    },
    {
    "type": "boolean",
    "optional": true,
    "default": false,
    "field": "snapshot"
    },
    {
    "type": "string",
    "optional": false,
    "field": "db"
    },
    {
    "type": "string",
    "optional": true,
    "field": "table"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "server_id"
    },
    {
    "type": "string",
    "optional": true,
    "field": "gtid"
    },
    {
    "type": "string",
    "optional": false,
    "field": "file"
    },
    {
    "type": "int64",
    "optional": false,
    "field": "pos"
    },
    {
    "type": "int32",
    "optional": false,
    "field": "row"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "thread"
    },
    {
    "type": "string",
    "optional": true,
    "field": "query"
    }
    ],
    "optional": false,
    "name": "io.debezium.connector.mysql.Source",
    "field": "source"
    },
    {
    "type": "string",
    "optional": false,
    "field": "op"
    },
    {
    "type": "int64",
    "optional": true,
    "field": "ts_ms"
    }
    ],
    "optional": false,
    "name": "mysql-server-1.inventory.customers.Envelope"
    },
    "payload": {
    "op": "c",
    "ts_ms": 1465491411815,
    "before": null,
    "after": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
    },
    "source": {
    "version": "1.9.3.Final",
    "connector": "mysql",
    "name": "mysql-server-1",
    "ts_ms": 0,
    "snapshot": false,
    "db": "inventory",
    "table": "customers",
    "server_id": 0,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 154,
    "row": 0,
    "thread": 7,
    "query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
    }
    }
    }
    Item
    Field Name
    Description
    1
    schema
    The schema describes the structure in the payload. The field in the schema is an array that represents multiple fields are contained in the payload. Each element in the array is a description of the respective field structure within the payload.
    2
    field
    Each element in the fields includes a field, which indicates the name of the corresponding field in the payload. In the example, it includes before, after, source, etc.
    3
    type
    It indicates the type of field, such as Integer (int) and String (string).
    4
    mysql-server-1.inventory.customers.Value
    It indicates that this field is part of the value information for the customers table in the inventory database generated by the mysql-server-1 connector.
    
    io.debezium.connector.mysql.Source
    This name is bound to a specific connector, and the events generated by the connector all share the same name.
    6
    payload
    It includes the specific modified data in the change event, including the data before (before field) and after (after field) the modification, as well as some metadata information of the connector (source field).
    7
    op
    It indicates the type of modification operation that generates the event. In the example, c indicates an operation that creates a new row. c = create; u = update; d = delete; r = read (only snapshots).
    8
    source
    The source field is a field that describes event metadata. It includes some fields that can be used to compare with other events, such as the order in which events are generated, and whether they belong to the same transaction. This field includes the following metadata information: Debezium version Connector name binlog name where the event was recorded binlog position Row within the event If the event was part of a snapshot Name of the database and table that contain the new row ID of the MySQL thread that created the event (non-snapshot only) MySQL server ID (if available) Timestamp for when the change was made in the database.
    9
    query
    The original SQL statement of the modification operation.

    Update Events

    The following example shows the value part of the event generated by the update operation:
    {
    "schema": { ... },
    "payload": {
    "before": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
    },
    "after": {
    "id": 1004,
    "first_name": "Anne Marie",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
    },
    "source": {
    "version": "1.9.3.Final",
    "name": "mysql-server-1",
    "connector": "mysql",
    "name": "mysql-server-1",
    "ts_ms": 1465581029100,
    "snapshot": false,
    "db": "inventory",
    "table": "customers",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 484,
    "row": 0,
    "thread": 7,
    "query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
    },
    "op": "u",
    "ts_ms": 1465581029523
    }
    }
    The schema field is the same as events in a Create operation, but the payload part is different. In a create event, the before field is null, indicating no original data. In an update event, it includes both the data before and after the update.

    Delete Events

    The following example shows the value part of the event generated by the delete operation:
    {
    "schema": { ... },
    "payload": {
    "before": {
    "id": 1004,
    "first_name": "Anne Marie",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
    },
    "after": null,
    "source": {
    "version": "1.9.3.Final",
    "connector": "mysql",
    "name": "mysql-server-1",
    "ts_ms": 1465581902300,
    "snapshot": false,
    "db": "inventory",
    "table": "customers",
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000003",
    "pos": 805,
    "row": 0,
    "thread": 7,
    "query": "DELETE FROM customers WHERE id=1004"
    },
    "op": "d",
    "ts_ms": 1465581902461
    }
    }
    The schema field is the same as events in a Create operation, but the payload part is different. In a Delete event, it includes the data before the update, but the data after the update is null, indicating the data has been deleted.

    Primary Key Updates

    If an operation modifies the primary key of a row in the table, then the connector will generate a Delete event to represent the deletion of the row associated with the original primary key, and at the same time generate a Create event to represent the insertion of the row associated with the new primary key. The header of each message will be associated with the corresponding key. The official description is as follows:
    The DELETE event record has __debezium.newkey as a message header. The value of this header is the new primary key for the updated row.
    The CREATE event record has __debezium.oldkey as a message header. The value of this header is the previous (old) primary key that the updated row had.

    DDL Events

    Create Database

    The following example shows the value part of the event generated by the Create Database operation:
    {
    "source" : {
    "server" : "dip_source"
    },
    "position" : {
    "ts_sec" : 1655812326,
    "file" : "mysql-bin.000006",
    "pos" : 26063,
    "gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
    "snapshot" : true
    },
    "databaseName" : "dip_test",
    "ddl" : "CREATE DATABASE `dip_test` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
    "tableChanges" : [ ]
    }
    The content of position records information such as binlog files and consumption offsets. The ddl field contains the SQL statement that triggers the event.

    Drop Database

    The following example shows the value part of the event generated by the Delete Database operation:
    {
    "source" : {
    "server" : "dip_source"
    },
    "position" : {
    "ts_sec" : 1655812326,
    "file" : "mysql-bin.000006",
    "pos" : 26063,
    "gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
    "snapshot" : true
    },
    "databaseName" : "dip_test",
    "ddl" : "DROP DATABASE IF EXISTS `dip_test`",
    "tableChanges" : [ ]
    }
    The content of position records information such as binlog files and consumption offsets. The ddl field contains the SQL statement that triggers the event.

    Create Table

    The following example shows the value part of the event generated by the create table operation:
    {
    "source" : {
    "server" : "dip_source"
    },
    "position" : {
    "ts_sec" : 1655812326,
    "file" : "mysql-bin.000006",
    "pos" : 26063,
    "gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
    "snapshot" : true
    },
    "databaseName" : "dip_test",
    "ddl" : "CREATE TABLE `customers` (\\n `id` int NOT NULL AUTO_INCREMENT,\\n `first_name` varchar(255) NOT NULL,\\n `last_name` varchar(255) NOT NULL,\\n `email` varchar(255) NOT NULL,\\n PRIMARY KEY (`id`),\\n UNIQUE KEY `email` (`email`),\\n KEY `ix_id` (`id`)\\n) ENGINE=InnoDB AUTO_INCREMENT=1041 DEFAULT CHARSET=utf8",
    "tableChanges" : [ {
    "type" : "CREATE",
    "id" : "\\"dip_test\\".\\"customers\\"",
    "table" : {
    "defaultCharsetName" : "utf8",
    "primaryKeyColumnNames" : [ "id" ],
    "columns" : [ {
    "name" : "id",
    "jdbcType" : 4,
    "typeName" : "INT",
    "typeExpression" : "INT",
    "charsetName" : null,
    "position" : 1,
    "optional" : false,
    "autoIncremented" : true,
    "generated" : true,
    "comment" : null,
    "hasDefaultValue" : false,
    "enumValues" : [ ]
    }, {
    "name" : "first_name",
    "jdbcType" : 12,
    "typeName" : "VARCHAR",
    "typeExpression" : "VARCHAR",
    "charsetName" : "utf8",
    "length" : 255,
    "position" : 2,
    "optional" : false,
    "autoIncremented" : false,
    "generated" : false,
    "comment" : null,
    "hasDefaultValue" : false,
    "enumValues" : [ ]
    }, {
    "name" : "last_name",
    "jdbcType" : 12,
    "typeName" : "VARCHAR",
    "typeExpression" : "VARCHAR",
    "charsetName" : "utf8",
    "length" : 255,
    "position" : 3,
    "optional" : false,
    "autoIncremented" : false,
    "generated" : false,
    "comment" : null,
    "hasDefaultValue" : false,
    "enumValues" : [ ]
    }, {
    "name" : "email",
    "jdbcType" : 12,
    "typeName" : "VARCHAR",
    "typeExpression" : "VARCHAR",
    "charsetName" : "utf8",
    "length" : 255,
    "position" : 4,
    "optional" : false,
    "autoIncremented" : false,
    "generated" : false,
    "comment" : null,
    "hasDefaultValue" : false,
    "enumValues" : [ ]
    } ]
    },
    "comment" : null
    } ]
    }
    The content of position records information such as binlog files and consumption offsets. The ddl field contains the SQL statement that triggers the event. The columns field records the definition information of the different fields of the new table.

    Alter Table

    The following example shows the value part of the event generated by the alter table operation:
    {
    "source" : {
    "server" : "1307446078-a123"
    },
    "position" : {
    "transaction_id" : null,
    "ts_sec" : 1655782153,
    "file" : "mysql-bin.000005",
    "pos" : 1218,
    "gtids" : "ddf040ad-7509-11ec-968b-0c42a1eda2e9:1-8",
    "server_id" : 183277
    },
    "databaseName" : "test",
    "ddl" : "ALTER TABLE `user` ADD COLUMN `createtime` datetime NULL DEFAULT CURRENT_TIMESTAMP",
    "tableChanges" : [ {
    "type" : "ALTER",
    "id" : "\\"test\\".\\"user\\"",
    "table" : {
    "defaultCharsetName" : "utf8",
    "primaryKeyColumnNames" : [ ],
    "columns" : [ {
    "name" : "name",
    "jdbcType" : 1,
    "typeName" : "CHAR",
    "typeExpression" : "CHAR",
    "charsetName" : "utf8",
    "length" : 20,
    "position" : 1,
    "optional" : true,
    "autoIncremented" : false,
    "generated" : false,
    "comment" : null,
    "hasDefaultValue" : true,
    "defaultValueExpression" : "",
    "enumValues" : [ ]
    }, {
    "name" : "age",
    "jdbcType" : 4,
    "typeName" : "INT",
    "typeExpression" : "INT",
    "charsetName" : null,
    "position" : 2,
    "optional" : true,
    "autoIncremented" : false,
    "generated" : false,
    "comment" : null,
    "hasDefaultValue" : true,
    "enumValues" : [ ]
    }, {
    "name" : "createtime",
    "jdbcType" : 93,
    "typeName" : "DATETIME",
    "typeExpression" : "DATETIME",
    "charsetName" : null,
    "position" : 3,
    "optional" : true,
    "autoIncremented" : false,
    "generated" : false,
    "comment" : null,
    "hasDefaultValue" : true,
    "defaultValueExpression" : "1970-01-01 00:00:00",
    "enumValues" : [ ]
    } ]
    },
    "comment" : null
    } ]
    }
    The content of position records information such as binlog files and consumption offsets. The ddl field contains the SQL statement that triggers the event. The columns field records the information of the modified fields.

    Drop Table

    The following example shows the value part of the event generated by the Drop Table operation:
    {
    "source" : {
    "server" : "dip_source"
    },
    "position" : {
    "ts_sec" : 1655812326,
    "file" : "mysql-bin.000006",
    "pos" : 26063,
    "gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
    "snapshot" : true
    },
    "databaseName" : "dip_test",
    "ddl" : "DROP TABLE IF EXISTS `dip_test`.`customers`",
    "tableChanges" : [ ]
    }
    The content of position records information such as binlog files and consumption offsets. The ddl field contains the SQL statement that triggers the event.

    Rename Table

    The following example shows the value part of the event generated by the Rename operation:
    {
    "schema": {
    "type": "struct",
    "fields": ···,
    "optional": false,
    "name": "io.debezium.connector.mysql.SchemaChangeValue"
    },
    "payload": {
    "source": {
    "version": "1.9.0.Final",
    "connector": "mysql",
    "name": "task-lzpx4pdo",
    "ts_ms": 1656300979748,
    "snapshot": "false",
    "db": "testDB",
    "sequence": null,
    "table": "t_test",
    "server_id": 170993,
    "gtid": "b24176f2-5409-11ec-80d4-b8599fe5c6ea:80",
    "file": "mysql-bin.000006",
    "pos": 26411,
    "row": 0,
    "thread": null,
    "query": null
    },
    "databaseName": "testDB",
    "schemaName": null,
    "ddl": "rename table test to t_test",
    "tableChanges": [{
    "type": "ALTER",
    "id": "\\"testDB\\".\\"t_test\\"",
    "table": {
    "defaultCharsetName": "utf8",
    "primaryKeyColumnNames": ["id"],
    "columns": [{
    "name": "id",
    "jdbcType": -5,
    "nativeType": null,
    "typeName": "BIGINT",
    "typeExpression": "BIGINT",
    "charsetName": null,
    "length": 20,
    "scale": null,
    "position": 1,
    "optional": false,
    "autoIncremented": true,
    "generated": true,
    "comment": null
    }, {
    "name": "name",
    "jdbcType": 12,
    "nativeType": null,
    "typeName": "VARCHAR",
    "typeExpression": "VARCHAR",
    "charsetName": "utf8",
    "length": 20,
    "scale": null,
    "position": 2,
    "optional": true,
    "autoIncremented": false,
    "generated": false,
    "comment": null
    }],
    "comment": null
    }
    }]
    }
    }
    The schema contains the format information of the payload content. Some content is omitted here. In the payload field, source is the metadata information, and the ddl field is the SQL statement that triggers the event. Columns are the fields of the affected table.
    
    Contact Us

    Contact our sales team or business advisors to help your business.

    Technical Support

    Open a ticket if you're looking for further assistance. Our Ticket is 7x24 avaliable.

    7x24 Phone Support