ProtoBuf Demo Description

Last updated: 2024-07-08 16:52:02

    Key Logic Description

    Message production logic

    This section describes the message production logic to help you better understand the consumption logic. The demo for each programming language uses Protobuf for serialization and contains a Protobuf definition file. The file defines three key structures: Envelope is the final Kafka message structure; Entry is the structure of a single subscription event; Entries is a collection of Entry structures. An Envelope carries serialized Entries in its data field, and Entries holds a list of Entry structures.

    The production process is as follows:
    1. Pull binlog messages and encode each binlog event into an Entry.
    message Entry { // An `Entry` is the structure of an individual subscription event. An event is similar to a binlog event in MySQL.
        Header header = 1; // The event header
        Event event = 2; // The event body
    }

    message Header {
        int32 version = 1; // The protocol version of the `Entry`
        SourceType sourceType = 2; // The source database type, such as MySQL or Oracle
        MessageType messageType = 3; // The message type, i.e., the event type, such as BEGIN, COMMIT, or DML
        uint32 timestamp = 4; // The event timestamp in the source binlog
        int64 serverId = 5; // The `serverId` of the source database
        string fileName = 6; // The filename of the source binlog
        uint64 position = 7; // The event offset in the source binlog file
        string gtid = 8; // The GTID of the current transaction
        string schemaName = 9; // The modified schema
        string tableName = 10; // The modified table
        uint64 seqId = 11; // The globally incremental serial number
        uint64 eventIndex = 12; // The shard number of a sharded large event, starting from 0. This field is not used in the current version and is reserved for future use.
        bool isLast = 13; // `true` if the current shard is the last shard of a sharded event. This field is not used in the current version and is reserved for future use.
        repeated KVPair properties = 15;
    }

    message Event {
        BeginEvent beginEvent = 1; // The BEGIN event in the binlog
        DMLEvent dmlEvent = 2; // The DML event in the binlog
        CommitEvent commitEvent = 3; // The COMMIT event in the binlog
        DDLEvent ddlEvent = 4; // The DDL event in the binlog
        RollbackEvent rollbackEvent = 5; // The ROLLBACK event. This field is not used in the current version.
        HeartbeatEvent heartbeatEvent = 6; // The heartbeat event regularly sent by the source database
        CheckpointEvent checkpointEvent = 7; // The checkpoint event added by the subscription backend. It is generated automatically every 10 seconds and is used to manage Kafka production and consumption offsets.
        repeated KVPair properties = 15;
    }
    2. Multiple Entry structures are merged into one Entries structure to reduce the number of messages. The Entries.items field holds the ordered list of Entry structures. The number of Entry structures merged is chosen so that the serialized result stays below the size limit of a single Kafka message. If a single binlog event already exceeds that limit, no merging takes place, and the Entries structure contains only that one Entry.
    message Entries {
        repeated Entry items = 1; // `Entry` list
    }
    3. Encode Entries with Protobuf to generate a binary sequence.
    4. Put the binary sequence in the data field of an Envelope. If a single binlog event is oversized, the binary sequence may exceed the size limit of a single Kafka message. In this case, the binary sequence is split into multiple segments, and each segment is put in its own Envelope. Envelope.total records the total number of Envelope structures, and Envelope.index records the serial number of the current Envelope structure (starting from 0).
    message Envelope {
        int32 version = 1; // The protocol version, which determines how the data content is decoded.
        uint32 total = 2; // The total number of `Envelope` structures that carry one `Entries`
        uint32 index = 3; // The serial number of the current `Envelope`, starting from 0
        bytes data = 4; // When `version` is 1, the data is `Entries` (or a segment of it) serialized in the Protobuf format.
        repeated KVPair properties = 15;
    }
    5. Encode the one or more Envelope structures generated in the previous step in sequence and deliver them to Kafka partitions. Multiple Envelope structures that carry the same Entries are delivered to the same partition in sequence.
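
The splitting in step 4 can be sketched as follows. This is a minimal illustration and not the backend's actual code: the `Envelope` struct is a hand-written stand-in for the generated Protobuf type, and `splitIntoEnvelopes` and its `maxSegment` parameter are hypothetical names introduced for the example.

```go
package main

import "fmt"

// Envelope is a hand-written stand-in for the generated Protobuf type
// (assumption: only the fields needed for the illustration are kept).
type Envelope struct {
	Version int32
	Total   uint32
	Index   uint32
	Data    []byte
}

// splitIntoEnvelopes cuts a serialized Entries payload into segments of at
// most maxSegment bytes and wraps each segment in an Envelope. maxSegment is
// a hypothetical stand-in for the Kafka message size budget.
func splitIntoEnvelopes(payload []byte, maxSegment int) ([]Envelope, error) {
	if maxSegment <= 0 {
		return nil, fmt.Errorf("maxSegment must be positive, got %d", maxSegment)
	}
	// Round up; an empty payload still yields one Envelope so Total is never 0.
	total := (len(payload) + maxSegment - 1) / maxSegment
	if total == 0 {
		total = 1
	}
	envelopes := make([]Envelope, 0, total)
	for i := 0; i < total; i++ {
		start := i * maxSegment
		end := start + maxSegment
		if end > len(payload) {
			end = len(payload)
		}
		envelopes = append(envelopes, Envelope{
			Version: 1,
			Total:   uint32(total),
			Index:   uint32(i), // Index starts from 0
			Data:    payload[start:end],
		})
	}
	return envelopes, nil
}

func main() {
	envelopes, err := splitIntoEnvelopes([]byte("0123456789"), 4)
	if err != nil {
		panic(err)
	}
	for _, e := range envelopes {
		fmt.Printf("index=%d total=%d data=%q\n", e.Index, e.Total, e.Data)
	}
}
```

A payload that fits in one segment produces a single Envelope with total 1 and index 0, which matches the unsplit case described above.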

    Message consumption logic

    This section describes the message consumption logic. The description here applies to our demos for Java, Go, and Python.
    1. Create a Kafka consumer.
    2. Start the consumption.
    3. Consume the original messages in sequence and, for each message, find the partitionMsgConsumer object that corresponds to the message's partition to process it.
    4. The partitionMsgConsumer object deserializes messages into the Envelope structure.
    // Convert the value of the Kafka message to an `Envelope`.
    envelope := subscribe.Envelope{}
    err := proto.Unmarshal(msg.Value, &envelope)
    5. The partitionMsgConsumer object keeps consuming messages based on the index and total recorded in each Envelope until Envelope.index equals Envelope.total-1, which indicates that a complete Entries has been received (see the production logic described above).
    6. Concatenate the data fields of the consecutive Envelope structures in the order received, then decode the combined binary sequence into Entries with Protobuf.
    if envelope.Index == 0 {
        pmc.completeMsg = envelope
    } else {
        // Splice the split binary sequences of `Entries`
        pmc.completeMsg.Data = append(pmc.completeMsg.Data, envelope.Data...)
    }
    if envelope.Index < envelope.Total-1 {
        return nil
    }
    // Deserialize `Envelope.Data` to `Entries`
    entries := subscribe.Entries{}
    err = proto.Unmarshal(pmc.completeMsg.Data, &entries)
    
    7. Process Entries.items in sequence, and print the original Entry structure or convert it into a SQL statement.
    8. When a checkpoint message is consumed, commit the Kafka message offset. A checkpoint message is a special message written to Kafka by the subscription backend once every 10 seconds.
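
Steps 5 and 6 can also be tried out in isolation, without Kafka or the generated Protobuf code. The sketch below is an assumption-laden illustration rather than the demo's implementation: `Envelope` is a hand-written stand-in, and the `reassembler` type, its `push` method, and the out-of-order check are hypothetical additions for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Envelope is a hand-written stand-in for the generated Protobuf type.
type Envelope struct {
	Total uint32
	Index uint32
	Data  []byte
}

// reassembler buffers the segments of one partition until a complete
// serialized Entries payload is available (hypothetical helper).
type reassembler struct {
	buf  []byte
	next uint32 // the index expected from the next Envelope
}

var errOutOfOrder = errors.New("envelope index out of order")

// push adds one Envelope. It returns the complete payload and true once the
// Envelope with Index == Total-1 has arrived. The sequence check is an extra
// safeguard added in this sketch.
func (r *reassembler) push(e Envelope) ([]byte, bool, error) {
	if e.Total == 0 || e.Index >= e.Total || e.Index != r.next {
		r.buf, r.next = nil, 0 // drop the partial payload and start over
		return nil, false, errOutOfOrder
	}
	r.buf = append(r.buf, e.Data...)
	if e.Index < e.Total-1 {
		r.next = e.Index + 1
		return nil, false, nil
	}
	payload := r.buf
	r.buf, r.next = nil, 0
	return payload, true, nil
}

func main() {
	r := &reassembler{}
	segments := []Envelope{
		{Total: 3, Index: 0, Data: []byte("0123")},
		{Total: 3, Index: 1, Data: []byte("4567")},
		{Total: 3, Index: 2, Data: []byte("89")},
	}
	for _, e := range segments {
		payload, done, err := r.push(e)
		if err != nil {
			panic(err)
		}
		if done {
			// In the real consumer, this payload would be passed to proto.Unmarshal.
			fmt.Printf("complete payload: %q\n", payload)
		}
	}
}
```

Because all Envelope structures of one Entries are delivered to the same partition in sequence, keeping one such buffer per partition is sufficient.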

    Database Field Mapping and Storage

    This section describes the mappings between database field types and data types defined in the Protobuf protocol. A field value from a source database such as MySQL is represented as follows in the Protobuf protocol.
    message Data {
        DataType dataType = 1;
        string charset = 2; // The character encoding of a DataType_STRING value; the value itself is stored in `bv`
        string sv = 3; // The string representation of a DataType_INT8/16/32/64, UINT8/16/32/64, FLOAT32/64, or DECIMAL value
        bytes bv = 4; // The value of DataType_STRING/DataType_BYTES
    }
    The dataType field indicates the type of the stored value. Its possible values are enumerated below:
    enum DataType {
        NIL = 0; // The value is `NULL`
        INT8 = 1;
        INT16 = 2;
        INT32 = 3;
        INT64 = 4;
        UINT8 = 5;
        UINT16 = 6;
        UINT32 = 7;
        UINT64 = 8;
        FLOAT32 = 9;
        FLOAT64 = 10;
        BYTES = 11;
        DECIMAL = 12;
        STRING = 13;
        NA = 14; // The value does not exist (N/A).
    }
    The bv field stores the binary representation of STRING and BYTES values; the sv field stores the string representation of INT8/16/32/64, UINT8/16/32/64, FLOAT32/64, and DECIMAL values; the charset field stores the character encoding of STRING values.
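
To make the sv/bv split concrete, here is a minimal sketch of turning a Data value into a native Go value. The `Data` struct and `DataType` constants are hand-written stand-ins that mirror the definitions above, and `decode` is a hypothetical helper. Two choices in it are assumptions of this sketch: DECIMAL is kept as a string to avoid precision loss, and STRING bytes are treated as UTF-8 compatible without consulting charset.

```go
package main

import (
	"fmt"
	"strconv"
)

// DataType mirrors the enum above; iota yields NIL = 0 through NA = 14.
type DataType int32

const (
	NIL DataType = iota
	INT8
	INT16
	INT32
	INT64
	UINT8
	UINT16
	UINT32
	UINT64
	FLOAT32
	FLOAT64
	BYTES
	DECIMAL
	STRING
	NA
)

// Data is a hand-written stand-in for the generated Protobuf type.
type Data struct {
	DataType DataType
	Charset  string
	Sv       string
	Bv       []byte
}

// decode converts a Data value to a Go value (hypothetical helper).
func decode(d Data) (interface{}, error) {
	switch d.DataType {
	case NIL, NA:
		return nil, nil
	case INT8, INT16, INT32, INT64:
		return strconv.ParseInt(d.Sv, 10, 64)
	case UINT8, UINT16, UINT32, UINT64:
		return strconv.ParseUint(d.Sv, 10, 64)
	case FLOAT32:
		return strconv.ParseFloat(d.Sv, 32)
	case FLOAT64:
		return strconv.ParseFloat(d.Sv, 64)
	case DECIMAL:
		return d.Sv, nil // kept as text to avoid losing precision
	case STRING:
		return string(d.Bv), nil // assumes a UTF-8 compatible charset
	case BYTES:
		return d.Bv, nil
	default:
		return nil, fmt.Errorf("unknown DataType %d", d.DataType)
	}
}

func main() {
	samples := []Data{
		{DataType: INT32, Sv: "-42"},
		{DataType: DECIMAL, Sv: "12345.6789"},
		{DataType: STRING, Charset: "utf8", Bv: []byte("hello")},
	}
	for _, d := range samples {
		v, err := decode(d)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%T %v\n", v, v)
	}
}
```

A consumer that must handle non-UTF-8 charsets would need to transcode Bv according to Charset before converting it to a Go string.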
    The mapping between original MySQL/TDSQL field types and DataType values is shown below. When modified by UNSIGNED, MYSQL_TYPE_INT8/16/24/32/64 are mapped to UINT8/16/32/32/64, respectively.
    Note
    The DATE, TIME, and DATETIME types do not carry time zone information.
    The TIMESTAMP type supports time zones. Values of this type are converted from the current time zone to Coordinated Universal Time (UTC) for storage and converted back from UTC to the current time zone on query.
    The MYSQL_TYPE_TIMESTAMP and MYSQL_TYPE_TIMESTAMP_NEW fields carry time zone information, which you can convert on your own when consuming data. For example, the time data output by DTS is a string with a time zone, such as 2021-05-17 07:22:42 +00:00, where +00:00 indicates UTC. Take the time zone information into account when parsing and converting the data.
    MySQL/TDSQL Field Type      Protobuf DataType Value
    MYSQL_TYPE_NULL             NIL
    MYSQL_TYPE_INT8             INT8
    MYSQL_TYPE_INT16            INT16
    MYSQL_TYPE_INT24            INT32
    MYSQL_TYPE_INT32            INT32
    MYSQL_TYPE_INT64            INT64
    MYSQL_TYPE_BIT              INT64
    MYSQL_TYPE_YEAR             INT64
    MYSQL_TYPE_FLOAT            FLOAT32
    MYSQL_TYPE_DOUBLE           FLOAT64
    MYSQL_TYPE_VARCHAR          STRING
    MYSQL_TYPE_STRING           STRING
    MYSQL_TYPE_VAR_STRING       STRING
    MYSQL_TYPE_TIMESTAMP        STRING
    MYSQL_TYPE_DATE             STRING
    MYSQL_TYPE_TIME             STRING
    MYSQL_TYPE_DATETIME         STRING
    MYSQL_TYPE_TIMESTAMP_NEW    STRING
    MYSQL_TYPE_DATE_NEW         STRING
    MYSQL_TYPE_TIME_NEW         STRING
    MYSQL_TYPE_DATETIME_NEW     STRING
    MYSQL_TYPE_ENUM             STRING
    MYSQL_TYPE_SET              STRING
    MYSQL_TYPE_DECIMAL          DECIMAL
    MYSQL_TYPE_DECIMAL_NEW      DECIMAL
    MYSQL_TYPE_JSON             BYTES
    MYSQL_TYPE_BLOB             BYTES
    MYSQL_TYPE_TINY_BLOB        BYTES
    MYSQL_TYPE_MEDIUM_BLOB      BYTES
    MYSQL_TYPE_LONG_BLOB        BYTES
    MYSQL_TYPE_GEOMETRY         BYTES
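
Since TIMESTAMP values arrive as STRING data with a time zone suffix, a consumer usually has to parse them before use. The sketch below assumes the string has exactly the shape of the example in the note (`2021-05-17 07:22:42 +00:00`, with no fractional seconds); `parseTimestamp` and `toOffset` are hypothetical helper names, and the UTC+8 target offset is only an example.

```go
package main

import (
	"fmt"
	"time"
)

// dtsTimestampLayout matches strings such as "2021-05-17 07:22:42 +00:00".
// Assumption: no fractional seconds are present.
const dtsTimestampLayout = "2006-01-02 15:04:05 -07:00"

// parseTimestamp parses a TIMESTAMP string together with its zone offset.
func parseTimestamp(s string) (time.Time, error) {
	return time.Parse(dtsTimestampLayout, s)
}

// toOffset renders ts as wall-clock time in a fixed UTC offset given in hours.
func toOffset(ts time.Time, hours int) string {
	zone := time.FixedZone("", hours*60*60)
	return ts.In(zone).Format("2006-01-02 15:04:05")
}

func main() {
	ts, err := parseTimestamp("2021-05-17 07:22:42 +00:00")
	if err != nil {
		panic(err)
	}
	fmt.Println("UTC:  ", ts.UTC().Format("2006-01-02 15:04:05"))
	fmt.Println("UTC+8:", toOffset(ts, 8))
}
```

A fixed offset is used here so that the example does not depend on the time zone database being installed; `time.LoadLocation` is the usual choice when daylight saving rules matter.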
    
    