// Entry is a single subscription event; one Entry corresponds to one MySQL
// binlog event.
message Entry {
  // Event header.
  Header header = 1;
  // Event body.
  Event event = 2;
}

// Header carries the metadata that describes an Entry.
message Header {
  // Entry protocol version.
  int32 version = 1;
  // Type of the source database, e.g. MySQL or Oracle.
  SourceType sourceType = 2;
  // Message type, i.e. the type of the Event: BEGIN, COMMIT, DML, etc.
  MessageType messageType = 3;
  // Timestamp of the event in the original binlog.
  uint32 timestamp = 4;
  // serverId of the source.
  int64 serverId = 5;
  // File name of the source binlog.
  string fileName = 6;
  // Offset of the event within the source binlog file.
  uint64 position = 7;
  // GTID of the current transaction.
  string gtid = 8;
  // Schema affected by the change.
  string schemaName = 9;
  // Table affected by the change.
  string tableName = 10;
  // Globally increasing sequence number.
  uint64 seqId = 11;
  // Shard index when a large event is split into shards; numbering starts at
  // 0. Not meaningful in the current version; kept for future extension.
  uint64 eventIndex = 12;
  // True when this is the last shard of a split event. Not meaningful in the
  // current version; kept for future extension.
  bool isLast = 13;
  // Additional properties as KVPair entries (KVPair is declared outside this
  // excerpt).
  repeated KVPair properties = 15;
}

// Event is the body of an Entry; it has one field per supported event type.
message Event {
  // BEGIN event from the binlog.
  BeginEvent beginEvent = 1;
  // DML event from the binlog.
  DMLEvent dmlEvent = 2;
  // COMMIT event from the binlog.
  CommitEvent commitEvent = 3;
  // DDL event from the binlog.
  DDLEvent ddlEvent = 4;
  // ROLLBACK event; not meaningful in the current version.
  RollbackEvent rollbackEvent = 5;
  // Heartbeat event sent periodically by the source database.
  HeartbeatEvent heartbeatEvent = 6;
  // Checkpoint event added by the subscription backend. One is generated
  // automatically every 10 seconds and is used to manage Kafka produce and
  // consume positions.
  CheckpointEvent checkpointEvent = 7;
  // Additional properties as KVPair entries (KVPair is declared outside this
  // excerpt).
  repeated KVPair properties = 15;
}
// Entries is a list of Entry messages.
message Entries {
  // Entry list.
  repeated Entry items = 1;
}
// Envelope is the outer message; its data field carries the serialized
// payload.
message Envelope {
  // Protocol version; determines how the content of data is decoded.
  int32 version = 1;
  // NOTE(review): presumably the number of envelopes one payload is split
  // into (a value of 1 would mean the payload is not split) - confirm.
  uint32 total = 2;
  // NOTE(review): presumably the position of this envelope among `total`
  // - confirm.
  uint32 index = 3;
  // With the current version (1), data holds an Entries message serialized
  // with Protocol Buffers.
  bytes data = 4;
  // Additional properties as KVPair entries (KVPair is declared outside this
  // excerpt).
  repeated KVPair properties = 15;
}
// 创建一个 Flink-kafka 消费者FlinkKafkaConsumer<RecordMsgObject> consumer =new FlinkKafkaConsumer<>(topic, new DeserializeProtobufToRecordMsgObject(), props);
// 自定义反序列化器,将原始消息反序列化为 RecordMsgObject 对象@Overridepublic RecordMsgObject deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {RecordMsgObject obj = new RecordMsgObject();obj.topic = record.topic();obj.partition = record.partition();obj.offset = record.offset();obj.partitionSeq = getPartitionSeq(record);obj.key = new String(record.key());obj.headers = record.headers();// 这里收到的就是 Envelope 的二进制的值obj.value = record.value();return obj;}
//将收到的消息按照 parttition 分区处理stream.keyBy(RecordMsgObject::getPartition).process(new SubscribeMsgProcess(trans2sql)).setParallelism(1);
// 反序列出 envlope,本 demo 默认一个 envlope 就可以存储完整个 bnlog-event 的数据SubscribeProtobufData.Envelope envelope = SubscribeProtobufData.Envelope.parseFrom(record.value);if (1 != envelope.getVersion()) {throw new IllegalStateException(String.format("unsupported version: %d", envelope.getVersion()));}
// 反序列得到 EntriesByteString envelopeData = envelope.getData();SubscribeProtobufData.Entries entries;if (1 == envelope.getTotal()) {entries = SubscribeProtobufData.Entries.parseFrom(envelopeData.toByteArray());} else {entries = SubscribeProtobufData.Entries.parseFrom(shardMsgMap.get(shardId).toByteArray());shardMsgMap.remove(shardId);}
// 遍历每个 Entry,根据 Entry 类型去打印 sqlfor (SubscribeProtobufData.Entry entry : entries.getItemsList()) {onEntry(record.partition, record.offset, ps, entry, trans2sql);}
// Data is a typed value: dataType tells whether sv or bv carries the payload.
message Data {
  // Type of the value; see DataType.
  DataType dataType = 1;
  // Character encoding for DataType_STRING values; the value itself is stored
  // in bv.
  string charset = 2;
  // String form of the value for DataType_INT8/16/32/64, UINT8/16/32/64,
  // Float32/64 and DataType_DECIMAL.
  string sv = 3;
  // Value for DataType_STRING and DataType_BYTES.
  bytes bv = 4;
}
// DataType enumerates the value types a Data message can carry.
enum DataType {
  // The value is NULL.
  NIL = 0;
  INT8 = 1;
  INT16 = 2;
  INT32 = 3;
  INT64 = 4;
  UINT8 = 5;
  UINT16 = 6;
  UINT32 = 7;
  UINT64 = 8;
  FLOAT32 = 9;
  FLOAT64 = 10;
  BYTES = 11;
  DECIMAL = 12;
  STRING = 13;
  // The value does not exist (N/A).
  NA = 14;
}
DATE、TIME、DATETIME 类型不支持时区。TIMESTAMP 类型支持时区,该类型字段表示:存储时,系统会从当前时区转换为 UTC(Coordinated Universal Time,协调世界时)进行存储;查询时,系统会从 UTC 转换为当前时区进行查询。

TDSQL MySQL 字段类型 | 对应的 Protobuf DataType 枚举值 |
MYSQL_TYPE_NULL | NIL |
MYSQL_TYPE_INT8 | INT8 |
MYSQL_TYPE_INT16 | INT16 |
MYSQL_TYPE_INT24 | INT32 |
MYSQL_TYPE_INT32 | INT32 |
MYSQL_TYPE_INT64 | INT64 |
MYSQL_TYPE_BIT | INT64 |
MYSQL_TYPE_YEAR | INT64 |
MYSQL_TYPE_FLOAT | FLOAT32 |
MYSQL_TYPE_DOUBLE | FLOAT64 |
MYSQL_TYPE_VARCHAR | STRING |
MYSQL_TYPE_STRING | STRING |
MYSQL_TYPE_VAR_STRING | STRING |
MYSQL_TYPE_TIMESTAMP | STRING |
MYSQL_TYPE_DATE | STRING |
MYSQL_TYPE_TIME | STRING |
MYSQL_TYPE_DATETIME | STRING |
MYSQL_TYPE_TIMESTAMP_NEW | STRING |
MYSQL_TYPE_DATE_NEW | STRING |
MYSQL_TYPE_TIME_NEW | STRING |
MYSQL_TYPE_DATETIME_NEW | STRING |
MYSQL_TYPE_ENUM | STRING |
MYSQL_TYPE_SET | STRING |
MYSQL_TYPE_DECIMAL | DECIMAL |
MYSQL_TYPE_DECIMAL_NEW | DECIMAL |
MYSQL_TYPE_JSON | BYTES |
MYSQL_TYPE_BLOB | BYTES |
MYSQL_TYPE_TINY_BLOB | BYTES |
MYSQL_TYPE_MEDIUM_BLOB | BYTES |
MYSQL_TYPE_LONG_BLOB | BYTES |
MYSQL_TYPE_GEOMETRY | BYTES |
本页内容是否解决了您的问题?