TDMQ for CKafka
MySQL Data Subscription
Last updated: 2024-09-09 21:39:35

Overview

MySQL uses a binary log (binlog) to record, in order, all changes committed to the database, including changes to table structures and to the data in tables. MySQL uses the binlog for replication and data recovery.
The Debezium MySQL connector reads the binlog, generates a row-level change event for each INSERT, UPDATE, and DELETE operation, and sends these events to the corresponding Kafka topics. Client applications can monitor specific databases by consuming messages from these topics and processing the change events.
Supported SQL operations for subscription:

| Operation Type | Supported SQL Operations |
| --- | --- |
| DML | INSERT, UPDATE, and DELETE |
| DDL | CREATE DATABASE, DROP DATABASE, CREATE TABLE, ALTER TABLE, DROP TABLE, and RENAME TABLE |

This document summarizes the official Debezium documentation. For details, see Debezium connector for MySQL.

Event Format

The Debezium MySQL connector generates a data change event for each insert, update, and delete operation and writes each event as a message to a Kafka topic. Each message contains a key and a value, and both the key and the value contain two fields, schema and payload, in the following format:
{
"schema": {
...
},
"payload": {
...
}
}
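A consumer usually separates these two parts before processing a message. The following is a minimal Python sketch, assuming the key and value are JSON documents in the layout above (as produced by a JSON converter with schemas enabled); split_envelope is a hypothetical helper name, not part of Debezium or CKafka.

```python
import json
from typing import Any, Optional, Tuple, Union


def split_envelope(raw: Optional[Union[bytes, str]]) -> Tuple[Any, Any]:
    """Return (schema, payload) from a JSON-encoded message key or value.

    Hypothetical helper: assumes the {"schema": ..., "payload": ...} layout.
    """
    if raw is None:
        # An absent key or value has neither a schema nor a payload.
        return None, None
    document = json.loads(raw)
    return document.get("schema"), document.get("payload")


# Usage with a trimmed-down key message.
schema, payload = split_envelope('{"schema": {"type": "struct"}, "payload": {"id": 1001}}')
print(payload["id"])  # 1001
```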
Key field description:

| Item | Field Name | Description |
| --- | --- | --- |
| 1 | schema | Describes the structure of the key's payload field, that is, the structure of the modified table's primary key. If the table does not have a primary key, it describes the structure of the table's unique key. |
| 2 | payload | Contains the key values of the modified row, in the structure described by the key's schema field. |

Value field description:

| Item | Field Name | Description |
| --- | --- | --- |
| 1 | schema | Describes the structure of the value's payload field, that is, the structure of the modified row's fields. This field is usually nested. |
| 2 | payload | Contains the actual data of the modified row, in the structure described by the value's schema field. |

Event Message Key

All event types share the same key structure: the key of a change event contains the schema of the modified table's primary key and the actual primary key value of the changed row. The examples in this section use the following customers table:
CREATE TABLE customers (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
first_name VARCHAR(255) NOT NULL,
last_name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL UNIQUE KEY
) AUTO_INCREMENT=1001;
Every change event for the customers table has a key with the same schema. For example, the key of an event for the row whose id is 1001 is as follows (JSON representation):
{
"schema": {
"type": "struct",
"name": "mysql-server-1.inventory.customers.Key",
"optional": false,
"fields": [
{
"field": "id",
"type": "int32",
"optional": false
}
]
},
"payload": {
"id": 1001
}
}
| Item | Field Name | Description |
| --- | --- | --- |
| 1 | schema | Describes the structure of the key's payload. |
| 2 | mysql-server-1.inventory.customers.Key | The name of the key schema, in the format connector-name.database-name.table-name.Key. In this example, mysql-server-1 is the name of the connector that generated the event, inventory is the database name, and customers is the table name. |
| 3 | optional | Indicates whether the field is optional. |
| 4 | fields | Lists every field contained in the payload, together with its name, type, and whether it is optional. |
| 5 | payload | Contains the primary key of the modified row. In this example, it contains a single primary key field, id, with the value 1001. |
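Because the key schema name encodes the connector, database, and table, a consumer can recover all three from the key alone. A minimal Python sketch, assuming JSON-encoded keys and names that contain no dots; parse_key is a hypothetical helper.

```python
import json


def parse_key(raw_key):
    """Return (connector, database, table, key_values) from a key message.

    Hypothetical helper: assumes the schema name has the form
    connector-name.database-name.table-name.Key and that none of the
    three names contains a dot.
    """
    document = json.loads(raw_key)
    name = document["schema"]["name"]
    parts = name.split(".")
    if len(parts) != 4 or parts[3] != "Key":
        raise ValueError(f"unexpected key schema name: {name}")
    connector, database, table, _ = parts
    return connector, database, table, document["payload"]


raw = ('{"schema": {"type": "struct", "name": "mysql-server-1.inventory.customers.Key", '
       '"optional": false, "fields": [{"field": "id", "type": "int32", "optional": false}]}, '
       '"payload": {"id": 1001}}')
print(parse_key(raw))  # ('mysql-server-1', 'inventory', 'customers', {'id': 1001})
```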

DML Events

The previous section described the key of an event message, which has the same structure for all event types. This section describes the value structure of each event type.

Create Events

The following example shows the value part of the event message that the connector generates when a row is inserted into the customers table:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "mysql-server-1.inventory.customers.Envelope"
},
"payload": {
"op": "c",
"ts_ms": 1465491411815,
"before": null,
"after": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "1.9.3.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 0,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"thread": 7,
"query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"
}
}
}
| Item | Field Name | Description |
| --- | --- | --- |
| 1 | schema | Describes the structure of the payload. Its fields array lists the fields contained in the payload, and each array element describes the structure of one payload field. |
| 2 | field | The name of the corresponding field in the payload, such as before, after, source, op, and ts_ms. |
| 3 | type | The data type of the field, such as int32, int64, string, boolean, or struct. |
| 4 | mysql-server-1.inventory.customers.Value | The schema name of the before and after fields. It describes the row structure of the customers table in the inventory database, as captured by the mysql-server-1 connector. |
| 5 | io.debezium.connector.mysql.Source | The schema name of the source field. This schema is specific to the MySQL connector, and the connector uses it for all events it generates. |
| 6 | payload | The actual change data of the event: the row state before the change (before), the row state after the change (after), and connector metadata (source). |
| 7 | op | The type of operation that generated the event: c = create (insert), u = update, d = delete, r = read (snapshots only). In this example, c indicates that a new row was inserted. |
| 8 | source | Event metadata that can be used to compare this event with others, for example to determine the order in which events were generated or whether they belong to the same transaction. It includes: the Debezium version; the connector name; the name of the binlog file where the event was recorded; the binlog position; the row within the event; whether the event was part of a snapshot; the names of the database and table that contain the new row; the ID of the MySQL thread that created the event (non-snapshot only); the MySQL server ID (if available); and the timestamp of when the change was made in the database. |
| 9 | query | The original SQL statement of the change. |
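A consumer can turn the op code and the source metadata into a compact description of each change. A minimal Python sketch, assuming JSON-encoded values in the format of the example; describe_change and OP_NAMES are hypothetical names, and the op mapping follows the table above.

```python
import json

# Operation codes carried in payload["op"].
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read"}


def describe_change(raw_value):
    """Summarize a DML change event value (hypothetical helper)."""
    payload = json.loads(raw_value)["payload"]
    source = payload["source"]
    return {
        "operation": OP_NAMES[payload["op"]],
        "table": f'{source["db"]}.{source["table"]}',
        # Binlog coordinates: file name, position, and row within the event.
        "binlog": (source["file"], source["pos"], source["row"]),
        "before": payload["before"],
        "after": payload["after"],
    }


example = ('{"schema": {}, "payload": {"op": "c", "before": null, '
           '"after": {"id": 1004, "first_name": "Anne"}, '
           '"source": {"db": "inventory", "table": "customers", '
           '"file": "mysql-bin.000003", "pos": 154, "row": 0}}}')
print(describe_change(example)["operation"])  # create
```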

Update Events

The following example shows the value part of the event generated by the update operation:
{
"schema": { ... },
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "1.9.3.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581029100,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 484,
"row": 0,
"thread": 7,
"query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"
},
"op": "u",
"ts_ms": 1465581029523
}
}
The schema field is the same as in a create event; only the payload differs. In a create event, the before field is null because the row did not exist before the operation. In an update event, before and after contain the row state before and after the update, respectively, and op is u.
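Because an update event carries the full row both before and after the change, a consumer can work out which columns changed by comparing the two. A minimal sketch; changed_columns is a hypothetical helper, and it assumes both before and after contain the full row, as in the example.

```python
def changed_columns(before, after):
    """Return {column: (old_value, new_value)} for columns whose value differs."""
    columns = set(before) | set(after)
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(columns)
        if before.get(column) != after.get(column)
    }


before = {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
after = {"id": 1004, "first_name": "Anne Marie", "last_name": "Kretchmar", "email": "annek@noanswer.org"}
print(changed_columns(before, after))  # {'first_name': ('Anne', 'Anne Marie')}
```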

Delete Events

The following example shows the value part of the event generated by the delete operation:
{
"schema": { ... },
"payload": {
"before": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null,
"source": {
"version": "1.9.3.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 1465581902300,
"snapshot": false,
"db": "inventory",
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 805,
"row": 0,
"thread": 7,
"query": "DELETE FROM customers WHERE id=1004"
},
"op": "d",
"ts_ms": 1465581902461
}
}
The schema field is the same as in a create event; only the payload differs. In a delete event, the before field contains the row state before the deletion, and the after field is null, indicating that the row has been deleted. The op field is d.
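Together, create, update, delete, and snapshot read events are enough to keep a copy of a table in sync. The following minimal sketch maintains a hypothetical in-memory replica keyed by primary key value. It assumes a single-column primary key named id, as in the customers table; a more general consumer would take the key from the message key instead.

```python
def apply_change(replica, payload, key_column="id"):
    """Apply one DML change event payload to an in-memory table.

    replica maps primary key value -> row dict. key_column is an
    assumption for this sketch (the customers table uses id).
    """
    op = payload["op"]
    if op in ("c", "r", "u"):
        # Inserts, snapshot reads, and updates all store the new row state.
        row = payload["after"]
        replica[row[key_column]] = row
    elif op == "d":
        # Deletes carry the old row state in before; after is null.
        replica.pop(payload["before"][key_column], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")
    return replica


table = {}
apply_change(table, {"op": "c", "before": None, "after": {"id": 1004, "first_name": "Anne"}})
apply_change(table, {"op": "u", "before": {"id": 1004, "first_name": "Anne"},
                     "after": {"id": 1004, "first_name": "Anne Marie"}})
print(table)  # {1004: {'id': 1004, 'first_name': 'Anne Marie'}}
```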

Primary Key Updates

If an UPDATE operation changes the primary key of a row, the connector generates a DELETE event for the row with the old primary key and a CREATE event for the row with the new primary key. Each of these messages carries a header that holds the other primary key, linking the two events. The Debezium documentation describes these headers as follows:
The DELETE event record has __debezium.newkey as a message header. The value of this header is the new primary key for the updated row.
The CREATE event record has __debezium.oldkey as a message header. The value of this header is the previous (old) primary key that the updated row had.
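A consumer can use these headers to recognize that a DELETE/CREATE pair came from a single primary key update rather than two independent operations. A minimal sketch, assuming headers arrive as an iterable of (name, value) pairs, the shape exposed by common Python Kafka clients (verify for your client). The header value is returned undecoded because its encoding depends on the connector's header converter configuration; primary_key_change is a hypothetical helper.

```python
# Header names documented by Debezium for primary key updates.
NEW_KEY_HEADER = "__debezium.newkey"  # carried by the DELETE event
OLD_KEY_HEADER = "__debezium.oldkey"  # carried by the CREATE event


def primary_key_change(headers):
    """Classify a record produced by a primary key update.

    Returns ("delete", new_key) or ("create", old_key), or None if the
    record carries neither header. headers may be None.
    """
    for name, value in headers or ():
        if name == NEW_KEY_HEADER:
            return "delete", value
        if name == OLD_KEY_HEADER:
            return "create", value
    return None
```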

DDL Events

Create Database

The following example shows the value part of the event generated by the Create Database operation:
{
"source" : {
"server" : "dip_source"
},
"position" : {
"ts_sec" : 1655812326,
"file" : "mysql-bin.000006",
"pos" : 26063,
"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
"snapshot" : true
},
"databaseName" : "dip_test",
"ddl" : "CREATE DATABASE `dip_test` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
"tableChanges" : [ ]
}
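The position field records where the event was read from the binlog: the binlog file name (file), the offset within that file (pos), the GTID set (gtids), and the event timestamp in seconds (ts_sec). The ddl field contains the SQL statement that triggered the event.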
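DDL event values are plain JSON documents without a schema/payload envelope in this example, so a consumer can read the fields directly. A minimal Python sketch that extracts the database, the DDL text, and the binlog coordinates; ddl_summary is a hypothetical helper, and missing fields are returned as None.

```python
import json


def ddl_summary(raw_value):
    """Extract database, DDL text, and binlog position from a DDL event value."""
    event = json.loads(raw_value)
    position = event.get("position", {})
    return {
        "database": event.get("databaseName"),
        "ddl": event.get("ddl"),
        "binlog_file": position.get("file"),
        "binlog_pos": position.get("pos"),
        "gtids": position.get("gtids"),
        # Number of tables whose definition the event describes.
        "table_changes": len(event.get("tableChanges", [])),
    }


raw = ('{"source": {"server": "dip_source"}, "position": {"ts_sec": 1655812326, '
       '"file": "mysql-bin.000006", "pos": 26063, '
       '"gtids": "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78", "snapshot": true}, '
       '"databaseName": "dip_test", '
       '"ddl": "CREATE DATABASE `dip_test` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci", '
       '"tableChanges": []}')
print(ddl_summary(raw)["binlog_pos"])  # 26063
```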

Drop Database

The following example shows the value part of the event generated by the Drop Database operation:
{
"source" : {
"server" : "dip_source"
},
"position" : {
"ts_sec" : 1655812326,
"file" : "mysql-bin.000006",
"pos" : 26063,
"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
"snapshot" : true
},
"databaseName" : "dip_test",
"ddl" : "DROP DATABASE IF EXISTS `dip_test`",
"tableChanges" : [ ]
}
As with CREATE DATABASE events, the position field records the binlog file name (file) and offset (pos) at which the event was read. The ddl field contains the SQL statement that triggered the event.

Create Table

The following example shows the value part of the event generated by the create table operation:
{
"source" : {
"server" : "dip_source"
},
"position" : {
"ts_sec" : 1655812326,
"file" : "mysql-bin.000006",
"pos" : 26063,
"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
"snapshot" : true
},
"databaseName" : "dip_test",
"ddl" : "CREATE TABLE `customers` (\n `id` int NOT NULL AUTO_INCREMENT,\n `first_name` varchar(255) NOT NULL,\n `last_name` varchar(255) NOT NULL,\n `email` varchar(255) NOT NULL,\n PRIMARY KEY (`id`),\n UNIQUE KEY `email` (`email`),\n KEY `ix_id` (`id`)\n) ENGINE=InnoDB AUTO_INCREMENT=1041 DEFAULT CHARSET=utf8",
"tableChanges" : [ {
"type" : "CREATE",
"id" : "\"dip_test\".\"customers\"",
"table" : {
"defaultCharsetName" : "utf8",
"primaryKeyColumnNames" : [ "id" ],
"columns" : [ {
"name" : "id",
"jdbcType" : 4,
"typeName" : "INT",
"typeExpression" : "INT",
"charsetName" : null,
"position" : 1,
"optional" : false,
"autoIncremented" : true,
"generated" : true,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
}, {
"name" : "first_name",
"jdbcType" : 12,
"typeName" : "VARCHAR",
"typeExpression" : "VARCHAR",
"charsetName" : "utf8",
"length" : 255,
"position" : 2,
"optional" : false,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
}, {
"name" : "last_name",
"jdbcType" : 12,
"typeName" : "VARCHAR",
"typeExpression" : "VARCHAR",
"charsetName" : "utf8",
"length" : 255,
"position" : 3,
"optional" : false,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
}, {
"name" : "email",
"jdbcType" : 12,
"typeName" : "VARCHAR",
"typeExpression" : "VARCHAR",
"charsetName" : "utf8",
"length" : 255,
"position" : 4,
"optional" : false,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : false,
"enumValues" : [ ]
} ]
},
"comment" : null
} ]
}
The position field records the binlog file name (file) and offset (pos) at which the event was read. The ddl field contains the SQL statement that triggered the event. Each entry in tableChanges describes an affected table: primaryKeyColumnNames lists the primary key columns, and columns contains the definition of each column of the new table.
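The columns array carries enough information to rebuild a simplified view of the table definition. A minimal Python sketch that renders each column as name, type, optional length, and NOT NULL; column_definitions is a hypothetical helper, and it deliberately ignores defaults, character sets, and auto-increment, so it is not a full DDL generator.

```python
def column_definitions(table_change):
    """Render each column of a tableChanges entry as 'name TYPE[(length)] [NOT NULL]'."""
    columns = table_change["table"]["columns"]
    definitions = []
    # Sort by the 1-based "position" so the output follows table column order.
    for column in sorted(columns, key=lambda c: c["position"]):
        type_text = column["typeName"]
        if column.get("length") is not None:
            type_text += f"({column['length']})"
        nullability = "" if column["optional"] else " NOT NULL"
        definitions.append(f"{column['name']} {type_text}{nullability}")
    return definitions


change = {"type": "CREATE", "table": {"primaryKeyColumnNames": ["id"], "columns": [
    {"name": "first_name", "typeName": "VARCHAR", "length": 255, "position": 2, "optional": False},
    {"name": "id", "typeName": "INT", "position": 1, "optional": False},
]}}
print(column_definitions(change))  # ['id INT NOT NULL', 'first_name VARCHAR(255) NOT NULL']
```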

Alter Table

The following example shows the value part of the event generated by the alter table operation:
{
"source" : {
"server" : "1307446078-a123"
},
"position" : {
"transaction_id" : null,
"ts_sec" : 1655782153,
"file" : "mysql-bin.000005",
"pos" : 1218,
"gtids" : "ddf040ad-7509-11ec-968b-0c42a1eda2e9:1-8",
"server_id" : 183277
},
"databaseName" : "test",
"ddl" : "ALTER TABLE `user` ADD COLUMN `createtime` datetime NULL DEFAULT CURRENT_TIMESTAMP",
"tableChanges" : [ {
"type" : "ALTER",
"id" : "\"test\".\"user\"",
"table" : {
"defaultCharsetName" : "utf8",
"primaryKeyColumnNames" : [ ],
"columns" : [ {
"name" : "name",
"jdbcType" : 1,
"typeName" : "CHAR",
"typeExpression" : "CHAR",
"charsetName" : "utf8",
"length" : 20,
"position" : 1,
"optional" : true,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : true,
"defaultValueExpression" : "",
"enumValues" : [ ]
}, {
"name" : "age",
"jdbcType" : 4,
"typeName" : "INT",
"typeExpression" : "INT",
"charsetName" : null,
"position" : 2,
"optional" : true,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : true,
"enumValues" : [ ]
}, {
"name" : "createtime",
"jdbcType" : 93,
"typeName" : "DATETIME",
"typeExpression" : "DATETIME",
"charsetName" : null,
"position" : 3,
"optional" : true,
"autoIncremented" : false,
"generated" : false,
"comment" : null,
"hasDefaultValue" : true,
"defaultValueExpression" : "1970-01-01 00:00:00",
"enumValues" : [ ]
} ]
},
"comment" : null
} ]
}
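The position field records the binlog file name (file) and offset (pos) at which the event was read, as well as the GTID set (gtids) and the server ID (server_id). The ddl field contains the SQL statement that triggered the event. The columns field lists all columns of the table after the change, including the newly added createtime column, not only the modified ones.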
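Because an ALTER event lists the complete column set after the change, finding out which columns were added or removed requires comparing it with the previously known definition, which the consumer must keep itself (for example, from the earlier CREATE TABLE event). A minimal sketch under that assumption; column_diff is a hypothetical helper that compares column names only, so type changes are not detected.

```python
def column_diff(old_columns, new_columns):
    """Compare two tableChanges column lists; return (added, removed) column names."""
    old_names = {column["name"] for column in old_columns}
    new_names = {column["name"] for column in new_columns}
    added = [column["name"] for column in new_columns if column["name"] not in old_names]
    removed = [column["name"] for column in old_columns if column["name"] not in new_names]
    return added, removed


old = [{"name": "name"}, {"name": "age"}]
new = [{"name": "name"}, {"name": "age"}, {"name": "createtime"}]
print(column_diff(old, new))  # (['createtime'], [])
```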

Drop Table

The following example shows the value part of the event generated by the Drop Table operation:
{
"source" : {
"server" : "dip_source"
},
"position" : {
"ts_sec" : 1655812326,
"file" : "mysql-bin.000006",
"pos" : 26063,
"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78",
"snapshot" : true
},
"databaseName" : "dip_test",
"ddl" : "DROP TABLE IF EXISTS `dip_test`.`customers`",
"tableChanges" : [ ]
}
The position field records the binlog file name (file) and offset (pos) at which the event was read. The ddl field contains the SQL statement that triggered the event.
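Since every DDL event carries its SQL statement in the ddl field, a consumer can dispatch on the statement type. A minimal Python sketch that recognizes the DDL types listed in the Overview by their leading keywords; ddl_kind is a hypothetical helper, and this simple prefix match does not handle leading comments or variants such as CREATE TEMPORARY TABLE or ALTER TABLE ... RENAME.

```python
import re

# Leading keywords of the supported DDL statement types, case-insensitive.
_DDL_PATTERN = re.compile(
    r"^\s*(CREATE\s+DATABASE|DROP\s+DATABASE|CREATE\s+TABLE|ALTER\s+TABLE"
    r"|DROP\s+TABLE|RENAME\s+TABLE)\b",
    re.IGNORECASE,
)


def ddl_kind(ddl):
    """Return the normalized statement type, such as 'DROP TABLE', or None."""
    match = _DDL_PATTERN.match(ddl)
    if match is None:
        return None
    # Upper-case and collapse internal whitespace for a stable label.
    return " ".join(match.group(1).upper().split())


print(ddl_kind("DROP TABLE IF EXISTS `dip_test`.`customers`"))  # DROP TABLE
```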

Rename Table

The following example shows the value part of the event generated by the Rename operation:
{
"schema": {
"type": "struct",
"fields": ···,
"optional": false,
"name": "io.debezium.connector.mysql.SchemaChangeValue"
},
"payload": {
"source": {
"version": "1.9.0.Final",
"connector": "mysql",
"name": "task-lzpx4pdo",
"ts_ms": 1656300979748,
"snapshot": "false",
"db": "testDB",
"sequence": null,
"table": "t_test",
"server_id": 170993,
"gtid": "b24176f2-5409-11ec-80d4-b8599fe5c6ea:80",
"file": "mysql-bin.000006",
"pos": 26411,
"row": 0,
"thread": null,
"query": null
},
"databaseName": "testDB",
"schemaName": null,
"ddl": "rename table test to t_test",
"tableChanges": [{
"type": "ALTER",
"id": "\"testDB\".\"t_test\"",
"table": {
"defaultCharsetName": "utf8",
"primaryKeyColumnNames": ["id"],
"columns": [{
"name": "id",
"jdbcType": -5,
"nativeType": null,
"typeName": "BIGINT",
"typeExpression": "BIGINT",
"charsetName": null,
"length": 20,
"scale": null,
"position": 1,
"optional": false,
"autoIncremented": true,
"generated": true,
"comment": null
}, {
"name": "name",
"jdbcType": 12,
"nativeType": null,
"typeName": "VARCHAR",
"typeExpression": "VARCHAR",
"charsetName": "utf8",
"length": 20,
"scale": null,
"position": 2,
"optional": true,
"autoIncremented": false,
"generated": false,
"comment": null
}],
"comment": null
}
}]
}
}
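The schema field describes the format of the payload; part of its content is omitted here. In the payload field, source contains the event metadata, ddl is the SQL statement that triggered the event, and tableChanges describes the affected table. In this example, the tableChanges entry has type ALTER, its id refers to the new table name (t_test), and its columns field lists the columns of the renamed table.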
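In the example, the new table name appears in the tableChanges id as a quoted "database"."table" pair. A minimal Python sketch that detects a rename from the ddl text and extracts the new name; renamed_table and parse_table_id are hypothetical helpers. The sketch assumes names contain no dots or double quotes, and it only recognizes statements starting with RENAME TABLE (not ALTER TABLE ... RENAME).

```python
def parse_table_id(table_id):
    """Split a tableChanges id such as '"testDB"."t_test"' into (database, table)."""
    parts = table_id.split(".")
    if len(parts) != 2:
        raise ValueError(f"unexpected table id: {table_id!r}")
    return tuple(part.strip('"') for part in parts)


def renamed_table(payload):
    """Return (database, new_table) for a RENAME TABLE event payload, else None."""
    ddl = (payload.get("ddl") or "").lstrip().lower()
    if not ddl.startswith("rename table"):
        return None
    return parse_table_id(payload["tableChanges"][0]["id"])


payload = {"ddl": "rename table test to t_test",
           "tableChanges": [{"type": "ALTER", "id": '"testDB"."t_test"'}]}
print(renamed_table(payload))  # ('testDB', 't_test')
```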
