CREATE TABLE queue (timestamp UInt64,level String,message String) ENGINE = KafkaSETTINGSkafka_broker_list = 'localhost:9092',kafka_topic_list = 'topic',kafka_group_name = 'group',kafka_format = 'JSONEachRow',kafka_num_consumers = 1,kafka_max_block_size = 65536,kafka_skip_broken_messages = 0,kafka_auto_offset_reset = 'latest';
名称 | 是否必选 | 说明 |
kafka_broker_list | 是 | Kafka 服务的 broker 列表,用逗号分隔,这里建议用 Ip:port, 不要用域名(可能存在 DNS 解析问题)。 |
kafka_topic_list | 是 | Kafka topic,多个 topic 用逗号分隔。 |
kafka_group_name | 是 | Kafka 的消费组名称。 |
kafka_format | 是 | Kafka 数据格式, ClickHouse 支持的 Format, 详见 可选参数。 |
kafka_row_delimiter | 否 | 行分隔符,用于分割不同的数据行。默认为“\\n”,您也可以根据数据写入的实际分割格式进行设置。 |
kafka_num_consumers | 否 | 单个 Kafka Engine 的消费者数量,通过增加该参数,可以提高消费数据吞吐,但总数不应超过对应 topic 的 partitions 总数。 |
kafka_max_block_size | 否 | Kafka 数据写入目标表的 Block 大小,超过该数值后,就将数据刷盘;单位:Byte,默认值为65536 Byte。 |
kafka_skip_broken_messages | 否 | 表示忽略解析异常的 Kafka 数据的条数。如果出现了 N 条异常后,后台线程结束 默认值为0。 |
kafka_commit_every_batch | 否 | 执行 Kafka commit 的频率,取值如下:
0:完全写入一整个Block数据块的数据后才执行commit;
1:每写完一个Batch批次的数据就执行一次commit。 |
kafka_auto_offset_reset | 否 | 从哪个 offset 开始读取 Kafka 数据。取值范围:earlist,latest。 |
CREATE TABLE daily on cluster default_cluster(day Date,level String,total UInt64)engine = SummingMergeTree()order by int_id;
create table daily on cluster default_cluster(day Date,level String,total UInt64)engine = ReplicatedSummingMergeTree('/clickhouse/tables/test/test/{shard}', '{replica}')order by int_id;`
create table daily_dis on cluster default_clusterAS test.testengine = Distributed('default_cluster', 'default', 'daily', rand());
CREATE MATERIALIZED VIEW consumer TO dailyAS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as totalFROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
DETACH TABLE consumer;ATTACH TABLE consumer;
本页内容是否解决了您的问题?