节点 | 版本 |
Kafka | 2.4.1、2.7.1、2.8.1、2.8.2 |
auto.create.topics.enable=true
参数 | 说明 |
数据来源 | 选择需要同步的 Kafka 数据源。 |
来源 Topic | 选择或输入任务计划消费的 Topic 名称。 |
序列化格式 | 设置 Kafka 内原始消息格式,目前支持解析 canal-json、debezium、ogg-json。 说明: 设置格式需与消息实际格式保持一致。 |
读取位置 | 设置 Kafka 数据读取位点: 从最早开始:earlist。 从最新开始:latest。 从指定时间开始:设定具体任务启动时间位点。 |
高级设置(可选) | 可根据业务需求配置参数。 |
参数 | 说明 |
数据源 | 选择需要同步的目标数据源 |
序列化格式 | 支持 canal-json 和 debezium 两种格式 |
Update 消息合并 | 开关关闭:源端一条记录的一次 Update 变更,对应两条 Kafka 记录,分别为变更前和变更后的数据。 开关打开:源端一条记录的一次 Update 变更,对应一条 Kafka 记录,同时包含变更前和变更后的数据。 |
同步至多 Topic | 默认打开:此选项下可实现来源数据与目标 Topic 多对多映射,任务执行过程中将根据策略匹配对应 Topic 的名称。若Topic不存在时系统将根据 Topic 名匹配规则自动创建 Topic。 关闭:手动输入或者选择目标 Topic 名称,后续所有数据将统一写入该 Topic 内。 |
Topic 匹配策略 | 与来源表同名:默认使用与来源表同名的 Topic 。 自定义:根据定义策略规则匹配 Topic 。 |
分区规则 | 配置 topic partition 分区映射(轮询写入分区、根据表名分区、根据来源表主键分区、指定分区、自定义): 轮询写入分区:轮询(Round Robin)上游数据写入到每个 partition。 根据表名写入分区:根据上游数据中的表名hash映射写入每个 partition。 根据来源表主键分区:根据上游数据中的主键数据内容 hash 映射写入每个 partition。 指定分区: 写入指定单分区:输入分区序号,所有消息仅写入到固定分区。 根据表规则写入多分区:支持输入库、表正则进行对象匹配,符合匹配规则的对象写入到指定分区中,规则之间顺序执行,已匹配库表不参与后续规则匹配。 自定义:支持使用 “内置参数” 拼接写入分区规则,设定后将根据分区规则对应的值对消息进行 hash 分区。 |
高级设置(可选) | 可根据业务需求配置参数。 |
{"data": [{"id": "2","name": "scooter33","description": "Big 2-wheel scooter233","weight": "5.11"}],"database": "pacino99","es": 1589373560000,"id": 9,"isDdl": false,"mysqlType": {"id": "INTEGER","name": "VARCHAR(255)","description": "VARCHAR(512)","weight": "FLOAT"},"old": [{"weight": "5.12"}],"pkNames": ["id"],"sql": "","sqlType": {"id": 4,"name": 12,"description": 12,"weight": 7},"table": "products999","ts": 1589373560798,"type": "UPDATE"}
{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory2.customers2.Value","field": "before"},{"type": "struct","fields": [{"type": "int32","optional": false,"field": "id"},{"type": "string","optional": false,"field": "first_name"},{"type": "string","optional": false,"field": "last_name"},{"type": "string","optional": false,"field": "email"}],"optional": true,"name": "mysql-server-1.inventory2.customers2.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "boolean","optional": true,"default": false,"field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": true,"field": "table"},{"type": "int64","optional": false,"field": "server_id"},{"type": "string","optional": true,"field": "gtid"},{"type": "string","optional": false,"field": "file"},{"type": "int64","optional": false,"field": "pos"},{"type": "int32","optional": false,"field": "row"},{"type": "int64","optional": true,"field": "thread"},{"type": "string","optional": true,"field": "query"}],"optional": false,"name": "io.debezium.connector.mysql.Source","field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"}],"optional": false,"name": "mysql-server-1.inventory.customers.Envelope"},"payload": {"op": "c","ts_ms": 1465491411815,"before": null,"after": {"id": 12003,"first_name": "Anne322","last_name": "Kretchmar3222","email": "annek@noanswer.org3222"},"source": {"version": "1.9.6.Final","connector": "mysql","name": "mysql-server-1","ts_ms": 0,"snapshot": false,"db": "inventory333","table": "customers433","server_id": 0,"gtid": null,"file": "mysql-bin.000003","pos": 154,"row": 0,"thread": 7,"query": ""}}}
参数 | 描述 |
数据来源 | Kafka 读取端数据源类型支持 Kafka。 |
Topic | Kafka 数据源中的 Topic。 |
序列化格式 | Kafka 消息序列化格式类型,支持:canal-json、ogg-json、json、avro、csv、raw。 |
消息类型 | Append 消息:Kafka 内消息来源于 Append 消息流,通常消息中不携带唯一键。写入节点建议搭配 Append 写入模式。 Upsert 消息:Kafka 内消息来源于 Upsert 消息流,通常消息中携带唯一键,设置后消息可保证 Exactly-Once。写入节点建议搭配 Upsert 写入模式。 |
唯一键 | Upsert 写入模式下,需设置唯一键保证数据有序性 |
读取位置 | 启动同步任务时开始同步数据的起始位点。支持 earliest 和 latest |
消费组 ID | 请避免该参数与其他消费进程重复,以保证消费位点的正确性。如果不指定该参数,默认设定 group.id=WeData_ group_${任务id}。 |
高级设置(可选) | 可根据业务需求配置参数。 |
参数 | 描述 |
数据源 | Kafka 写入端数据源类型支持 Kafka。 |
Topic | Kafka 数据源中的 Topic。 |
序列化格式 | Kafka 消息序列化格式类型,支持:canal-json、json、avro。 |
写入模式 | Upsert:更新写入。当主键不冲突时,可插入新行;当主键冲突时,则进行更新。适用于目标表有主键且需要根据源端数据实时更新的场景。会有一定的性能损耗。 Append:追加写入。无论是否有主键,以插入新行的方式追加写入数据,是否存在主键冲突取决于目标端。适用于无主键且允许数据重复的场景。无性能损耗。 |
唯一键 | Upsert 写入模式下,需设置唯一键保证数据有序性 |
Partition 分区映射 | 配置 topic partition 分区映射(轮询写入分区、根据指定字段内容 Hash 写入分区、指定分区): 轮询写入分区:轮询(Round Robin)上游数据写入到每个 partition。 根据指定字段内容 Hash : 写入分区:根据指定字段内容 Hash 映射写入每个 partition。 指定分区:输入分区序号,所有消息仅写入到固定分区。 |
高级设置(可选) | 可根据业务需求配置参数。 |
参数 | 说明 |
数据源 | 选择当前项目中可用的 Kafka 数据源,Kafka 写入端数据源类型支持 Kafka、Ckafka 。 |
topic | Kafka 数据源中的 Topic。 |
序列化格式 | Kafka 消息序列化格式类型,支持三种类型: canal-json json avro |
写入模式 | Kafka 支持两种写入模式: Append:追加写入。 Upsert:以 upsert 方式插入消息,设置后消息仅只能被消息端处理一次以保证 Exactly-Once。 |
唯一键 | Upsert 写入模式下,需设置唯一键保证数据有序性,支持多选,Append 模式则不需要设置唯一键。 |
高级设置(可选) | 可根据业务需求配置参数。 |
内部类型 | Kafka 类型 |
SMALLINT | SMALLINT, TINYINT UNSIGNED, TINYINT UNSIGNED ZEROFILL |
INTEGER | INT, INTEGER, YEAR, SHORT, MEDIUMINT, SMALLINT UNSIGNED, SMALLINT UNSIGNED ZEROFILL |
BIGINT | BIGINT, INT UNSIGNED, MEDIUMINT UNSIGNED, MEDIUMINT UNSIGNED ZEROFILL, INT UNSIGNED ZEROFILL |
DECIMAL | BIGINT UNSIGNED, BIGINT UNSIGNED ZEROFILL, SERIAL, NUMERIC, NUMERIC UNSIGNED, NUMERIC UNSIGNED ZEROFILL, DECIMAL, DYNAMIC DECIMAL, DECIMAL UNSIGNED, DECIMAL UNSIGNED ZEROFILL, FIXED, FIXED UNSIGNED, FIXED UNSIGNED ZEROFILL |
FLOAT | FLOAT, FLOAT UNSIGNED, FLOAT UNSIGNED ZEROFILL |
DOUBLE | DOUBLE, DOUBLE UNSIGNED, DOUBLE UNSIGNED ZEROFILL, DOUBLE PRECISION, DOUBLE PRECISION UNSIGNED, ZEROFILL, REAL, REAL UNSIGNED, REAL UNSIGNED ZEROFILL |
TIMESTAMP | ATETIME, TIMESTAMP WITH LOCAL TIME ZONE, TIMESTAMP WITH TIME ZONE |
TIMESTAMP_WITH_TIMEZONE | TIMESTAMP, TIMESTAMP WITH LOCAL TIME ZONE, TIMESTAMP WITH TIME ZONE |
BLOB | BLOB, TINYBLOB, MEDIUMBLOB, LONGBLOB |
VARCHAR | JSON, VARCHAR, TEXT, TINYTEXT, MEDIUMTEXT, LONGTEXT |
本页内容是否解决了您的问题?