| Node | Version |
| --- | --- |
| Kafka | 2.4.1, 2.7.1, 2.8.1, 2.8.2 |
```properties
auto.create.topics.enable=true
```
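The setting above lets the broker create Topics automatically on first use. If it is disabled in your environment, Topics can be created explicitly before the task runs. A minimal sketch using the kafka-python admin client (an assumption; the broker address, topic name, and partition count are placeholders):

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Hypothetical broker address; replace with your cluster's bootstrap servers.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# Create the target Topic up front instead of relying on auto-creation.
admin.create_topics([
    NewTopic(name="target_topic", num_partitions=4, replication_factor=1),
])
```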
| Parameter | Description |
| --- | --- |
| Data Source | Select the Kafka data source to be synchronized. |
| Source Topic | Select or enter the name of the Topic that the sync task consumes. |
| Serialization Format | The format of the original messages in Kafka. Currently the canal-json and debezium formats are supported. Note: the configured format must match the actual message format. |
| Read Position | The position from which Kafka data is read: earliest (start from the earliest offset), latest (start from the newest offset), or a specific point in time (set the exact start time for the task). See the sketch after this table. |
| Advanced Settings (optional) | Configure additional parameters as business needs require. |
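As a concrete illustration of the read positions above, here is a minimal consumer sketch using the kafka-python library (an assumption; the sync task handles this internally, and the broker address, topic, and timestamp are placeholders):

```python
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",  # hypothetical broker address
    auto_offset_reset="earliest",        # or "latest" for the newest offset
)

# Start from a specific point in time: resolve the offset for a timestamp
# (milliseconds since epoch) and seek to it explicitly.
tp = TopicPartition("source_topic", 0)   # hypothetical topic/partition
consumer.assign([tp])
start_ms = 1589373560000                 # timestamp borrowed from the example message
offsets = consumer.offsets_for_times({tp: start_ms})
if offsets[tp] is not None:
    consumer.seek(tp, offsets[tp].offset)

for record in consumer:
    print(record.offset, record.value)
```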
| Parameter | Description |
| --- | --- |
| Data Destination | Select the target data source to be written to. |
| Serialization Format | Supports the canal-json and debezium formats. |
| Merge Update Messages | Off: one record update on the source produces two Kafka messages, carrying the data before and after the update separately. On: one record update on the source produces a single Kafka message containing both the before and after data. |
| Sync to Multiple Topics | On (default): allows many-to-many mapping between source data and target Topics. During task execution, Topics are matched according to the configured policy; if a matched Topic does not exist, the system creates it automatically based on the Topic name matching rules. Off: manually enter or select a single target Topic name; all subsequent data is written into that Topic. |
| Topic Matching Policy | Write with the Same Name as Source Table: by default, use the Topic with the same name as the source table. Custom: match Topics based on custom policy rules. |
| Partition Mapping | Configures how records are mapped to Topic partitions (round-robin partition write, table name partitioning, primary key partitioning, specify partition, or custom). Round-robin Partition Write: upstream data is written to the partitions in turn. Table Name Partitioning: records are written to partitions based on a hash of the source table name (see the sketch after this table). Primary Key Partitioning: records are written to partitions based on a hash of the primary key content. Specify Partition: either write to a single specified partition (enter the partition number; all messages are written only to that fixed partition) or write to multiple partitions based on table rules (enter database and table regular expressions; objects matching a rule are written to the specified partition; rules are evaluated in order, and tables matched by one rule are excluded from later rules). Custom: concatenate a partition rule from "Built-in Parameters"; messages are then hash-partitioned by the value the rule produces. |
| Advanced Settings (optional) | Configure additional parameters as business needs require. |
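As referenced in the Partition Mapping row, here is a minimal sketch of the Table Name Partitioning idea (assumed semantics: the partition is chosen by hashing the source table name; kafka-python, the broker address, topic name, and partition count are all placeholders):

```python
import zlib
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # hypothetical broker

NUM_PARTITIONS = 4  # hypothetical partition count of the target Topic

def partition_for_table(table_name: str) -> int:
    # Stable hash of the table name, mapped onto the partition range.
    return zlib.crc32(table_name.encode("utf-8")) % NUM_PARTITIONS

# All changes from the same source table land in the same partition,
# which preserves their relative order.
producer.send(
    "target_topic",                    # hypothetical Topic name
    value=b'{"type": "UPDATE"}',       # serialized change message
    partition=partition_for_table("products999"),
)
producer.flush()
```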
{"data": [{"id": "2","name": "scooter33","description": "Big 2-wheel scooter233","weight": "5.11"}],"database": "pacino99","es": 1589373560000,"id": 9,"isDdl": false,"mysqlType": {"id": "INTEGER","name": "VARCHAR(255)","description": "VARCHAR(512)","weight": "FLOAT"},"old": [{"weight": "5.12"}],"pkNames": ["id"],"sql": "","sqlType": {"id": 4,"name": 12,"description": 12,"weight": 7},"table": "products999","ts": 1589373560798,"type": "UPDATE"}
{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory2.customers2.Value","field": "before"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory2.customers2.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "boolean","optional": true,"default": false,"field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": true,"field": "table"},{"type": "int64","optional": false,"field": "server_id"},{"type": "string","optional": true,"field": "gtid"},{"type": "string","optional": false,"field": "file"},{"type": "int64","optional": false,"field": "pos"},{"type": "int32","optional": false,"field": "row"},{"type": "int64","optional": true,"field": "thread"},{"type": "string","optional": true,"field": "query"}],"optional": false,"name": "io.debezium.connector.mysql.Source","field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"}],"optional": false,"name": "mysql-server-1.inventory.customers.Envelope"},"payload": {"op": "c","ts_ms": 1465491411815,"before": null,"after": {"id": 12003,"first_name": "Anne322","last_name": "Kretchmar3222","email": "annek@noanswer.org3222"},"source": {"version": "1.9.6.Final","connector": "mysql","name": "mysql-server-1","ts_ms": 0,"snapshot": false,"db": "inventory333","table": "customers433","server_id": 0,"gtid": null,"file": "mysql-bin.000003","pos": 154,"row": 0,"thread": 7,"query": ""}}}
| Parameter | Description |
| --- | --- |
| Data Source | Data source type for the Kafka reader; Kafka is supported. |
| Topic | Topic in the Kafka data source. |
| Serialization Format | Kafka message serialization format; supports canal-json, ogg-json, json, avro, csv, and raw. |
| Message Type | Append Message: messages in Kafka come from an append message stream and typically have no unique key; the Append write mode is recommended for the write node. Upsert Message: messages in Kafka come from an upsert message stream and typically contain a unique key; setting this ensures Exactly-Once semantics, and the Upsert write mode is recommended for the write node. |
| Unique Key | In Upsert write mode, a unique key must be set to guarantee data order. |
| Read Position | The starting point for data consumption when the sync task launches; supports earliest and latest. |
| Consumer Group ID | Avoid reusing this ID across other consumer processes so that consumption positions are tracked correctly. If not specified, it defaults to group.id=WeData_group_${taskId}. See the sketch after this table. |
| Advanced Settings (optional) | Configure additional parameters as business needs require. |
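As referenced in the Consumer Group ID row, a minimal kafka-python sketch (an assumption; the broker address, topic, and group ID are placeholders). Note that auto_offset_reset only takes effect when the group has no committed offsets yet; afterwards, the group resumes from its last committed position:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "source_topic",                       # hypothetical Topic name
    bootstrap_servers="localhost:9092",   # hypothetical broker address
    group_id="WeData_group_20240101",     # hypothetical ID following the default pattern
    auto_offset_reset="latest",           # used only if no offset is committed yet
    enable_auto_commit=True,              # commit positions under this group ID
)
for record in consumer:
    print(record.topic, record.partition, record.offset)
```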
| Parameter | Description |
| --- | --- |
| Data Destination | Data source type for the Kafka writer; Kafka is supported. |
| Topic | Topic in the Kafka data source. |
| Serialization Format | Kafka message serialization format; supports canal-json, json, and avro. |
| Message Type | Upsert Message: update write. When there is no primary key conflict, a new row is inserted; on a conflict, the existing row is updated. Suitable when the target table has a primary key and must be updated in real time from the source data; incurs some performance overhead. Append Message: append write. Data is appended as new rows regardless of primary keys; whether a primary key conflict occurs depends on the target. Suitable when there is no primary key and duplicate data is acceptable; no performance loss. |
| Unique Key | In Upsert write mode, a unique key must be set to guarantee data order. |
| Partition Mapping | Configures how records are mapped to Topic partitions (round-robin partition write, hash by specified field content, or specify partition). Round-robin Partition Write: upstream data is written to the partitions in turn. Hash by Specified Field Content: records are written to partitions based on a hash of the specified field's content. Specify Partition: enter a partition number; all messages are written only to that fixed partition. See the sketch after this table. |
| Advanced Settings (optional) | Configure additional parameters as business needs require. |
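As referenced in the Partition Mapping row, a minimal kafka-python sketch (an assumption; the broker address and topic are placeholders) of the Specify Partition and hash-by-field strategies:

```python
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # hypothetical broker

# Specify Partition: every message is written to one fixed partition number.
producer.send("target_topic", value=b'{"id": "2"}', partition=0)

# Hash by specified field content: pass the field's value as the message key;
# Kafka's default partitioner then hashes the key to choose the partition.
producer.send("target_topic", value=b'{"id": "2"}', key=b"2")
producer.flush()
```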
| Parameter | Description |
| --- | --- |
| Data Destination | Select an available Kafka data source in the current project. Kafka and Ckafka are supported as write source types. |
| Topic | Topic in the Kafka data source. |
| Serialization Format | Kafka message serialization format; supports three types: canal-json, json, and avro. |
| Message Type | Kafka supports two write modes. Append: append write. Upsert: write messages in upsert mode; once set, each message is processed by the consumer exactly once, ensuring Exactly-Once semantics. |
| Unique Key | In Upsert write mode, a unique key must be set to guarantee data ordering; multiple columns can be selected. In Append mode, no unique key is required. See the sketch after this table. |
| Advanced Settings (optional) | Configure additional parameters as business needs require. |
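To illustrate why the unique key matters in Upsert mode, a minimal kafka-python sketch (an assumption; the broker address and topic are placeholders): using the unique-key column as the message key routes every change for the same row to the same partition, which preserves its update order:

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",                 # hypothetical broker
    value_serializer=lambda v: json.dumps(v).encode(),  # serialize rows as JSON
    key_serializer=lambda k: k.encode(),
)

row = {"id": "2", "weight": "5.11"}
unique_key = row["id"]   # the configured unique-key column(s)

# Messages sharing a key always land in the same partition, in send order.
producer.send("target_topic", key=unique_key, value=row)
producer.flush()
```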
| Internal Type | Kafka Type |
| --- | --- |
| SMALLINT | SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL |
| INTEGER | INT, INTEGER, YEAR, SHORT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL |
| BIGINT | BIGINT, INT UNSIGNED, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL, INT UNSIGNED ZEROFILL |
| DECIMAL | BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL, NUMERIC, NUMERIC UNSIGNED, NUMERIC UNSIGNED ZEROFILL, DECIMAL, DYNAMIC DECIMAL, DECIMAL UNSIGNED, DECIMAL UNSIGNED ZEROFILL, FIXED, FIXED UNSIGNED, FIXED UNSIGNED ZEROFILL |
| FLOAT | FLOAT, FLOAT UNSIGNED, FLOAT UNSIGNED ZEROFILL |
| DOUBLE | DOUBLE, DOUBLE UNSIGNED, DOUBLE UNSIGNED ZEROFILL, DOUBLE PRECISION, DOUBLE PRECISION UNSIGNED, DOUBLE PRECISION UNSIGNED ZEROFILL, REAL, REAL UNSIGNED, REAL UNSIGNED ZEROFILL |
| TIMESTAMP | DATETIME, TIMESTAMP WITH LOCAL TIME ZONE, TIMESTAMP WITH TIME ZONE |
| TIMESTAMP_WITH_TIMEZONE | TIMESTAMP, TIMESTAMP WITH LOCAL TIME ZONE, TIMESTAMP WITH TIME ZONE |
| BLOB | BLOB, TINYBLOB, MEDIUMBLOB, LONGBLOB |
| VARCHAR | JSON, VARCHAR, TEXT, TINYTEXT, MEDIUMTEXT, LONGTEXT |