Flink 版本 | 说明 |
1.11 | 支持 Sink |
1.13 | 支持 Source 和 Sink |
1.14 | 支持 Source 和 Sink |
1.16 | 支持 Source 和 Sink |
CREATE TABLE clickhouse_sink_table (`id` INT,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'clickhouse', -- 指定使用clickhouse连接器'url' = 'clickhouse://172.28.28.160:8123', -- 指定集群地址,可以通过ClickHouse集群界面查看-- 如果ClickHouse集群未配置账号密码可以不指定--'username' = 'root', -- ClickHouse集群用户名--'password' = 'root', -- ClickHouse集群的密码'database-name' = 'db', -- 数据写入目的数据库'table-name' = 'table', -- 数据写入目的数据表'sink.batch-size' = '1000' -- 触发批量写的条数);
CREATE TABLE clickhouse_upsert_sink_table (`id` INT,`name` STRING,PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义) WITH (-- 指定数据库连接参数'connector' = 'clickhouse', -- 指定使用clickhouse连接器'url' = 'clickhouse://172.28.28.160:8123', -- 指定集群地址,可以通过ClickHouse集群界面查看-- 如果ClickHouse集群未配置账号密码可以不指定--'username' = 'root', -- ClickHouse集群用户名--'password' = 'root', -- ClickHouse集群的密码'database-name' = 'db', -- 数据写入目的数据库'table-name' = 'table', -- 数据写入目的数据表'table.collapsing.field' = 'Sign', -- CollapsingMergeTree 类型列字段的名称'sink.batch-size' = '1000' -- 触发批量写的条数);
CREATE TABLE `clickhosue_batch_source` (`when` TIMESTAMP,`userid` BIGINT,`bytes` FLOAT) WITH ('connector' = 'clickhouse','url' = 'clickhouse://172.28.1.21:8123','database-name' = 'dts','table-name' = 'download_dist'-- 'scan.by-part.enabled' = 'false', -- 是否启用读 ClickHouse 表 part。若启用,必须先在所有节点上使用命令 'STOP MERGES' 和 'STOP TTL MERGES' 停止表的后台 merge 和基于 TTL 的数据删除操作,否则读取的数据会不正确。-- 'scan.part.modification-time.lower-bound' = '2021-09-24 16:00:00', -- 用于根据 modification_time 过滤 ClickHouse 表 part 的最小时间(包含),格式 yyyy-MM-dd HH:mm:ss。-- 'scan.part.modification-time.upper-bound' = '2021-09-17 19:16:26', -- 用于根据 modification_time 过滤 ClickHouse 表 part 的最大时间(不包含),格式 yyyy-MM-dd HH:mm:ss。-- 'local.read-write' = 'false', -- 是否读本地表,默认 false 。-- 'table.local-nodes' = '172.28.1.24:8123,172.28.1.226:8123,172.28.1.109:8123,172.28.1.36:8123' -- local node 列表,需要使用 http port。注意一个 shard 只能配置一个 replica 节点地址,否则会读取到重复数据。);
CREATE TABLE `clickhouse_dimension` (`userid` BIGINT,`comment` STRING) WITH ('connector' = 'clickhouse','url' = 'clickhouse://172.28.1.21:8123','database-name' = 'dimension','table-name' = 'download_dist','lookup.cache.max-rows' = '500', -- 查询缓存(Lookup Cache)中最多缓存的数据条数。'lookup.cache.ttl' = '10min', -- 查询缓存中每条记录最长的缓存时间。'lookup.max-retries' = '10' -- 数据库查询失败时,最多重试的次数。);
参数值 | 必填 | 默认值 | 描述 |
connector | 是 | - | 当要使用 ClickHouse 作为数据目的(Sink)需要填写,取值 clickhouse 。 |
url | 是 | - | ClickHouse 集群连接 url,可以通过集群界面查看,举例 'clickhouse://127.1.1.1:8123'。 |
username | 否 | - | ClickHouse 集群用户名。 |
password | 否 | - | ClickHouse 集群密码。 |
database-name | 是 | - | ClickHouse 集群数据库。 |
table-name | 是 | - | ClickHouse 集群数据表。 |
sink.batch-size | 否 | 1000 | Connector batch 写入的条数。 |
sink.flush-interval | 否 | 1000 (单位 ms) | Connector 异步线程刷新写入 ClickHouse 间隔。 |
table.collapsing.field | 否 | - | CollapsingMergeTree 类型列字段的名称。 |
sink.max-retries | 否 | 3 | 写入失败时的重试次数。 |
local.read-write | 否 | false | 是否启用直写本地表功能。默认不开启。 注意:该功能仅供高级用户使用,需配合下面几个参数使用。详见本文后续的 ”写入本地表“ 章节。 |
table.local-nodes | 否 | - | 启用写本地表( local.read-write 为 true )时,local node 列表,举例 '127.1.1.10:8123,127.1.2.13:8123'(需要使用 http port)。 |
sink.partition-strategy | 否 | balanced | 启用写本地表( local.read-write 为 true )时,需要设置数据分发策略,支持 balanced/shuffle/hash。如果希望实现数据的动态更新,且表引擎使用 CollapsingMergeTree,则取值必须为 hash ,且需要配合 sink.partition-key 一同使用。取值说明: balanced 轮询模式写入,shuffle 随机挑选节点写入, hash 根据 sink.partition-key 的 hash 值,选择节点写入。 |
sink.partition-key | 否 | - | 启用写本地表( local.read-write 为 true ),且 sink.partition-strategy 为 hash 时需要设置,值为所定义表中的主键。如果主键包含多个字段,则需要指定为第一个字段。 |
sink.ignore-delete | 否 | false | 启用该参数后,会过滤所有向 ClickHouse 写入的 DELETE(删除)消息。该选项适用于使用 ReplacingMergeTree 表引擎,并期望实现数据的动态更新的场景。 |
sink.backpressure-aware | 否 | false | 当 Flink 日志频繁出现 "Too many parts" 报错,且作业因此崩溃时,启用该参数可以大幅减轻服务端负载,提升整体的吞吐量和稳定性。 |
sink.reduce-batch-by-key | 否 | false | 开启该参数后,对于定义了主键的 ClickHouse Sink 表,在给定的刷新周期内,同 Key 的数据会做归并,只取最后一条。 |
sink.max-partitions-per-insert | 否 | 20 | 当clickhouse是分区表,且分区函数CK内置为intHash32、toYYYYMM 或toYYYYMMDD 之一时,Flink写入Clickhouse会通过预先在sink端按分区攒数据buffer,当攒的分区数目到达设定值时会触发往下游clickhouse写入(如果 sink.flush-interval 和sink.batch-size 先到的话也会先触发写入),极大的提高写入clickhouse的吞吐效率。设置为-1时会关闭分区聚合写入功能。 |
scan.fetch-size | 否 | 100 | 每次从数据库读取时,批量获取的行数。 |
scan.by-part.enabled | 否 | false | 是否启用读 ClickHouse 表 part。若启用,必须先在所有节点上使用命令'STOP MERGES'和'STOP TTL MERGES'停止表的后台 merge 和基于 TTL 的数据删除操作,否则读取的数据会不正确。 |
scan.part.modification-time.lower-bound | 否 | - | 用于根据 modification_time 过滤 ClickHouse 表 part 的最小时间(包含),格式 yyyy-MM-dd HH:mm:ss。 |
scan.part.modification-time.upper-bound | 否 | - | 用于根据 modification_time 过滤 ClickHouse 表 part 的最大时间(不包含),格式 yyyy-MM-dd HH:mm:ss。 |
lookup.cache.max-rows | 否 | 无 | 查询缓存(Lookup Cache)中最多缓存的数据条数。 |
lookup.cache.ttl | 否 | 无 | 查询缓存中每条记录最长的缓存时间。 |
lookup.max-retries | 否 | 3 | 数据库查询失败时,最多重试的次数。 |
basic
,只能解析 YYYY-MM-DD HH:MM:SS
或者 YYYY-MM-DD
格式的时间,如果您的作业出现时间解析异常(例如 java.sql.SQLException: Code: 6. DB::Exception: Cannot parse string '2023-05-24 14:34:55.166' as DateTime
),请参考下表调整 Flink 中对应的数据类型为 TIMESTAMP(0)
,或者调整 ClickHouse 集群的 date_time_input_format 值为 best_effort
。此外 ClickHouse 支持以整数格式插入 DateTime 数据,因此您也可以在 Flink 中映射类型为 INTEGER,但不推荐。2023-01-01 11:11:11.000
)。为了避免相关问题,建议参考下表映射为 TIMESTAMP 数据类型。ClickHouse 数据类型 | Flink 数据类型 | Java 数据类型 |
String | VARCHAR/STRING | String |
FixedString(N) | VARCHAR/STRING | String |
Bool | BOOLEAN | Byte |
Int8 | TINYINT | Byte |
UInt8 | SMALLINT | Short |
Int16 | SMALLINT | Short |
UInt16 | INTEGER | Integer |
Int32 | INTEGER | Integer |
UInt32 | BIGINT | Long |
Int64 | BIGINT | Long |
UInt64 | BIGINT | Long |
Int128 | DECIMAL | BigInteger |
UInt128 | DECIMAL | BigInteger |
Int256 | DECIMAL | BigInteger |
UInt256 | DECIMAL | BigInteger |
Float32 | FLOAT | Float |
Float64 | DOUBLE | Double |
Decimal(P,S)/Decimal32(S)/Decimal64(S)/Decimal128(S)/Decimal256(S) | DECIMAL | BigDecimal |
Date | DATE | LocalDateTime |
DateTime([timezone]) | TIMESTAMP(0) | LocalDateTime |
DateTime64(precision, [timezone]) | TIMESTAMP(precision) | LocalDateTime |
Array(T) | ARRAY<T> | T[] |
Map(K, V) | MAP<K, V> | Map<?, ?> |
Tuple(T1, T2, ...) | ROW<f1 T1, f2 T2, ...> | List<Object> |
CREATE TABLE datagen_source_table (id INT,name STRING) WITH ('connector' = 'datagen','rows-per-second'='1' -- 每秒产生的数据条数);CREATE TABLE clickhouse_sink_table (`id` INT,`name` STRING) WITH (-- 指定数据库连接参数'connector' = 'clickhouse', -- 指定使用clickhouse连接器'url' = 'clickhouse://172.28.28.160:8123', -- 指定集群地址,可以通过ClickHouse集群界面查看-- 如果ClickHouse集群未配置账号密码可以不指定--'username' = 'root', -- ClickHouse集群用户名--'password' = 'root', -- ClickHouse集群的密码'database-name' = 'db', -- 数据写入目的数据库'table-name' = 'table', -- 数据写入目的数据表'sink.batch-size' = '1000' -- 触发批量写的条数);insert into clickhouse_sink_table select * from datagen_source_table;
sink.ignore-delete
参数为 true
。此后 Flink 会自动过滤所有的删除(DELETE)消息,并将插入(INSERT)和更新(UPDATE_AFTER)消息则统一转为插入(INSERT)消息。随后,ClickHouse 底层会自动使用最新写入的记录来覆盖之前同主键的旧记录,从而实现数据的更新。table.collapsing.field
参数来指定 Sign 字段。它的原理是通过发送内容相同,但符号(Sign)相反的消息,来实现旧数据的删除(抵消),以及新数据的插入。ReplicatedMergeTree
的自动去重可能会使得短期内多次写入到 ClickHouse 的数据被判断为重复数据,导致数据丢失。此时,可在建表(或者修改表)时,指定 replicated_deduplication_window=0
,以关闭自动去重功能。CREATE TABLE testdb.testtable on cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign) ORDER BY id SETTINGS replicated_deduplication_window = 0;
ENGINE = Distributed(cluster_name, database_name, table_name[, sharding_key]);
中 sharding_key 需为 Flink SQL 的 sink 表中的主键(Primary Key),以保证同一个 Primary Key 的记录写入到同一个节点中。local.read-write
参数设置为 true
,则 Flink 可以直接写入本地表。table.collapsing.field
参数来指定 Sign 字段,并设置 sink.partition-strategy
为 hash
以令相同主键的数据落在同一个 shard 上,并将 sink.partition-key
参数设置为主键字段(对于多字段的混合主键,则设置为第一个字段)。CREATE TABLE testdb.testtable on cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/testtable', '{replica}', Sign) ORDER BY id ;
sink.max-partitions-per-insert
参数。-- 创建数据库CREATE DATABASE test ON cluster default_cluster;-- 创建本地表CREATE TABLE test.datagen ON cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64),`Sign` Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}', Sign) ORDER BY id SETTINGS replicated_deduplication_window = 0;-- 基于本地表,创建分布式表CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen ENGINE = Distributed(default_cluster, test, datagen, id);
-- 创建数据库CREATE DATABASE test ON cluster default_cluster;-- 创建本地表CREATE TABLE test.datagen ON cluster default_cluster (`id` Int32,`name` Nullable(String),`age` Nullable(Int32),`weight` Nullable(Float64) ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/test/datagen', '{replica}') ORDER BY id SETTINGS replicated_deduplication_window = 0;-- 基于本地表,创建分布式表CREATE TABLE test.datagen_all ON CLUSTER default_cluster AS test.datagen ENGINE = Distributed(default_cluster, test, datagen, id);
本页内容是否解决了您的问题?