Kafka Data Source
Last updated: 2024-11-01 17:00:28
Data Integration provides Kafka read and write capabilities. This article introduces the environment preparation required for real-time data synchronization with Kafka and the currently supported capabilities.

Supported Versions

Currently, DataInLong supports Kafka real-time read and write for both single tables and entire databases. To use real-time synchronization, observe the following version requirements:
Node: Kafka
Version: 2.4.1, 2.7.1, 2.8.1, 2.8.2

Use Limits

The Kafka whole-database source must use one of the canal-json, debezium, or ogg-json serialization formats.
A single whole-database task supports multiple Topics; switch to manual input mode and separate the Topic names with commas.
The canal-json/debezium format must include the mysqlType, sqlType, and primary key fields.
When Kafka is the source, only column addition, column deletion, and table creation events are synchronized to the target.

Kafka Environment Preparation

The process for establishing a connection between the Kafka client and server is as follows:
1. The client connects to the Kafka server using the bootstrap.servers address you specified. Based on its configuration, the Kafka server returns the cluster metadata, including the connection address of each broker.
2. The client then connects to each broker using the addresses returned in step 1 to read or write data.
Note that if the bootstrap.servers address is reachable but network connectivity errors are still reported, you can troubleshoot as follows (a connectivity-check sketch follows this list):
Check whether the broker connection addresses returned by the Kafka server are reachable.
Check whether the addresses in the listeners and advertised.listeners fields of the Kafka broker configuration file server.properties are reachable from the network of the integration resource group.
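If it is unclear which addresses the cluster actually advertises, you can query the cluster metadata with the standard Kafka AdminClient. The following is a minimal sketch for reference only; the bootstrap address and timeout are placeholders for your own environment:

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class BrokerAddressCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same bootstrap.servers address that is configured for the data source (placeholder).
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
        try (AdminClient admin = AdminClient.create(props)) {
            // The nodes returned here are the advertised broker addresses; the integration
            // resource group must also be able to reach every one of them.
            for (Node node : admin.describeCluster().nodes().get(30, TimeUnit.SECONDS)) {
                System.out.printf("broker id=%d address=%s:%d%n", node.id(), node.host(), node.port());
            }
        }
    }
}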
The target Kafka supports automatic Topic creation and specified partition strategies. If you need automatic Topic creation, enable it on the Kafka server in advance:
auto.create.topics.enable=true
After enabling automatic Topic creation, the target Topic must comply with CKafka/Kafka Topic naming rules to prevent Topic creation failures during task execution.
When relying on automatic Topic creation, please configure the number of partitions appropriately to avoid performance issues (alternatively, pre-create the Topic as in the sketch below).
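If you would rather not rely on automatic creation, the target Topic can also be created in advance with an explicit partition count using the Kafka AdminClient. A minimal sketch for reference; the topic name, partition count, and replication factor below are illustrative values:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class PreCreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions and replication factor 3 are illustrative values; size them
            // for the expected write throughput of the synchronization task.
            NewTopic topic = new NewTopic("wedata_sync_target", 6, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}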

Whole Database Reading Configuration

Data Source Settings




Parameter
Description
Data Source
Select the Kafka data source that needs to be synchronized.
Source Topic
Select or enter the name of the Topic(s) to be consumed by the task.
Serialization Format
Set the original message format in Kafka. Currently, the canal-json and debezium formats are supported.
Note:
The set format must be consistent with the actual message format.
Read Position
Set the position from which Kafka data is read (a consumer-level sketch of these start positions follows this table):
Start from the earliest offset: earliest.
Start from the latest offset: latest.
Start from a specified time point: set the specific start time for the task.
Advanced Settings (optional)
You can configure parameters according to business needs.
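For reference, the read positions above correspond roughly to the behavior of a plain Kafka consumer: earliest and latest map to auto.offset.reset, and starting from a specified time point corresponds to looking up offsets by timestamp and seeking to them. A minimal sketch with placeholder broker, group, and topic names; it is not the product's internal implementation:

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StartFromTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");              // placeholder
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");       // earliest / latest
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        long startMs = 1700000000000L; // the "specified time point", in epoch milliseconds
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            consumer.partitionsFor("source_topic")                            // placeholder topic
                    .forEach(p -> partitions.add(new TopicPartition(p.topic(), p.partition())));
            consumer.assign(partitions);

            Map<TopicPartition, Long> query = new HashMap<>();
            partitions.forEach(tp -> query.put(tp, startMs));
            // Look up the first offset at or after startMs for every partition, then seek there.
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e :
                    consumer.offsetsForTimes(query).entrySet()) {
                if (e.getValue() != null) {
                    consumer.seek(e.getKey(), e.getValue().offset());
                }
            }
            consumer.poll(Duration.ofSeconds(5)).forEach(r ->
                    System.out.println(r.partition() + "@" + r.offset() + ": " + r.value()));
        }
    }
}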

Whole Database Writing Configuration

Data Target Settings




Parameter
Description
Data Destination
Select the target data source to be written to.
Serialization Format
Supports the canal-json and debezium formats.
Merge Update Messages
Switch Off: One record update on the source end corresponds to two Kafka records, representing the data before and after the update.
Switch On: One record update on the source end corresponds to one Kafka record, containing both the data before and after the update.
Sync to Multiple Topics
Default On: This option allows many-to-many mapping between the source data and target Topics. During task execution, Topics will be matched based on policies. If the Topic does not exist, the system will automatically create it according to Topic name matching rules.
Off: Manually enter or select the target Topic name. All subsequent data will be written into this Topic.
Topic Matching Policy
Write with the Same Name as Source Table: Use the Topic with the same name as the source table by default.
Custom: Match Topics based on custom policy rules.
Partition Mapping
Configure the Topic partition mapping (Round-robin Partition Write, Table Name Partitioning, Primary Key Partitioning, Specify Partition, Custom); a producer-level sketch of these strategies follows this table:
Round-robin Partition Write: Upstream data is written to each partition in a round-robin manner.
Table Name Partitioning: Write to each partition based on the hash mapping of the table name in the upstream data.
Primary Key Partitioning: Write to each partition based on the hash mapping of primary key data content in the upstream data.
Specify partition:
Write to a single specified partition: Enter the partition number. All messages will only be written to the fixed partition.
Write to multiple partitions based on table rules: Supports object matching by inputting database and table regex. Objects that match the rules are written to the specified partition. Rules are executed sequentially, and matched tables are excluded from subsequent rule matching.
Custom Definition: Supports using "Built-in Parameters" to concatenate write partition rules. After setting, messages will be hash partitioned based on the corresponding values of the partition rules.
Advanced Settings (optional)
You can configure parameters according to business needs.
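For intuition about the partition mapping strategies above, a plain Kafka producer achieves similar effects either by choosing the record key (the default partitioner sends all records with the same key to the same partition) or by setting the partition number explicitly. A minimal sketch with placeholder topic and broker names; it is not the product's internal implementation:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PartitionStrategies {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "Table Name Partitioning": use the table name as the key, so the default
            // partitioner hashes all rows of one table to the same partition.
            producer.send(new ProducerRecord<>("sync_topic", "db1.products", "{...row json...}"));

            // "Primary Key Partitioning": use the primary key value as the key instead.
            producer.send(new ProducerRecord<>("sync_topic", "id=2", "{...row json...}"));

            // "Specify partition": write to a fixed partition number (here partition 0).
            producer.send(new ProducerRecord<>("sync_topic", 0, null, "{...row json...}"));

            producer.flush();
        }
    }
}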

Appendix: Canal-json/Debezium Data Format Example

Canal-json
{
"data": [
{
"id": "2",
"name": "scooter33",
"description": "Big 2-wheel scooter233",
"weight": "5.11"
}
],
"database": "pacino99",
"es": 1589373560000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "INTEGER",
"name": "VARCHAR(255)",
"description": "VARCHAR(512)",
"weight": "FLOAT"
},
"old": [
{
"weight": "5.12"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"description": 12,
"weight": 7
},
"table": "products999",
"ts": 1589373560798,
"type": "UPDATE"
}
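For reference, a whole-database source Topic is expected to already contain messages in this shape. The following minimal sketch writes a trimmed-down canal-json string (modeled on the example above) with the standard Kafka producer; broker and topic names are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProduceCanalJson {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // A trimmed-down canal-json change record; real payloads carry the full field set
        // shown in the example above (mysqlType, sqlType, pkNames, etc.).
        String canalJson = "{\"data\":[{\"id\":\"2\",\"name\":\"scooter33\"}],"
                + "\"database\":\"pacino99\",\"isDdl\":false,"
                + "\"mysqlType\":{\"id\":\"INTEGER\",\"name\":\"VARCHAR(255)\"},"
                + "\"pkNames\":[\"id\"],\"sqlType\":{\"id\":4,\"name\":12},"
                + "\"table\":\"products999\",\"ts\":1589373560798,\"type\":\"UPDATE\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Keying by "database.table" keeps changes of one table ordered within a partition.
            producer.send(new ProducerRecord<>("source_topic", "pacino99.products999", canalJson));
            producer.flush();
        }
    }
}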
Debezium
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory2.customers2.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "mysql-server-1.inventory2.customers2.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "mysql-server-1.inventory.customers.Envelope"
},
"payload": {
"op": "c",
"ts_ms": 1465491411815,
"before": null,
"after": {
"id": 12003,
"first_name": "Anne322",
"last_name": "Kretchmar3222",
"email": "annek@noanswer.org3222"
},
"source": {
"version": "1.9.6.Final",
"connector": "mysql",
"name": "mysql-server-1",
"ts_ms": 0,
"snapshot": false,
"db": "inventory333",
"table": "customers433",
"server_id": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 154,
"row": 0,
"thread": 7,
"query": ""
}
}
}
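For reference, a consumer of such messages can unpack the Debezium envelope with an ordinary JSON library. The following minimal sketch uses Jackson; the class and helper method names are illustrative and not part of WeData:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ParseDebeziumEnvelope {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Extracts the operation type and before/after images from a Debezium envelope.
    public static void handle(String message) throws Exception {
        JsonNode root = MAPPER.readTree(message);
        JsonNode payload = root.path("payload");
        String op = payload.path("op").asText();      // c = insert, u = update, d = delete
        JsonNode before = payload.path("before");      // null for inserts
        JsonNode after = payload.path("after");        // null for deletes
        String table = payload.path("source").path("table").asText();
        System.out.printf("op=%s table=%s before=%s after=%s%n", op, table, before, after);
    }

    public static void main(String[] args) throws Exception {
        handle("{\"payload\":{\"op\":\"c\",\"before\":null,"
                + "\"after\":{\"id\":12003,\"first_name\":\"Anne322\"},"
                + "\"source\":{\"table\":\"customers433\"}}}");
    }
}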

Single Table Read Node Configuration

1. In the DataInLong page, click Real-time synchronization on the left directory bar.
2. In the real-time synchronization page, select Single Table Synchronization at the top to create a new one (you can choose either form or canvas mode) and enter the configuration page.



Parameter
Description
Data Source
The Kafka reader supports the Kafka data source type.
topic
Topic in Kafka Data Source.
Serialization Format
Kafka Message Serialization Format Type supports: canal-json, ogg-json, json, avro, csv, raw.
Message Type
Append Message: Messages in Kafka originate from the Append message stream, typically without a unique key. It's recommended to use the Append Write Mode for the write node.
Upsert Message: Messages in Kafka originate from the Upsert message stream, typically containing a unique key. Setting this ensures Exactly-Once semantics. It's recommended to use the Upsert Write Mode for the write node.
Unique Key
In Upsert write mode, you need to set a unique key to ensure data ordering.
Read Position
The starting point for data synchronization when the sync task is launched. Supports earliest and latest.
Consumer Group ID
Please avoid reusing the same group ID as other consumption processes to ensure correct consumption offsets. If this parameter is not specified, the default is group.id=WeData_group_${taskId}. (A consumer configuration sketch follows this table.)
Advanced Settings (optional)
You can configure parameters according to business needs.
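For reference, the Consumer Group ID and Read Position correspond to the group.id and auto.offset.reset properties of a plain Kafka consumer. A minimal configuration sketch; the group ID mirrors the documented default naming and the remaining values are placeholders:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SingleTableReaderConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");      // placeholder
        // Keep the group ID unique per task, e.g. the documented default WeData_group_${taskId}.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "WeData_group_20001");         // illustrative ID
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");            // earliest / latest
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("source_topic"));           // placeholder topic
            consumer.poll(Duration.ofSeconds(5)).forEach(r -> System.out.println(r.value()));
        }
    }
}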

Single Table Writing Node Configuration

1. In the DataInLong page, click Real-time synchronization on the left directory bar.
2. In the real-time synchronization page, select Single Table Synchronization at the top to create a new one (you can choose either form or canvas mode) and enter the configuration page.



Parameter
Description
Data Destination
The Kafka writer supports the Kafka data source type.
topic
Topic in Kafka Data Source.
Serialization Format
Kafka Message Serialization Format Type supports: canal-json, json, avro.
Message Type
Upsert Message: Update write. When there is no primary key conflict, a new row is inserted; when there is a primary key conflict, an update is performed. Suitable for scenarios where the target table has a primary key and needs to be updated in real time based on the source data. This incurs some performance overhead.
Append Message: Append write. Data is appended as new rows regardless of whether a primary key exists; whether primary key conflicts occur depends on the target. Suitable for scenarios with no primary key where duplicate data is acceptable. No performance loss.
Unique Key
In Upsert write mode, you need to set a unique key to ensure data ordering.
Partition Mapping
Configure topic partition mapping (Round-robin partition write, Hash write to partitions based on specified field content, Specify partition):
Round-robin Partition Write: Upstream data is written to each partition in a round-robin manner.
Hash by Specified Field Content: Messages are written to partitions based on a hash of the specified field's content.
Specify Partition: Enter the partition number. All messages will only be written to the fixed partition.
Advanced Settings (optional)
You can configure parameters according to business needs.

Log collection write node configuration

Parameter
Description
Data Destination
Select the available Kafka data source in the current project. The Kafka write source type supports Kafka and CKafka.
topic
Topic in Kafka Data Source.
Serialization Format
Kafka message serialization format types, supporting three types:
canal-json
json
avro
Message Type
Kafka supports two writing modes:
Append: Append write.
Upsert: Write messages in upsert mode. Once set, each message is processed by the consuming end only once, ensuring Exactly-Once semantics.
Unique Key
In Upsert write mode, you need to set a unique key to ensure data ordering; multiple fields can be selected. In Append mode, a unique key is not required. (A producer-level sketch of keyed writes follows this table.)
Advanced Settings (optional)
You can configure parameters according to business needs.
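For reference, when producing directly to Kafka, keying each record by the unique key keeps all changes of one key in the same partition and therefore in order; Kafka's own producer-side duplicate protection is enabled with enable.idempotence, which is related to, but not the same as, the Upsert mode described above. A minimal sketch with placeholder names:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedUpsertStyleWrite {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder
        // Producer-side de-duplication on retries; related to, but not the same as,
        // the Upsert message mode described above.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Use the unique key's value as the record key: all changes of one key land in
            // the same partition, so downstream consumers see them in order.
            producer.send(new ProducerRecord<>("log_topic", "id=12003", "{\"id\":12003,\"op\":\"u\"}"));
            producer.flush();
        }
    }
}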

Read and Write Data Type Conversion Support

Internal Types
Kafka Type
SMALLINT
SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL
INTEGER
INT, INTEGER, YEAR, SHORT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL
BIGINT
BIGINT, INT UNSIGNED, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL, INT UNSIGNED ZEROFILL
DECIMAL
BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL, NUMERIC, NUMERIC UNSIGNED, NUMERIC UNSIGNED ZEROFILL, DECIMAL, DYNAMIC DECIMAL, DECIMAL UNSIGNED, DECIMAL UNSIGNED ZEROFILL, FIXED, FIXED UNSIGNED, FIXED UNSIGNED ZEROFILL
FLOAT
FLOAT, FLOAT UNSIGNED, FLOAT UNSIGNED ZEROFILL
DOUBLE
DOUBLE, DOUBLE UNSIGNED, DOUBLE UNSIGNED ZEROFILL, DOUBLE PRECISION, DOUBLE PRECISION UNSIGNED, DOUBLE PRECISION UNSIGNED ZEROFILL, REAL, REAL UNSIGNED, REAL UNSIGNED ZEROFILL
TIMESTAMP
DATETIME, TIMESTAMP WITH LOCAL TIME ZONE, TIMESTAMP WITH TIME ZONE
TIMESTAMP_WITH_TIMEZONE
TIMESTAMP, TIMESTAMP WITH LOCAL TIME ZONE, TIMESTAMP WITH TIME ZONE
BLOB
BLOB, TINYBLOB, MEDIUMBLOB, LONGBLOB
VARCHAR
JSON, VARCHAR, TEXT, TINYTEXT, MEDIUMTEXT, LONGTEXT