操作类型 | 支持的SQL操作 |
DML | INSERT、UPDATE、DELETE |
DDL | CREATE DATABASE、DROP DATABASE、CREATE TABLE、ALTER TABLE、DROP TABLE、RENAME TABLE |
{"schema": {...},"payload": {...}}
Item | Field name | Description |
1 | schema | schema 字段描述了 key 的 payload 字段的结构,即它描述了被修改的表的主键(primary key)结构,如果表没有主键,则描述其唯一约束(unique key)的结构。 |
2 | payload | payload 字段的结构和第一个 schema 中描述的相同,包含了被修改的行的键值。 |
Item | Field name | Description |
1 | schema | schema 字段描述了 value 的payload 字段的结构,即描述了被修改行的字段结构。这个字段通常是一个嵌套结构的字段。 |
2 | payload | payload 字段的结构和第二个 schema 中定义的相同,它包含被修改行的真实数据。 |
CREATE TABLE customers (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,first_name VARCHAR(255) NOT NULL,last_name VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL UNIQUE KEY) AUTO_INCREMENT=1001;
{"schema": {"type": "struct","name": "mysql-server-1.inventory.customers.Key","optional": false,"fields": [{"field": "id","type": "int32","optional": false}]},"payload": {"id": 1001}}
Item | Field name | Description |
1 | schema | Schema 描述了 payload 中的结构。 |
2 | mysql-server-1.inventory.customers.Key | schema 的名称格式为 *connector-name*.*database-name*.*table-name*.Key。在这个例子中: mysql-server-1 是生成事件的connector的名字。inventory 是对应数据库的名字。 customers 是表的名字。 |
3 | optional | 表示字段是否是可选项。 |
4 | fields | 列出了所有 payload 中包含的字段及其结构, 包括字段名、字段类型、以及是否可选。 |
5 | payload | 包含被修改行的主键。在例子中仅包含一个字段名为 id 的主键值: 1001。 |
{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory.customers.Value","field": "before"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory.customers.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "boolean","optional": true,"default": false,"field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": true,"field": "table"},{"type": "int64","optional": false,"field": "server_id"},{"type": "string","optional": true,"field": "gtid"},{"type": "string","optional": false,"field": "file"},{"type": "int64","optional": false,"field": "pos"},{"type": "int32","optional": false,"field": "row"},{"type": "int64","optional": true,"field": "thread"},{"type": "string","optional": true,"field": "query"}],"optional": false,"name": "io.debezium.connector.mysql.Source","field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"}],"optional": false,"name": "mysql-server-1.inventory.customers.Envelope"},"payload": {"op": "c","ts_ms": 1465491411815,"before": null,"after": {"id": 1004,"first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"},"source": {"version": "1.9.3.Final","connector": "mysql","name": "mysql-server-1","ts_ms": 0,"snapshot": false,"db": "inventory","table": "customers","server_id": 0,"gtid": null,"file": "mysql-bin.000003","pos": 154,"row": 0,"thread": 7,"query": "INSERT INTO customers (first_name, last_name, email) VALUES ('Anne', 'Kretchmar', 'annek@noanswer.org')"}}}
Item | Field name | Description |
1 | schema | Schema 描述了 payload 中的结构, 其中 schema 中的 fields 字段为一个数组,表示 payload 字段包含了多个字段,数组的每个元素是对 payload 中相应字段结构的描述信息。 |
2 | field | 每一个 fields 中的元素都包含一个 field 字段,该字段表示 payload 中对应字段的名称。在示例中包括 before、after、source等。 |
3 | type | 表示字段的类型,如整型(int)、字符串(string)等. |
4 | mysql-server-1.inventory.customers.Value | 表示该字段是 mysql-server-1 连接器生成的针对 inventory 数据库的 customers 表的 value 部分信息。 |
| io.debezium.connector.mysql.Source | 该名称和特定的 connector 绑定,由该 connector 生成的事件该名称都相同。 |
6 | payload | 包含修改事件中具体被修改的数据,包括修改前(before 字段)和修改后(after 字段)的数据,以及一些 connector 的元数据信息(source 字段)。 |
7 | op | 表示导致事件生成的修改操作的类型,例子中的 c 表示 修改操作创建了一个新的行。c = createu = updated = deleter = read (仅 snapshots) |
8 | source | source 字段是一个描述事件元数据的字段。它包含的一些字段可以用来与其他事件做比较,如比较事件生成的顺序、事件是否属于同一个事务等。该字段包含以下元数据信息: Debezium versionConnector namebinlog name where the event was recordedbinlog positionRow within the eventIf the event was part of a snapshotName of the database and table that contain the new rowID of the MySQL thread that created the event (non-snapshot only)MySQL server ID (if available)Timestamp for when the change was made in the database |
9 | query | 修改操作的原始 SQL 语句。 |
{"schema": { ... },"payload": {"before": {"id": 1004,"first_name": "Anne","last_name": "Kretchmar","email": "annek@noanswer.org"},"after": {"id": 1004,"first_name": "Anne Marie","last_name": "Kretchmar","email": "annek@noanswer.org"},"source": {"version": "1.9.3.Final","name": "mysql-server-1","connector": "mysql","name": "mysql-server-1","ts_ms": 1465581029100,"snapshot": false,"db": "inventory","table": "customers","server_id": 223344,"gtid": null,"file": "mysql-bin.000003","pos": 484,"row": 0,"thread": 7,"query": "UPDATE customers SET first_name='Anne Marie' WHERE id=1004"},"op": "u","ts_ms": 1465581029523}}
{"schema": { ... },"payload": {"before": {"id": 1004,"first_name": "Anne Marie","last_name": "Kretchmar","email": "annek@noanswer.org"},"after": null,"source": {"version": "1.9.3.Final","connector": "mysql","name": "mysql-server-1","ts_ms": 1465581902300,"snapshot": false,"db": "inventory","table": "customers","server_id": 223344,"gtid": null,"file": "mysql-bin.000003","pos": 805,"row": 0,"thread": 7,"query": "DELETE FROM customers WHERE id=1004"},"op": "d","ts_ms": 1465581902461}}
DELETE
event record has __debezium.newkey
as a message header. The value of this header is the new primary key for the updated row.CREATE
event record has __debezium.oldkey
as a message header. The value of this header is the previous (old) primary key that the updated row had.{"source" : {"server" : "dip_source"},"position" : {"ts_sec" : 1655812326,"file" : "mysql-bin.000006","pos" : 26063,"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78","snapshot" : true},"databaseName" : "dip_test","ddl" : "CREATE DATABASE `dip_test` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci","tableChanges" : [ ]}
{"source" : {"server" : "dip_source"},"position" : {"ts_sec" : 1655812326,"file" : "mysql-bin.000006","pos" : 26063,"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78","snapshot" : true},"databaseName" : "dip_test","ddl" : "DROP DATABASE IF EXISTS `dip_test`","tableChanges" : [ ]}
{"source" : {"server" : "dip_source"},"position" : {"ts_sec" : 1655812326,"file" : "mysql-bin.000006","pos" : 26063,"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78","snapshot" : true},"databaseName" : "dip_test","ddl" : "CREATE TABLE `customers` (\\n `id` int NOT NULL AUTO_INCREMENT,\\n `first_name` varchar(255) NOT NULL,\\n `last_name` varchar(255) NOT NULL,\\n `email` varchar(255) NOT NULL,\\n PRIMARY KEY (`id`),\\n UNIQUE KEY `email` (`email`),\\n KEY `ix_id` (`id`)\\n) ENGINE=InnoDB AUTO_INCREMENT=1041 DEFAULT CHARSET=utf8","tableChanges" : [ {"type" : "CREATE","id" : "\\"dip_test\\".\\"customers\\"","table" : {"defaultCharsetName" : "utf8","primaryKeyColumnNames" : [ "id" ],"columns" : [ {"name" : "id","jdbcType" : 4,"typeName" : "INT","typeExpression" : "INT","charsetName" : null,"position" : 1,"optional" : false,"autoIncremented" : true,"generated" : true,"comment" : null,"hasDefaultValue" : false,"enumValues" : [ ]}, {"name" : "first_name","jdbcType" : 12,"typeName" : "VARCHAR","typeExpression" : "VARCHAR","charsetName" : "utf8","length" : 255,"position" : 2,"optional" : false,"autoIncremented" : false,"generated" : false,"comment" : null,"hasDefaultValue" : false,"enumValues" : [ ]}, {"name" : "last_name","jdbcType" : 12,"typeName" : "VARCHAR","typeExpression" : "VARCHAR","charsetName" : "utf8","length" : 255,"position" : 3,"optional" : false,"autoIncremented" : false,"generated" : false,"comment" : null,"hasDefaultValue" : false,"enumValues" : [ ]}, {"name" : "email","jdbcType" : 12,"typeName" : "VARCHAR","typeExpression" : "VARCHAR","charsetName" : "utf8","length" : 255,"position" : 4,"optional" : false,"autoIncremented" : false,"generated" : false,"comment" : null,"hasDefaultValue" : false,"enumValues" : [ ]} ]},"comment" : null} ]}
{"source" : {"server" : "1307446078-a123"},"position" : {"transaction_id" : null,"ts_sec" : 1655782153,"file" : "mysql-bin.000005","pos" : 1218,"gtids" : "ddf040ad-7509-11ec-968b-0c42a1eda2e9:1-8","server_id" : 183277},"databaseName" : "test","ddl" : "ALTER TABLE `user` ADD COLUMN `createtime` datetime NULL DEFAULT CURRENT_TIMESTAMP","tableChanges" : [ {"type" : "ALTER","id" : "\\"test\\".\\"user\\"","table" : {"defaultCharsetName" : "utf8","primaryKeyColumnNames" : [ ],"columns" : [ {"name" : "name","jdbcType" : 1,"typeName" : "CHAR","typeExpression" : "CHAR","charsetName" : "utf8","length" : 20,"position" : 1,"optional" : true,"autoIncremented" : false,"generated" : false,"comment" : null,"hasDefaultValue" : true,"defaultValueExpression" : "","enumValues" : [ ]}, {"name" : "age","jdbcType" : 4,"typeName" : "INT","typeExpression" : "INT","charsetName" : null,"position" : 2,"optional" : true,"autoIncremented" : false,"generated" : false,"comment" : null,"hasDefaultValue" : true,"enumValues" : [ ]}, {"name" : "createtime","jdbcType" : 93,"typeName" : "DATETIME","typeExpression" : "DATETIME","charsetName" : null,"position" : 3,"optional" : true,"autoIncremented" : false,"generated" : false,"comment" : null,"hasDefaultValue" : true,"defaultValueExpression" : "1970-01-01 00:00:00","enumValues" : [ ]} ]},"comment" : null} ]}
{"source" : {"server" : "dip_source"},"position" : {"ts_sec" : 1655812326,"file" : "mysql-bin.000006","pos" : 26063,"gtids" : "b24176f2-5409-11ec-80d4-b8599fe5c6ea:1-78","snapshot" : true},"databaseName" : "dip_test","ddl" : "DROP TABLE IF EXISTS `dip_test`.`customers`","tableChanges" : [ ]}
{"schema": {"type": "struct","fields": ···,"optional": false,"name": "io.debezium.connector.mysql.SchemaChangeValue"},"payload": {"source": {"version": "1.9.0.Final","connector": "mysql","name": "task-lzpx4pdo","ts_ms": 1656300979748,"snapshot": "false","db": "testDB","sequence": null,"table": "t_test","server_id": 170993,"gtid": "b24176f2-5409-11ec-80d4-b8599fe5c6ea:80","file": "mysql-bin.000006","pos": 26411,"row": 0,"thread": null,"query": null},"databaseName": "testDB","schemaName": null,"ddl": "rename table test to t_test","tableChanges": [{"type": "ALTER","id": "\\"testDB\\".\\"t_test\\"","table": {"defaultCharsetName": "utf8","primaryKeyColumnNames": ["id"],"columns": [{"name": "id","jdbcType": -5,"nativeType": null,"typeName": "BIGINT","typeExpression": "BIGINT","charsetName": null,"length": 20,"scale": null,"position": 1,"optional": false,"autoIncremented": true,"generated": true,"comment": null}, {"name": "name","jdbcType": 12,"nativeType": null,"typeName": "VARCHAR","typeExpression": "VARCHAR","charsetName": "utf8","length": 20,"scale": null,"position": 2,"optional": true,"autoIncremented": false,"generated": false,"comment": null}],"comment": null}}]}}
本页内容是否解决了您的问题?