            +---------+
            |  Client |
            +----+----+
                 |
+-----------------------------+
| FE                          |
|  +-----------v------------+ |
|  |                        | |
|  |   Routine Load Job     | |
|  |                        | |
|  +---+--------+--------+--+ |
|      |        |        |    |
|  +---v--+ +---v--+ +---v--+ |
|  | task | | task | | task | |
|  +--+---+ +---+--+ +---+--+ |
|     |         |        |    |
+-----------------------------+
      |         |        |
      v         v        v
  +---+--+  +---+--+  +---+--+
  |  BE  |  |  BE  |  |  BE  |
  +------+  +------+  +------+
HELP ROUTINE LOAD;
查看语法帮助。
下面我们以几个例子说明如何创建 Routine Load 任务:

CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.client.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,0,200"
);
{"category":"a9jadhx","author":"test","price":895}
[{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},{"category":"22","author":"2avc","price":895,"timestamp":1589191487},{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}]
CREATE TABLE `example_tbl` (
    `category` varchar(24) NULL COMMENT "",
    `author` varchar(24) NULL COMMENT "",
    `timestamp` bigint(20) NULL COMMENT "",
    `dt` int(11) NULL COMMENT "",
    `price` double REPLACE
) ENGINE=OLAP
AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(
    PARTITION p0 VALUES [("-2147483648"), ("20200509")),
    PARTITION p20200509 VALUES [("20200509"), ("20200510")),
    PARTITION p20200510 VALUES [("20200510"), ("20200511")),
    PARTITION p20200511 VALUES [("20200511"), ("20200512"))
)
DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
PROPERTIES ("replication_num" = "1");
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
COLUMNS(category,price,author)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\\"$.category\\",\\"$.author\\",\\"$.price\\",\\"$.timestamp\\"]",
    "strip_outer_array" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
source data | source data example | string to int | strict_mode | result |
空值 | \\N | N/A | true or false | NULL |
not null | aaa or 2000 | NULL | true | invalid data(filtered) |
not null | aaa | NULL | false | NULL |
not null | 1 | 1 | true or false | correct data |
source data | source data example | string to int | strict_mode | result |
空值 | \\N | N/A | true or false | NULL |
not null | aaa | NULL | true | invalid data(filtered) |
not null | aaa | NULL | false | NULL |
not null | 1 or 10 | 1 | true or false | correct data |
CREATE FILE
命令上传到 Doris 中,**并且 catalog 名称为 kafka
**。CREATE FILE
命令的具体帮助可以参见 HELP CREATE FILE;
。这里给出示例:

CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_url/kafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
CREATE ROUTINE LOAD db1.job1 on tbl1
PROPERTIES
(
    "desired_concurrent_number"="1"
)
FROM KAFKA
(
    "kafka_broker_list"= "broker1:9091,broker2:9091",
    "kafka_topic" = "my_topic",
    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"
);
HELP SHOW ROUTINE LOAD;
命令查看。HELP SHOW ROUTINE LOAD TASK;
命令查看。STOP/PAUSE/RESUME
三个命令来控制作业的停止,暂停和重启。可以通过 HELP STOP ROUTINE LOAD;、HELP PAUSE ROUTINE LOAD;
以及 HELP RESUME ROUTINE LOAD;
三个命令查看帮助和示例。kafka_topic
在kafka集群中不存在时。auto.create.topics.enable = true
,则 kafka_topic
会先被自动创建,自动创建的 partition 个数是由用户方的kafka集群中的 broker 配置 num.partitions
决定的。例行作业会正常的不断读取该 topic 的数据。auto.create.topics.enable = false
, 则 topic 不会被自动创建,例行作业会在没有读取任何数据之前就被暂停,状态为 PAUSED
。
所以,如果用户希望当 kafka topic 不存在的时候,被例行作业自动创建的话,只需要将用户方的 kafka 集群中的 broker 设置 auto.create.topics.enable = true
即可。advertised.listeners
, advertised.listeners
中的地址必须能够被 Doris 服务访问。kafka_partitions
:指定待消费的 partition 列表,如:"0, 1, 2, 3"。kafka_offsets
:指定每个分区的起始offset,必须和 kafka_partitions
列表个数对应。如:"1000, 1000, 2000, 2000"property.kafka_default_offset
:指定分区默认的起始offset。组合 | kafka_partitions | kafka_offsets | property.kafka_default_offset | 行为 |
1 | No | No | No | 系统会自动查找topic对应的所有分区并从 OFFSET_END 开始消费 |
2 | No | No | Yes | 系统会自动查找topic对应的所有分区并从 default offset 指定的位置开始消费 |
3 | Yes | No | No | 系统会从指定分区的 OFFSET_END 开始消费 |
4 | Yes | Yes | No | 系统会从指定分区的指定offset 处开始消费 |
5 | Yes | No | Yes | 系统会从指定分区,default offset 指定的位置开始消费 |
1992-01-02 3273383 3 2508983 508984 42 83658.12 0.06 0.07 R F 1992-03-19 1992-01-18 COLLECT COD TRUCK the carefully ironic accounts hin 1992-01-02 26696039 2 8782384 532409 50 73297.5 0.02 0.07 R F 1992-01-31 1992-01-13 DELIVER IN PERSON REG AIR . furiously regul 1992-01-02 47726080 1 5950048 950049 24 26346 0.02 0.02 A F 1992-02-23 1992-01-16 DELIVER IN PERSON RAIL l pearls. spec 1992-01-02 77918949 2 7276493 526501 23 33789.99 0.06 0 R F 1992-03-28 1992-01-26 NONE TRUCK y express requests. fin 1992-01-02 87026306 4 9061545 811573 11 16566.99 0.08 0.06 R F 1992-02-09 1992-01-21 TAKE BACK RETURN AIR ly regular instructions s 1992-01-02 135925030 4 14097984 598013 30 59438.4 0.02 0.08 R F 1992-02-24 1992-01-06 COLLECT COD REG AIR es boost regular platelets. reg 1992-01-02 189122402 2 13648194 398234 5 5707.55 0.1 0.08 R F 1992-03-24 1992-01-06 COLLECT COD FOB aggle caref 1992-01-02 235359552 4 8148971 898996 29 58567.53 0.1 0.07 A F 1992-02-10 1992-02-01 NONE AIR furiously ironic p 1992-01-02 298717351 3 4078182 78183 42 48719.16 0.09 0.07 R F 1992-03-08 1992-01-26 DELIVER IN PERSON MAIL hely regular accounts. blithely i 1992-01-02 305288709 3 14997743 747786 33 60720 0.04 0.06 A F 1992-03-25 1992-01-04 TAKE BACK RETURN TRUCK counts engage across the
CREATE TABLE lineitem_rtload (
    l_shipdate date NOT NULL,
    l_orderkey bigint(20) NOT NULL,
    l_linenumber int(11) NOT NULL,
    l_partkey int(11) NOT NULL,
    l_suppkey int(11) NOT NULL,
    l_quantity decimalv3(15, 2) NOT NULL,
    l_extendedprice decimal(15, 2) NOT NULL,
    l_discount decimalv3(15, 2) NOT NULL,
    l_tax decimalv3(15, 2) NOT NULL,
    l_returnflag varchar(1) NOT NULL,
    l_linestatus varchar(1) NOT NULL,
    l_commitdate date NOT NULL,
    l_receiptdate date NOT NULL,
    l_shipinstruct varchar(25) NOT NULL,
    l_shipmode varchar(10) NOT NULL,
    l_comment varchar(44) NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(l_shipdate, l_orderkey)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
);
CREATE ROUTINE LOAD tpch_100_d.rtl_20230809 ON lineitem_rtload
COLUMNS TERMINATED BY "\\t"
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "max_error_number" = "100",
    "strict_mode" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "10.0.1.138:9092",
    "kafka_topic" = "doris_routine_load_test",
    "property.group.id" = "routine_test",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
MySQL [tpch_100_d]> show routine load\\G; *************************** 1. row *************************** Id: 21619 Name: rtl_20230809 CreateTime: 2023-08-09 19:17:16 PauseTime: NULL EndTime: NULL DbName: default_cluster:tpch_100_d TableName: lineitem_rtload State: RUNNING DataSourceType: KAFKA CurrentTaskNum: 3 JobProperties: {"timezone":"Asia/Shanghai","send_batch_parallelism":"1","columnSeparator":"'\\t'","load_to_single_tablet":"false","lineDelimiter":"\\n","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"true","jsonpaths":"","currentTaskConcurrentNum":"3","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"","maxBatchIntervalS":"20","whereExpr":"","precedingFilter":"","mergeType":"APPEND","format":"csv","json_root":"","deleteCondition":"","desireTaskConcurrentNum":"3","maxErrorNum":"100","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"} DataSourceProperties: {"topic":"doris_routine_load_test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.1.138:9092"} CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"ryanzryu_routine_test"} Statistic: {"receivedBytes":568128,"runningTxns":[],"errorRows":0,"committedTaskNum":31,"loadedRows":4400,"loadRowsRate":7,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":4400,"unselectedRows":0,"receivedBytesRate":905,"taskExecuteTimeMs":627757} Progress: {"0":"1599","1":"1316","2":"1482"} Lag: {"0":0,"1":0,"2":0} ReasonOfStateChanged: ErrorLogUrls: OtherMsg:
Id: 作业IDName: 作业名称CreateTime: 作业创建时间PauseTime: 最近一次作业暂停时间EndTime: 作业结束时间DbName: 对应数据库名称TableName: 对应表名称State: 作业运行状态DataSourceType: 数据源类型:KAFKACurrentTaskNum: 当前子任务数量JobProperties: 作业配置详情DataSourceProperties: 数据源配置详情 CustomProperties: 自定义配置 Statistic: 作业运行状态统计信息 Progress: 作业运行进度 Lag: 作业延迟状态 ReasonOfStateChanged: 作业状态变更的原因 ErrorLogUrls: 被过滤的质量不合格的数据的查看地址 OtherMsg: 其他错误信息
Progress: {"0":"2061","1":"2135","2":"2254"} Progress: {"0":"2279","1":"2293","2":"2321"}
ALTER ROUTINE LOAD FOR [db.]job_name [job_properties] FROM data_source [data_source_properties]
PAUSE [ALL] ROUTINE LOAD FOR job_name
MySQL [tpch_100_d]> PAUSE ROUTINE LOAD FOR rtl_20230809; Query OK, 0 rows affected (0.00 sec) MySQL [tpch_100_d]> show ROUTINE LOAD FOR rtl_20230809\\G; *************************** 1. row *************************** Id: 21619 Name: rtl_20230809 CreateTime: 2023-08-09 19:17:16 PauseTime: 2023-08-09 21:03:21 EndTime: NULL DbName: default_cluster:tpch_100_d TableName: lineitem_rtload State: PAUSED DataSourceType: KAFKA CurrentTaskNum: 0 JobProperties: {"timezone":"Asia/Shanghai","send_batch_parallelism":"1","columnSeparator":"'\\t'","load_to_single_tablet":"false","lineDelimiter":"\\n","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"true","jsonpaths":"","currentTaskConcurrentNum":"3","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"","maxBatchIntervalS":"20","whereExpr":"","precedingFilter":"","mergeType":"APPEND","format":"csv","json_root":"","deleteCondition":"","desireTaskConcurrentNum":"3","maxErrorNum":"100","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"} DataSourceProperties: {"topic":"doris_routine_load_test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.1.138:9092"} CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"ryanzryu_routine_test"} Statistic: {"receivedBytes":1123678,"runningTxns":[],"errorRows":0,"committedTaskNum":114,"loadedRows":8703,"loadRowsRate":3,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":8703,"unselectedRows":0,"receivedBytesRate":486,"taskExecuteTimeMs":2310792} Progress: {"0":"2917","1":"2754","2":"3029"} Lag: {"0":0,"1":0,"2":0} ReasonOfStateChanged: ErrorReason{code=errCode = 100, msg='User root pauses routine load job'} ErrorLogUrls: OtherMsg: 1 row in set (0.00 sec)
ALTER ROUTINE LOAD FOR tpch_100_d.rtl_20230809 PROPERTIES ( "max_batch_interval" = "10" );
RESUME ROUTINE LOAD FOR rtl_20230809;
HELP ROUTINE LOAD
获取更多帮助信息。
本页内容是否解决了您的问题?