| Demo Language | TencentDB for MySQL and TDSQL-C MySQL |
| --- | --- |
| Java | |
Generate the Avro protocol code:

```shell
java -jar avro-tools-1.8.2.jar compile -string schema Record.avsc
```

The argument that follows `Record.avsc` is the code generation path.

Add the following dependencies to the `pom.xml` file. The version in the following code must be the same as the Flink version you use.

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>1.13.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>1.13.6</version>
</dependency>
```
Package the project:

```shell
mvn clean package
```

Run the demo:

```shell
../bin/flink run consumerDemo-avro-flink-1.0-SNAPSHOT.jar --brokers xxx --topic xxx --group xxx --user xxx --password xxx --trans2sql
```
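The run command above passes connection settings as `--key value` pairs, while `--trans2sql` works by mere presence. As a rough, hypothetical sketch of that convention in plain Java (this `ArgParser` class is illustrative, not the demo's actual parsing code):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical parser for "--key value" pairs plus bare switches such as
// "--trans2sql", whose mere presence enables a feature.
public class ArgParser {
    public static Map<String, String> parse(String[] args) {
        Map<String, String> opts = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                // If the next token is another flag (or missing), treat this as a bare switch.
                if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
                    opts.put(key, args[++i]);
                } else {
                    opts.put(key, "");
                }
            }
        }
        return opts;
    }

    public static void main(String[] args) {
        Map<String, String> opts = parse(new String[]{
                "--brokers", "host:9092", "--topic", "t", "--trans2sql"});
        // SQL conversion is enabled simply because the flag is present.
        boolean trans2sql = opts.containsKey("trans2sql");
        System.out.println("brokers=" + opts.get("brokers") + " trans2sql=" + trans2sql);
    }
}
```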
In the command above:

- `brokers`: the private network access address for data subscription to Kafka.
- `topic`: the subscription topic, which can be viewed on the Subscription details page as instructed in Viewing Subscription Details.
- `group`, `user`, and `password`: the name, account, and password of the consumer group, which can be viewed on the Consumption Management page as instructed in Managing Consumer Group.
- `trans2sql`: whether to enable conversion to SQL statements. In the Java code, conversion is enabled if this parameter is carried.

Files in the demo:

- `consumerDemo-avro-flink\src\main\resources\avro-tools-1.8.2.jar`: the tool used to generate Avro protocol code.
- `consumerDemo-avro-flink\src\main\java\com\tencent\subscribe\avro`: the directory where the Avro tool generates code.
- `consumerDemo-avro-flink\src\main\resources\Record.avsc`: the protocol definition file.

In `Record.avsc`, the main data structure is `Record`, which represents a data record in the binlog. The `Record` structure is as follows; other data structures can be viewed in `Record.avsc`.
```json
{
    "namespace": "com.tencent.subscribe.avro", // The last schema in `Record.avsc`, with `name` displayed as `Record`.
    "type": "record",
    "name": "Record", // `name` is displayed as `Record`, indicating the format of the data consumed from Kafka.
    "fields": [
        {
            "name": "id", // `id` indicates a globally incremental ID. More record values are explained below.
            "type": "long",
            "doc": "unique id of this record in the whole stream"
        },
        {
            "name": "version", // `version` indicates the protocol version.
            "type": "int",
            "doc": "protocol version"
        },
        {
            "name": "messageType", // Message type.
            "aliases": ["operation"],
            "type": {
                "namespace": "com.tencent.subscribe.avro",
                "name": "MessageType",
                "type": "enum",
                "symbols": ["INSERT", "UPDATE", "DELETE", "DDL", "BEGIN", "COMMIT", "HEARTBEAT", "CHECKPOINT", "ROLLBACK"]
            }
        },
        { ...... }
    ]
}
```
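A consumer typically branches on `messageType` to decide how to handle each record. As a plain-Java sketch mirroring the enum symbols above (the `MessageTypes` class and its helper methods are illustrative, not part of the generated Avro code):

```java
// Mirrors the MessageType enum symbols defined in Record.avsc.
public class MessageTypes {
    public enum MessageType {
        INSERT, UPDATE, DELETE, DDL, BEGIN, COMMIT,
        HEARTBEAT, CHECKPOINT, ROLLBACK
    }

    // Illustrative helper: DML records are the ones that carry row data
    // in oldColumns/newColumns.
    public static boolean isDml(MessageType t) {
        return t == MessageType.INSERT || t == MessageType.UPDATE
                || t == MessageType.DELETE;
    }

    // Illustrative helper: BEGIN/COMMIT/ROLLBACK delimit source transactions.
    public static boolean isTransactionBoundary(MessageType t) {
        return t == MessageType.BEGIN || t == MessageType.COMMIT
                || t == MessageType.ROLLBACK;
    }
}
```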
| Field | Description |
| --- | --- |
id | The globally incremental ID. |
version | The protocol version, which is v1 currently. |
messageType | The message type. Enumerated values: "INSERT", "UPDATE", "DELETE", "DDL", "BEGIN", "COMMIT", "HEARTBEAT", "CHECKPOINT", "ROLLBACK". |
fileName | The name of the binlog file where the current record is located. |
position | The end offset of the current record in the binlog in the format of End_log_pos@binlog file number . For example, if the current record is in file mysql-bin.000004 and the end offset is 2196, then the value of this parameter will be 2196@4 . |
safePosition | The start offset of the current transaction in the binlog, which is in the same format as described above. |
timestamp | The time when the data was written to the binlog, which is a UNIX timestamp in seconds. |
gtid | The current GTID, such as c7c98333-6006-11ed-bfc9-b8cef6e1a231:9. |
transactionId | The transaction ID, which is generated only for COMMIT events. |
serverId | The server ID of the source database, which can be viewed by running SHOW VARIABLES LIKE 'server_id' . |
threadId | The ID of the session that committed the current transaction, which can be viewed by running SHOW processlist; . |
sourceType | The source database type, which currently can only be MySQL. |
sourceVersion | The source database version, which can be viewed by running SELECT version(); . |
schemaName | The database name. |
tableName | The table name. |
objectName | Format: Database name.table name. |
columns | The definitions of columns in the table. |
oldColumns | The data of the row before DML execution. If the message is an INSERT message, the array will be null. There are 12 types of elements in the array, i.e., Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, and EmptyObject. For more information, see the definitions in the demo. |
newColumns | The data of the row after DML execution. If the message is a DELETE message, the array will be null. There are 12 types of elements in the array, i.e., Integer, Character, Decimal, Float, Timestamp, DateTime, TimestampWithTimeZone, BinaryGeometry, TextGeometry, BinaryObject, TextObject, and EmptyObject. For more information, see the definitions in the demo. |
sql | The DDL SQL statement. |
executionTime | The DDL execution duration in seconds. |
heartbeatTimestamp | The timestamp of the heartbeat message in seconds, which is present only for heartbeat messages. |
syncedGtid | The collection of GTIDs parsed by DTS in the format of c7c98333-6006-11ed-bfc9-b8cef6e1a231:1-13 . |
fakeGtid | Whether the current GTID is forged. If gtid_mode is not enabled, DTS will forge a GTID. |
pkNames | If the table in the source database has a primary key, this parameter will be carried in the DML message; otherwise, it will not be carried. |
readerTimestamp | The time when DTS processed the current data record, which is a UNIX timestamp in milliseconds. |
tags | |
total | The total number of message segments if the message is segmented. This field is invalid on the current version (version=1) and is reserved for extension. |
index | The index of message segments if the message is segmented. This field is invalid on the current version (version=1) and is reserved for extension. |
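Some of the fields above use compact string encodings: `position` is `End_log_pos@binlog file number`, and `objectName` is `database.table`. A minimal sketch of unpacking them with plain string handling (the class and method names are made up for illustration):

```java
// Illustrative helpers for the string-encoded Record fields described above.
public class RecordFields {
    // Parses a position such as "2196@4" (End_log_pos @ binlog file number)
    // and returns the end offset in the binlog file.
    public static long endLogPos(String position) {
        return Long.parseLong(position.substring(0, position.indexOf('@')));
    }

    // Returns the binlog file number from the same "offset@fileNumber" format.
    public static int binlogFileNumber(String position) {
        return Integer.parseInt(position.substring(position.indexOf('@') + 1));
    }

    // Splits an objectName such as "mydb.mytable" into {database, table}.
    public static String[] splitObjectName(String objectName) {
        int dot = objectName.indexOf('.');
        return new String[]{objectName.substring(0, dot), objectName.substring(dot + 1)};
    }
}
```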
Each element in `columns` is a `Field`, which includes four attributes; see the definitions in the demo.

The mapping between types in MySQL and the corresponding types in Avro is as follows:

| Type in MySQL | Corresponding Type in Avro |
| --- | --- |
MYSQL_TYPE_NULL | EmptyObject |
MYSQL_TYPE_INT8 | Integer |
MYSQL_TYPE_INT16 | Integer |
MYSQL_TYPE_INT24 | Integer |
MYSQL_TYPE_INT32 | Integer |
MYSQL_TYPE_INT64 | Integer |
MYSQL_TYPE_BIT | Integer |
MYSQL_TYPE_YEAR | DateTime |
MYSQL_TYPE_FLOAT | Float |
MYSQL_TYPE_DOUBLE | Float |
MYSQL_TYPE_VARCHAR | Character |
MYSQL_TYPE_STRING | Character. If the original type is binary, this type will correspond to BinaryObject. |
MYSQL_TYPE_VAR_STRING | Character. If the original type is varbinary, this type will correspond to BinaryObject. |
MYSQL_TYPE_TIMESTAMP | Timestamp |
MYSQL_TYPE_DATE | DateTime |
MYSQL_TYPE_TIME | DateTime |
MYSQL_TYPE_DATETIME | DateTime |
MYSQL_TYPE_TIMESTAMP_NEW | Timestamp |
MYSQL_TYPE_DATE_NEW | DateTime |
MYSQL_TYPE_TIME_NEW | DateTime |
MYSQL_TYPE_DATETIME_NEW | DateTime |
MYSQL_TYPE_ENUM | TextObject |
MYSQL_TYPE_SET | TextObject |
MYSQL_TYPE_DECIMAL | Decimal |
MYSQL_TYPE_DECIMAL_NEW | Decimal |
MYSQL_TYPE_JSON | TextObject |
MYSQL_TYPE_BLOB | BinaryObject |
MYSQL_TYPE_TINY_BLOB | BinaryObject |
MYSQL_TYPE_MEDIUM_BLOB | BinaryObject |
MYSQL_TYPE_LONG_BLOB | BinaryObject |
MYSQL_TYPE_GEOMETRY | BinaryObject |
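The mapping table above can be captured in code, for example to decide how a consumer should render a column value. A sketch in plain Java whose map content comes directly from the table (the `avroType` helper is illustrative; note that `MYSQL_TYPE_STRING`/`MYSQL_TYPE_VAR_STRING` map to `BinaryObject` instead when the original column type is binary/varbinary, which this simple lookup does not model):

```java
import java.util.HashMap;
import java.util.Map;

// Lookup of the MySQL-to-Avro type mapping from the table above.
public class MysqlToAvroTypes {
    private static final Map<String, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put("MYSQL_TYPE_NULL", "EmptyObject");
        for (String t : new String[]{"MYSQL_TYPE_INT8", "MYSQL_TYPE_INT16",
                "MYSQL_TYPE_INT24", "MYSQL_TYPE_INT32", "MYSQL_TYPE_INT64",
                "MYSQL_TYPE_BIT"}) {
            MAPPING.put(t, "Integer");
        }
        for (String t : new String[]{"MYSQL_TYPE_YEAR", "MYSQL_TYPE_DATE",
                "MYSQL_TYPE_TIME", "MYSQL_TYPE_DATETIME", "MYSQL_TYPE_DATE_NEW",
                "MYSQL_TYPE_TIME_NEW", "MYSQL_TYPE_DATETIME_NEW"}) {
            MAPPING.put(t, "DateTime");
        }
        MAPPING.put("MYSQL_TYPE_FLOAT", "Float");
        MAPPING.put("MYSQL_TYPE_DOUBLE", "Float");
        // Maps to BinaryObject instead if the original column is binary/varbinary.
        MAPPING.put("MYSQL_TYPE_VARCHAR", "Character");
        MAPPING.put("MYSQL_TYPE_STRING", "Character");
        MAPPING.put("MYSQL_TYPE_VAR_STRING", "Character");
        MAPPING.put("MYSQL_TYPE_TIMESTAMP", "Timestamp");
        MAPPING.put("MYSQL_TYPE_TIMESTAMP_NEW", "Timestamp");
        MAPPING.put("MYSQL_TYPE_ENUM", "TextObject");
        MAPPING.put("MYSQL_TYPE_SET", "TextObject");
        MAPPING.put("MYSQL_TYPE_JSON", "TextObject");
        MAPPING.put("MYSQL_TYPE_DECIMAL", "Decimal");
        MAPPING.put("MYSQL_TYPE_DECIMAL_NEW", "Decimal");
        for (String t : new String[]{"MYSQL_TYPE_BLOB", "MYSQL_TYPE_TINY_BLOB",
                "MYSQL_TYPE_MEDIUM_BLOB", "MYSQL_TYPE_LONG_BLOB",
                "MYSQL_TYPE_GEOMETRY"}) {
            MAPPING.put(t, "BinaryObject");
        }
    }

    // Returns the Avro-side type name from the table, or null if unknown.
    public static String avroType(String mysqlType) {
        return MAPPING.get(mysqlType);
    }
}
```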