Kafka Data Source

Last updated: 2024-11-01 17:00:28
    Data Integration provides Kafka read and write capabilities. This article describes the environment preparation required for real-time data synchronization with Kafka and the capabilities that are currently supported.

    Supported Versions

    Currently, DataInLong supports Kafka real-time read and write for both single tables and entire databases. To use real-time synchronization, the following version limitations must be adhered to:
    Node: Kafka
    Version: 2.4.1, 2.7.1, 2.8.1, 2.8.2

    Use Limits

    Messages read from a Kafka whole-database source must use the canal-json, debezium, or ogg-json serialization format.
    A single whole-database task can consume multiple Topics: switch to manual input mode and separate the Topic names with commas.
    Messages in the canal-json/debezium format must include the mysqlType, sqlType, and primary key fields (see the appendix for complete examples).
    When Kafka is the source, only column addition, column deletion, and table creation events are synchronized to the target.

    Kafka Environment Preparation

    The process for establishing a connection between the Kafka client and server is as follows:
    1. The client connects to the Kafka server using the bootstrap.servers address you specified. The Kafka server returns the metadata of each broker in the cluster, including the connection addresses of each broker, based on the configuration.
    2. The client then connects to each broker, using the connection addresses returned in the first step, to read or write data.
    Note that if the bootstrap.servers address is reachable but the task still reports network connectivity errors, troubleshoot as follows:
    Check whether the broker connection addresses returned by the Kafka server are reachable.
    Check whether the addresses in the listeners and advertised.listeners fields of the Kafka broker configuration file server.properties can be reached from the network of the integration resource group (see the example below).
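
    As an illustration only, a broker's server.properties might look like the following; the listener name, host name, and port are placeholders, and the advertised address must be resolvable and reachable from the integration resource group's network:
    # Hypothetical example values; replace with your cluster's actual addresses.
    # The address in advertised.listeners is what the broker returns to clients,
    # so it must be reachable from the integration resource group.
    listeners=PLAINTEXT://0.0.0.0:9092
    advertised.listeners=PLAINTEXT://broker-1.example.internal:9092
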
    Kafka as the target supports automatic Topic creation and specified partition strategies. To use automatic Topic creation, enable it on the Kafka server in advance:
    auto.create.topics.enable=true
    After enabling automatic Topic creation, the target Topic names must comply with the CKafka/Kafka Topic naming rules, otherwise Topic creation may fail during task execution.
    When automatic Topic creation is enabled, configure the number of partitions appropriately to avoid performance issues, as in the sketch below.
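
    The partition count and replication factor of automatically created Topics come from the broker defaults. A minimal server.properties sketch (the numbers below are placeholder values, not recommendations):
    # Enable automatic Topic creation and set defaults for auto-created Topics.
    auto.create.topics.enable=true
    # Example defaults only; size them for your workload and cluster.
    num.partitions=6
    default.replication.factor=2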

    Whole Database Reading Configuration

    Data Source Settings

    
    
    
    Data Source: Select the Kafka data source to be synchronized.
    Source Topic: Select or enter the name of the Topic(s) to be consumed by the task.
    Serialization Format: The format of the original messages in Kafka. Currently the canal-json and debezium formats are supported.
    Note: The configured format must match the actual message format (you can inspect a sample message with the command after this table).
    Read Position: The position from which Kafka data is read:
    Start from the earliest offset: earliest.
    Start from the latest offset: latest.
    Start consuming from a specified time point: set the specific task start time.
    Advanced Settings (optional): Configure parameters according to business needs.
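
    Before configuring the task, you can read a single message from the source Topic with the standard Kafka console consumer to confirm that it is actually in canal-json or debezium format; the broker address and Topic name below are placeholders:
    # Print one message from the beginning of the Topic to inspect its format.
    bin/kafka-console-consumer.sh \
      --bootstrap-server broker-1.example.internal:9092 \
      --topic your_source_topic \
      --from-beginning \
      --max-messages 1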

    Whole Database Writing Configuration

    Data Target Settings

    
    
    
    Data Destination: Select the target data source to be written to.
    Serialization Format: Supports the canal-json and debezium formats.
    Merge Update Messages:
    Off: one updated record on the source corresponds to two Kafka records, carrying the data before and after the update separately.
    On: one updated record on the source corresponds to one Kafka record, containing both the data before and after the update.
    Sync to Multiple Topics:
    On (default): allows many-to-many mapping between the source data and target Topics. During task execution, Topics are matched according to the configured policy; if a Topic does not exist, it is created automatically according to the Topic name matching rules.
    Off: manually enter or select the target Topic name; all subsequent data is written to this Topic.
    Topic Matching Policy:
    Write with the same name as the source table: use the Topic with the same name as the source table by default.
    Custom: match Topics according to custom policy rules.
    Partition Mapping: Configure how data is mapped to Topic partitions (round-robin partition write, table name partitioning, primary key partitioning, specified partition, or custom); a conceptual sketch follows this table:
    Round-robin Partition Write: upstream data is written to the partitions in turn.
    Table Name Partitioning: data is written to partitions based on a hash of the source table name.
    Primary Key Partitioning: data is written to partitions based on a hash of the primary key content.
    Specify Partition:
    Write to a single specified partition: enter the partition number; all messages are written only to that fixed partition.
    Write to multiple partitions based on table rules: enter database and table regular expressions to match objects; matched objects are written to the specified partition. Rules are evaluated in order, and tables matched by an earlier rule are excluded from later rules.
    Custom: use built-in parameters to compose the partition rule; messages are hash-partitioned based on the value produced by the rule.
    Advanced Settings (optional): Configure parameters according to business needs.
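
    Table name partitioning and primary key partitioning are both hash based: the hash of the table name or primary key value, modulo the partition count, decides the target partition, so rows from the same table (or with the same key) always land in the same partition. The Java sketch below only illustrates this idea; it is not the product's implementation, and the class and method names are invented for the example:
    // Conceptual sketch of hash-based partition mapping (not the actual implementation).
    import java.util.Objects;

    public class PartitionMappingSketch {
        // Table Name Partitioning: all rows of one table map to one partition.
        static int partitionByTableName(String tableName, int numPartitions) {
            return Math.floorMod(Objects.hashCode(tableName), numPartitions);
        }

        // Primary Key Partitioning: rows with the same key map to the same
        // partition, which preserves their relative order.
        static int partitionByPrimaryKey(String primaryKeyValue, int numPartitions) {
            return Math.floorMod(Objects.hashCode(primaryKeyValue), numPartitions);
        }

        public static void main(String[] args) {
            int partitions = 6; // assumed partition count of the target Topic
            System.out.println(partitionByTableName("products999", partitions));
            System.out.println(partitionByPrimaryKey("id=2", partitions));
        }
    }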

    Appendix: Canal-json/Debezium Data Format Example

    Canal-json
    {
        "data": [
            {
                "id": "2",
                "name": "scooter33",
                "description": "Big 2-wheel scooter233",
                "weight": "5.11"
            }
        ],
        "database": "pacino99",
        "es": 1589373560000,
        "id": 9,
        "isDdl": false,
        "mysqlType": {
            "id": "INTEGER",
            "name": "VARCHAR(255)",
            "description": "VARCHAR(512)",
            "weight": "FLOAT"
        },
        "old": [
            {
                "weight": "5.12"
            }
        ],
        "pkNames": [
            "id"
        ],
        "sql": "",
        "sqlType": {
            "id": 4,
            "name": 12,
            "description": 12,
            "weight": 7
        },
        "table": "products999",
        "ts": 1589373560798,
        "type": "UPDATE"
    }
    Debezium
    {
        "schema": {
            "type": "struct",
            "fields": [
                {
                    "type": "struct",
                    "fields": [
                        { "type": "int32", "optional": false, "field": "id" },
                        { "type": "string", "optional": false, "field": "first_name" },
                        { "type": "string", "optional": false, "field": "last_name" },
                        { "type": "string", "optional": false, "field": "email" }
                    ],
                    "optional": true,
                    "name": "mysql-server-1.inventory2.customers2.Value",
                    "field": "before"
                },
                {
                    "type": "struct",
                    "fields": [
                        { "type": "int32", "optional": false, "field": "id" },
                        { "type": "string", "optional": false, "field": "first_name" },
                        { "type": "string", "optional": false, "field": "last_name" },
                        { "type": "string", "optional": false, "field": "email" }
                    ],
                    "optional": true,
                    "name": "mysql-server-1.inventory2.customers2.Value",
                    "field": "after"
                },
                {
                    "type": "struct",
                    "fields": [
                        { "type": "string", "optional": false, "field": "version" },
                        { "type": "string", "optional": false, "field": "connector" },
                        { "type": "string", "optional": false, "field": "name" },
                        { "type": "int64", "optional": false, "field": "ts_ms" },
                        { "type": "boolean", "optional": true, "default": false, "field": "snapshot" },
                        { "type": "string", "optional": false, "field": "db" },
                        { "type": "string", "optional": true, "field": "table" },
                        { "type": "int64", "optional": false, "field": "server_id" },
                        { "type": "string", "optional": true, "field": "gtid" },
                        { "type": "string", "optional": false, "field": "file" },
                        { "type": "int64", "optional": false, "field": "pos" },
                        { "type": "int32", "optional": false, "field": "row" },
                        { "type": "int64", "optional": true, "field": "thread" },
                        { "type": "string", "optional": true, "field": "query" }
                    ],
                    "optional": false,
                    "name": "io.debezium.connector.mysql.Source",
                    "field": "source"
                },
                { "type": "string", "optional": false, "field": "op" },
                { "type": "int64", "optional": true, "field": "ts_ms" }
            ],
            "optional": false,
            "name": "mysql-server-1.inventory.customers.Envelope"
        },
        "payload": {
            "op": "c",
            "ts_ms": 1465491411815,
            "before": null,
            "after": {
                "id": 12003,
                "first_name": "Anne322",
                "last_name": "Kretchmar3222",
                "email": "annek@noanswer.org3222"
            },
            "source": {
                "version": "1.9.6.Final",
                "connector": "mysql",
                "name": "mysql-server-1",
                "ts_ms": 0,
                "snapshot": false,
                "db": "inventory333",
                "table": "customers433",
                "server_id": 0,
                "gtid": null,
                "file": "mysql-bin.000003",
                "pos": 154,
                "row": 0,
                "thread": 7,
                "query": ""
            }
        }
    }

    Single Table Read Node Configuration

    1. On the DataInLong page, click Real-time synchronization in the left navigation bar.
    2. On the real-time synchronization page, select Single Table Synchronization at the top, create a new task (in either form or canvas mode), and enter the configuration page.
    Data Source: The Kafka read node supports the Kafka data source type.
    topic: The Topic in the Kafka data source.
    Serialization Format: Kafka message serialization format; supports canal-json, ogg-json, json, avro, csv, and raw.
    Message Type:
    Append Message: messages in Kafka come from an append message stream and usually have no unique key; the Append write mode is recommended for the write node.
    Upsert Message: messages in Kafka come from an upsert message stream and usually contain a unique key; this setting ensures exactly-once semantics, and the Upsert write mode is recommended for the write node.
    Unique Key: In Upsert write mode, a unique key must be set to guarantee data ordering.
    Read Position: The starting point for data consumption when the sync task launches; supports earliest and latest.
    Consumer Group ID: Avoid reusing this value in other consumption processes so that consumption positions remain correct. If this parameter is not specified, the default is group.id=WeData_group_${taskId}. You can check the group's progress with the command shown after this table.
    Advanced Settings (optional): Configure parameters according to business needs.
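
    To check that the task's consumer group is consuming as expected (and that no other process is sharing the group), you can describe it with the standard Kafka tooling; the broker address is a placeholder and the group name follows the default pattern above:
    # Show partition assignments, committed offsets, and lag for the group.
    bin/kafka-consumer-groups.sh \
      --bootstrap-server broker-1.example.internal:9092 \
      --describe \
      --group WeData_group_<taskId>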

    Single Table Writing Node Configuration

    1. On the DataInLong page, click Real-time synchronization in the left navigation bar.
    2. On the real-time synchronization page, select Single Table Synchronization at the top, create a new task (in either form or canvas mode), and enter the configuration page.
    Data Destination: The Kafka write node supports the Kafka data source type.
    topic: The Topic in the Kafka data source.
    Serialization Format: Kafka message serialization format; supports canal-json, json, and avro.
    Message Type:
    Upsert Message: update write. When there is no primary key conflict a new row is inserted; when there is a conflict the existing row is updated. Suitable for scenarios where the target table has a primary key and must be updated in real time from the source data. Incurs some performance overhead.
    Append Message: append write. Data is appended as new rows regardless of whether a primary key exists; whether a primary key conflict occurs depends on the target. Suitable for scenarios with no primary key where duplicate data is acceptable. No performance loss.
    Unique Key: In Upsert write mode, a unique key must be set to guarantee data ordering (see the producer sketch after this table).
    Partition Mapping: Configure how data is mapped to Topic partitions (round-robin partition write, hash write by specified field content, or specified partition):
    Round-robin Partition Write: upstream data is written to the partitions in turn.
    Hash by specified field content: data is written to partitions based on a hash of the specified field's content.
    Specify Partition: enter the partition number; all messages are written only to that fixed partition.
    Advanced Settings (optional): Configure parameters according to business needs.
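
    The ordering guarantee behind the unique key comes from Kafka itself: records with the same message key are routed to the same partition, and each partition preserves write order. A minimal sketch with the Apache Kafka Java producer client (broker address, Topic name, keys, and values are placeholders):
    // Two updates of the same row share the key "id=2", so they go to the same
    // partition and are consumed in the order they were written.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KeyedProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker-1.example.internal:9092"); // placeholder
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("target_topic", "id=2", "{\"weight\":\"5.12\"}"));
                producer.send(new ProducerRecord<>("target_topic", "id=2", "{\"weight\":\"5.11\"}"));
            }
        }
    }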

    Log collection write node configuration

    Data Destination: Select an available Kafka data source in the current project. The Kafka write source type supports Kafka and CKafka.
    topic: The Topic in the Kafka data source.
    Serialization Format: Kafka message serialization format; three types are supported: canal-json, json, avro.
    Message Type: Kafka supports two write modes:
    Append: append write.
    Upsert: write messages in upsert mode. Once set, each message is processed by the consuming end only once, ensuring exactly-once semantics.
    Unique Key: In Upsert write mode, a unique key (multiple fields can be selected) must be set to guarantee data ordering. In Append mode, a unique key is not required.
    Advanced Settings (optional): Configure parameters according to business needs.

    Read and Write Data Type Conversion Support

    Internal Type: Kafka Type
    SMALLINT: SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL
    INTEGER: INT, INTEGER, YEAR, SHORT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL
    BIGINT: BIGINT, INT UNSIGNED, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL, INT UNSIGNED ZEROFILL
    DECIMAL: BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL, NUMERIC, NUMERIC UNSIGNED, NUMERIC UNSIGNED ZEROFILL, DECIMAL, DYNAMIC DECIMAL, DECIMAL UNSIGNED, DECIMAL UNSIGNED ZEROFILL, FIXED, FIXED UNSIGNED, FIXED UNSIGNED ZEROFILL
    FLOAT: FLOAT, FLOAT UNSIGNED, FLOAT UNSIGNED ZEROFILL
    DOUBLE: DOUBLE, DOUBLE UNSIGNED, DOUBLE UNSIGNED ZEROFILL, DOUBLE PRECISION, DOUBLE PRECISION UNSIGNED ZEROFILL, REAL, REAL UNSIGNED, REAL UNSIGNED ZEROFILL
    TIMESTAMP: DATETIME, TIMESTAMP WITH LOCAL TIME ZONE, TIMESTAMP WITH TIME ZONE
    TIMESTAMP_WITH_TIMEZONE: TIMESTAMP, TIMESTAMP WITH LOCAL TIME ZONE, TIMESTAMP WITH TIME ZONE
    BLOB: BLOB, TINYBLOB, MEDIUMBLOB, LONGBLOB
    VARCHAR: JSON, VARCHAR, TEXT, TINYTEXT, MEDIUMTEXT, LONGTEXT