Create a Kafka consumer table (Kafka engine table):

```sql
CREATE TABLE queue
(
    timestamp UInt64,
    level String,
    message String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',
    kafka_topic_list = 'topic',
    kafka_group_name = 'group',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 1,
    kafka_max_block_size = 65536,
    kafka_skip_broken_messages = 0,
    kafka_auto_offset_reset = 'latest';
```
| Name | Required | Description |
|------|----------|-------------|
| kafka_broker_list | Yes | A comma-separated list of Kafka brokers. We recommend using `IP:port` rather than a domain name to avoid possible DNS resolution issues. |
| kafka_topic_list | Yes | Kafka topics, separated by commas. |
| kafka_group_name | Yes | Kafka consumer group name. |
| kafka_format | Yes | Kafka message format. For the input formats supported by ClickHouse, see the Formats section of the ClickHouse documentation. |
| kafka_row_delimiter | No | Row delimiter used to split data into rows. The default value is `\n`, but you can set it to another value to match the actual format of the written data. |
| kafka_num_consumers | No | Number of consumers for a single Kafka engine table. Increasing this value raises consumption throughput, but it must not exceed the number of partitions in the corresponding topic. |
| kafka_max_block_size | No | Maximum size of a block written to the target table, 65536 rows by default. When the buffered data exceeds this size, the block is flushed to the target table. |
| kafka_skip_broken_messages | No | Number of messages with parsing errors that can be ignored. If the number of broken messages exceeds this value, the background consumer thread stops. The default value is 0. |
| kafka_commit_every_batch | No | Frequency of Kafka offset commits. `0`: commit only after a whole block is written; `1`: commit after each batch is written. |
| kafka_auto_offset_reset | No | The offset from which to start reading Kafka data. Valid values are `earliest` and `latest`. |
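
For illustration, here is a minimal sketch of a consumer table that sets several of the optional parameters above. The broker address, topic, group, and table names are placeholders, not values from this guide:

```sql
CREATE TABLE queue_csv
(
    timestamp UInt64,
    level String,
    message String
)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'localhost:9092',  -- placeholder broker address
    kafka_topic_list = 'topic_csv',        -- placeholder topic
    kafka_group_name = 'group_csv',        -- placeholder consumer group
    kafka_format = 'CSV',
    kafka_row_delimiter = '\n',            -- explicit row delimiter
    kafka_num_consumers = 2,               -- must not exceed the topic's partition count
    kafka_commit_every_batch = 1;          -- commit offsets after every batch
```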
Create the local target table that will store the aggregated data (the ordering key matches the aggregation columns of the SummingMergeTree):

```sql
CREATE TABLE daily ON CLUSTER default_cluster
(
    day Date,
    level String,
    total UInt64
)
ENGINE = SummingMergeTree()
ORDER BY (day, level);
```
In a multi-replica cluster, use a replicated table instead:

```sql
CREATE TABLE daily ON CLUSTER default_cluster
(
    day Date,
    level String,
    total UInt64
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/test/test/{shard}', '{replica}')
ORDER BY (day, level);
```
Create a distributed table on top of the local tables:

```sql
CREATE TABLE daily_dis ON CLUSTER default_cluster AS test.test
ENGINE = Distributed('default_cluster', 'default', 'daily', rand());
```
Create a materialized view that consumes data from the Kafka engine table and writes the aggregated results into `daily`:

```sql
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
FROM queue
GROUP BY day, level;
```
Query the consumed data in the target table:

```sql
SELECT level, sum(total) FROM daily GROUP BY level;
```
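
If you created the distributed table above, the same aggregation can also be read through it across the cluster. A minimal sketch assuming the `daily_dis` table from the earlier step:

```sql
-- Aggregate across all shards through the distributed table
SELECT day, level, sum(total) AS total
FROM daily_dis
GROUP BY day, level
ORDER BY day, level;
```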
You can stop receiving Kafka data or change the conversion logic through `detach` and `attach` view operations:

```sql
DETACH TABLE consumer;
ATTACH TABLE consumer;
```
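
For example, a sketch of pausing consumption while the target table is modified; the added column below is hypothetical and only illustrates the sequence:

```sql
-- Pause consumption from Kafka
DETACH TABLE consumer;

-- Modify the target table while no data is being written (hypothetical column)
ALTER TABLE daily ADD COLUMN hostname String DEFAULT '';

-- Resume consumption
ATTACH TABLE consumer;
```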