consumerDemo-avro-flink\src\main\resources\avro-tools-1.8.2.jar: The tool used to generate Avro protocol code.
consumerDemo-avro-flink\src\main\java\com\tencent\subscribe\avro: The directory of the code generated by the Avro tool.
consumerDemo-avro-flink\src\main\resources\Record.avsc: The protocol definition file.
The main data structure defined in Record.avsc is Record, which represents a data record in the binlog. Its structure is as follows. The other data structures can be viewed in Record.avsc.
{
  "namespace": "com.tencent.subscribe.avro", // The last schema in `Record.avsc`, with `name` displayed as `Record`.
  "type": "record",
  "name": "Record", // `name` is displayed as `Record`, indicating the format of the data consumed from Kafka.
  "fields": [
    {
      "name": "id", // `id` indicates a globally incremental ID. More record values are explained as follows:
      "type": "long",
      "doc": "unique id of this record in the whole stream"
    },
    {
      "name": "version", // `version` indicates the protocol version.
      "type": "int",
      "doc": "protocol version"
    },
    {
      "name": "messageType", // Message type
      "aliases": ["operation"],
      "type": {
        "namespace": "com.tencent.subscribe.avro",
        "name": "MessageType",
        "type": "enum",
        "symbols": ["INSERT", "UPDATE", "DELETE", "DDL", "BEGIN", "COMMIT", "HEARTBEAT", "CHECKPOINT", "ROLLBACK"]
      }
    },
    {......},
}
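A consumer usually branches on messageType before touching the row data. The following is a minimal sketch of such a branch, not the demo's actual code. The local `MessageType` enum is a stand-in for the Avro-generated `com.tencent.subscribe.avro.MessageType` and only copies the symbols listed in the schema above. The helper class `MessageTypes` and its grouping into DML and transaction-boundary messages are assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Stand-in for the Avro-generated enum (assumption: same symbols as in Record.avsc).
enum MessageType {
    INSERT, UPDATE, DELETE, DDL, BEGIN, COMMIT, HEARTBEAT, CHECKPOINT, ROLLBACK
}

// Hypothetical helper; the grouping below is an illustrative choice, not part of the protocol.
final class MessageTypes {
    private static final Set<MessageType> DML =
            EnumSet.of(MessageType.INSERT, MessageType.UPDATE, MessageType.DELETE);
    private static final Set<MessageType> TRANSACTION_BOUNDARY =
            EnumSet.of(MessageType.BEGIN, MessageType.COMMIT, MessageType.ROLLBACK);

    private MessageTypes() {
    }

    // True for messages that carry row data (oldColumns and/or newColumns).
    static boolean isDml(MessageType type) {
        return DML.contains(type);
    }

    // True for messages that open or close a transaction.
    static boolean isTransactionBoundary(MessageType type) {
        return TRANSACTION_BOUNDARY.contains(type);
    }

    // Returns a coarse category name that a consumer could switch on.
    static String category(MessageType type) {
        if (isDml(type)) {
            return "DML";
        }
        if (isTransactionBoundary(type)) {
            return "TRANSACTION";
        }
        if (type == MessageType.DDL) {
            return "DDL";
        }
        return "CONTROL"; // HEARTBEAT and CHECKPOINT
    }
}
```

With the generated classes on the classpath, the same checks could be applied to the value returned by the record's messageType getter instead of the stand-in enum.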
Field Name in Record | Description |
id | The globally incremental ID |
version | The protocol version, which is v1 currently. |
messageType | The message type. Enumerated values: INSERT, UPDATE, DELETE, DDL, BEGIN, COMMIT, HEARTBEAT, CHECKPOINT, ROLLBACK. |
fileName | The name of the binlog file where the current record is located |
position | The end offset of the current record in the binlog, in the format End_log_pos@binlog file number. For example, if the current record is in file mysql-bin.000004 and the end offset is 2196, the value of this parameter is 2196@4. |
safePosition | The start offset of the current transaction in the binlog, which is in the same format as described above. |
timestamp | The time when the data was written to the binlog, which is a UNIX timestamp in seconds. |
gtid | The current GTID, such as c7c98333-6006-11ed-bfc9-b8cef6e1a231:9. |
transactionId | The transaction ID, which is generated only for COMMIT events. |
serverId | The server ID of the source database, which can be viewed by running SHOW VARIABLES LIKE 'server_id'. |
threadId | The ID of the session that committed the current transaction, which can be viewed by running SHOW processlist;. |
sourceType | The source database type, which currently can only be MySQL. |
sourceVersion | The source database version, which can be viewed by running select version();. |
schemaName | Database name |
tableName | Table name |
objectName | Format: Database name.table name |
columns | The definitions of columns in the table |
oldColumns | The data of the row before DML execution. If the message is an INSERT message, the array will be null. There are 12 element types in the array: Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, and EmptyObject. |
newColumns | The data of the row after DML execution. If the message is a DELETE message, the array will be null. There are 12 element types in the array: Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, and EmptyObject. |
sql | The DDL SQL statement |
executionTime | The DDL execution duration in seconds |
heartbeatTimestamp | The timestamp of the heartbeat message in seconds. This field is present only for heartbeat messages. |
syncedGtid | The collection of GTIDs parsed by DTS, in the format c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13. |
fakeGtid | Whether the current GTID is forged. If gtid_mode is not enabled, DTS will forge a GTID. |
pkNames | If the table in the source database has a primary key, this parameter will be carried in the DML message; otherwise, it will not be carried. |
readerTimestamp | The time when DTS processed the current data record, which is a UNIX timestamp in milliseconds. |
tags | |
total | The total number of message segments if the message is segmented. This field is invalid on the current version (version=1) and is reserved for extension. |
index | The index of message segments if the message is segmented. This field is invalid on the current version (version=1) and is reserved for extension. |
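Two details in the table above are easy to get wrong in consumer code: position and safePosition pack two numbers into one string, and timestamp is in seconds while readerTimestamp is in milliseconds. The following is a minimal sketch of helpers for both, not part of the demo. The class names `BinlogPosition` and `RecordTimes` are hypothetical, and the six-digit zero padding used to rebuild the file name is an assumption inferred from the mysql-bin.000004 example.

```java
// Hypothetical value class for the `position` / `safePosition` fields, e.g. "2196@4".
final class BinlogPosition {
    final long endLogPos;   // End_log_pos part, before the '@'
    final long fileNumber;  // binlog file number part, after the '@'

    BinlogPosition(long endLogPos, long fileNumber) {
        this.endLogPos = endLogPos;
        this.fileNumber = fileNumber;
    }

    // Parses "End_log_pos@binlog file number"; rejects strings without both parts.
    static BinlogPosition parse(String value) {
        int at = value.indexOf('@');
        if (at <= 0 || at == value.length() - 1) {
            throw new IllegalArgumentException("Expected End_log_pos@fileNumber, got: " + value);
        }
        long pos = Long.parseLong(value.substring(0, at));
        long file = Long.parseLong(value.substring(at + 1));
        return new BinlogPosition(pos, file);
    }

    // Rebuilds a file name such as "mysql-bin.000004".
    // Assumption: the suffix is the file number zero-padded to six digits.
    String fileName(String prefix) {
        return String.format("%s.%06d", prefix, fileNumber);
    }
}

// Hypothetical helper for the two timestamp fields, which use different units.
final class RecordTimes {
    private RecordTimes() {
    }

    // timestamp is in seconds, readerTimestamp is in milliseconds;
    // the result is the gap between binlog write and DTS processing, in milliseconds.
    static long readerLagMillis(long timestampSeconds, long readerTimestampMillis) {
        return readerTimestampMillis - timestampSeconds * 1000L;
    }
}
```

For the example in the table, `BinlogPosition.parse("2196@4")` yields an end offset of 2196 and a file number of 4.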
Field, including the following four attributes:
Type in MySQL | Corresponding Type in Avro |
MYSQL_TYPE_NULL | EmptyObject |
MYSQL_TYPE_INT8 | Integer |
MYSQL_TYPE_INT16 | Integer |
MYSQL_TYPE_INT24 | Integer |
MYSQL_TYPE_INT32 | Integer |
MYSQL_TYPE_INT64 | Integer |
MYSQL_TYPE_BIT | Integer |
MYSQL_TYPE_YEAR | DateTime |
MYSQL_TYPE_FLOAT | Float |
MYSQL_TYPE_DOUBLE | Float |
MYSQL_TYPE_VARCHAR | Character |
MYSQL_TYPE_STRING | Character. If the original type is binary, this type will correspond to BinaryObject. |
MYSQL_TYPE_VAR_STRING | Character. If the original type is varbinary, this type will correspond to BinaryObject. |
MYSQL_TYPE_TIMESTAMP | Timestamp |
MYSQL_TYPE_DATE | DateTime |
MYSQL_TYPE_TIME | DateTime |
MYSQL_TYPE_DATETIME | DateTime |
MYSQL_TYPE_TIMESTAMP_NEW | Timestamp |
MYSQL_TYPE_DATE_NEW | DateTime |
MYSQL_TYPE_TIME_NEW | DateTime |
MYSQL_TYPE_DATETIME_NEW | DateTime |
MYSQL_TYPE_ENUM | TextObject |
MYSQL_TYPE_SET | TextObject |
MYSQL_TYPE_DECIMAL | Decimal |
MYSQL_TYPE_DECIMAL_NEW | Decimal |
MYSQL_TYPE_JSON | TextObject |
MYSQL_TYPE_BLOB | BinaryObject |
MYSQL_TYPE_TINY_BLOB | BinaryObject |
MYSQL_TYPE_MEDIUM_BLOB | BinaryObject |
MYSQL_TYPE_LONG_BLOB | BinaryObject |
MYSQL_TYPE_GEOMETRY | BinaryObject |
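The mapping table above can be expressed as a lookup. The following is a minimal sketch that transcribes the table as plain strings, which may be handy in tests or when validating consumed data. The class `AvroTypeMapping` and the `originalTypeIsBinary` parameter are hypothetical. How a consumer learns that the original column type is binary or varbinary is not covered here and is left to the caller.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lookup that transcribes the MySQL-to-Avro type table.
final class AvroTypeMapping {
    private static final Map<String, String> MAPPING = new HashMap<>();

    static {
        put("EmptyObject", "MYSQL_TYPE_NULL");
        put("Integer", "MYSQL_TYPE_INT8", "MYSQL_TYPE_INT16", "MYSQL_TYPE_INT24",
                "MYSQL_TYPE_INT32", "MYSQL_TYPE_INT64", "MYSQL_TYPE_BIT");
        put("Float", "MYSQL_TYPE_FLOAT", "MYSQL_TYPE_DOUBLE");
        put("Character", "MYSQL_TYPE_VARCHAR", "MYSQL_TYPE_STRING", "MYSQL_TYPE_VAR_STRING");
        put("Timestamp", "MYSQL_TYPE_TIMESTAMP", "MYSQL_TYPE_TIMESTAMP_NEW");
        put("DateTime", "MYSQL_TYPE_YEAR", "MYSQL_TYPE_DATE", "MYSQL_TYPE_TIME",
                "MYSQL_TYPE_DATETIME", "MYSQL_TYPE_DATE_NEW", "MYSQL_TYPE_TIME_NEW",
                "MYSQL_TYPE_DATETIME_NEW");
        put("TextObject", "MYSQL_TYPE_ENUM", "MYSQL_TYPE_SET", "MYSQL_TYPE_JSON");
        put("Decimal", "MYSQL_TYPE_DECIMAL", "MYSQL_TYPE_DECIMAL_NEW");
        put("BinaryObject", "MYSQL_TYPE_BLOB", "MYSQL_TYPE_TINY_BLOB",
                "MYSQL_TYPE_MEDIUM_BLOB", "MYSQL_TYPE_LONG_BLOB", "MYSQL_TYPE_GEOMETRY");
    }

    private AvroTypeMapping() {
    }

    // Registers one Avro type for several MySQL type names.
    private static void put(String avroType, String... mysqlTypes) {
        for (String mysqlType : mysqlTypes) {
            MAPPING.put(mysqlType, avroType);
        }
    }

    // Returns the Avro element type for a MySQL type name.
    // originalTypeIsBinary (hypothetical flag) covers the binary/varbinary exception
    // for MYSQL_TYPE_STRING and MYSQL_TYPE_VAR_STRING.
    static String avroTypeFor(String mysqlType, boolean originalTypeIsBinary) {
        String avroType = MAPPING.get(mysqlType);
        if (avroType == null) {
            throw new IllegalArgumentException("Unknown MySQL type: " + mysqlType);
        }
        boolean stringLike = mysqlType.equals("MYSQL_TYPE_STRING")
                || mysqlType.equals("MYSQL_TYPE_VAR_STRING");
        return (stringLike && originalTypeIsBinary) ? "BinaryObject" : avroType;
    }

    // Number of MySQL types covered by the lookup.
    static int size() {
        return MAPPING.size();
    }
}
```

Keeping the exception for binary and varbinary columns in one place avoids scattering that special case across the consumer.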