```text
         +---------+
         |  Client |
         +----+----+
              |
+-----------------------------+
| FE          |               |
| +-----------v------------+  |
| |                        |  |
| |   Routine Load Job     |  |
| |                        |  |
| +---+--------+--------+--+  |
|     |        |        |     |
| +---v--+ +---v--+ +---v--+  |
| | task | | task | | task |  |
| +--+---+ +---+--+ +--+---+  |
|    |         |        |     |
+-----------------------------+
     |         |        |
     v         v        v
 +---+--+  +---+--+  +--+---+
 |  BE  |  |  BE  |  |  BE  |
 +------+  +------+  +------+
```
After connecting to Doris, you can run the `HELP ROUTINE LOAD;` command for syntax help.
Below, we will illustrate how to create a routine load task with several examples.

Create a Kafka routine load task named test1 for example_tbl of example_db, specifying the column separator, group.id and client.id, and by default consuming all partitions and subscribing from the position where data first becomes available (OFFSET_BEGINNING):

```sql
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS TERMINATED BY ",",
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.client.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```
Create a Kafka routine load task named test1 for example_tbl of example_db in strict mode, with a derived column and a WHERE filter, consuming the specified partitions from the specified offsets:

```sql
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
WHERE k1 > 100 and k2 like "%doris%"
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,0,200"
);
```
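Once a job is created, you can check that it is scheduled and consuming normally. A minimal sketch, using the job name from the examples above:

```sql
-- State should be RUNNING once the job has started scheduling tasks.
SHOW ROUTINE LOAD FOR example_db.test1\G
```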
{"category":"a9jadhx","author":"test","price":895}
[{"category":"11","title":"SayingsoftheCentury","price":895,"timestamp":1589191587},{"category":"22","author":"2avc","price":895,"timestamp":1589191487},{"category":"33","author":"3avc","title":"SayingsoftheCentury","timestamp":1589191387}]
Create the Doris table that the JSON data will be loaded into:

```sql
CREATE TABLE `example_tbl` (
    `category` varchar(24) NULL COMMENT "",
    `author` varchar(24) NULL COMMENT "",
    `timestamp` bigint(20) NULL COMMENT "",
    `dt` int(11) NULL COMMENT "",
    `price` double REPLACE
) ENGINE=OLAP
AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
COMMENT "OLAP"
PARTITION BY RANGE(`dt`)
(
    PARTITION p0 VALUES [("-2147483648"), ("20200509")),
    PARTITION p20200509 VALUES [("20200509"), ("20200510")),
    PARTITION p20200510 VALUES [("20200510"), ("20200511")),
    PARTITION p20200511 VALUES [("20200511"), ("20200512"))
)
DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
PROPERTIES ("replication_num" = "1");
```
Create a routine load task that loads the JSON data in simple mode, matching JSON field names to column names directly:

```sql
CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
COLUMNS(category,price,author)
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
```
Create a routine load task that matches JSON fields precisely via jsonpaths and derives the dt column from the timestamp field:

```sql
CREATE ROUTINE LOAD example_db.test1 ON example_tbl
COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
    "strip_outer_array" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
);
```
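If the job has consumed the sample messages above, the result can be checked with a simple query. A minimal sketch; the actual rows depend on what has been consumed:

```sql
-- dt is derived at load time, e.g. from_unixtime(1589191587, '%Y%m%d')
-- evaluates to 20200511 in the job's time zone.
SELECT category, author, `timestamp`, dt, price FROM example_tbl;
```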
The strict_mode property controls how source values that fail type conversion are handled. Taking a column of type TinyInt (where the column allows null values) as an example:

| source data | source data example | string to int | strict_mode   | result                  |
| ----------- | ------------------- | ------------- | ------------- | ----------------------- |
| Null value  | \N                  | N/A           | true or false | NULL                    |
| not null    | aaa or 2000         | NULL          | true          | invalid data (filtered) |
| not null    | aaa                 | NULL          | false         | NULL                    |
| not null    | 1                   | 1             | true or false | correct data            |
Taking a column of type Decimal(1,0) (where the column allows null values) as an example:

| source data | source data example | string to int | strict_mode   | result                  |
| ----------- | ------------------- | ------------- | ------------- | ----------------------- |
| Null value  | \N                  | N/A           | true or false | NULL                    |
| not null    | aaa                 | NULL          | true          | invalid data (filtered) |
| not null    | aaa                 | NULL          | false         | NULL                    |
| not null    | 1 or 10             | 1             | true or false | correct data            |

Note: although 10 is out of range for Decimal(1,0), strict mode has no effect on it because its type meets the decimal requirement; 10 will eventually be filtered in other ETL processing flows, but it will not be filtered by strict mode.
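Strict mode is set on the job itself. A minimal sketch (the table `tinyint_tbl`, its column `k`, and the topic name are hypothetical):

```sql
-- With strict_mode on, rows whose values fail the column-type cast (e.g. "aaa"
-- for a TINYINT column) are filtered and counted toward max_error_number.
CREATE ROUTINE LOAD example_db.strict_demo ON tinyint_tbl
COLUMNS(k)
PROPERTIES
(
    "strict_mode" = "true",
    "max_error_number" = "100"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "strict_demo_topic"
);
```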
Accessing an SSL-certified Kafka cluster requires the user to provide the certificate file (ca.pem) used to authenticate the Kafka broker's public key. If the Kafka cluster also has client authentication enabled, the client's public key (client.pem), the key file (client.key) and the key password are required as well. These files must be uploaded to Doris with a catalog name of **kafka** in advance via the `CREATE FILE` command. You can see `HELP CREATE FILE;` for specific help. Here is an example:

```sql
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
CREATE FILE "client.key" PROPERTIES("url" = "https://example_url/kafka-key/client.key", "catalog" = "kafka");
CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
```
After the upload is complete, create the load job, referring to the uploaded files with the `FILE:` prefix:

```sql
CREATE ROUTINE LOAD db1.job1 on tbl1
PROPERTIES
(
    "desired_concurrent_number"="1"
)
FROM KAFKA
(
    "kafka_broker_list"= "broker1:9091,broker2:9091",
    "kafka_topic" = "my_topic",
    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"
);
```
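You can confirm that the certificate files were uploaded correctly. A minimal sketch; `db1` is the database from the example above:

```sql
-- Lists files registered via CREATE FILE; the three certificates should appear
-- with catalog "kafka".
SHOW FILE FROM db1;
```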
Doris accesses Kafka clusters through librdkafka. For more information on the parameters supported by librdkafka, see Configuration properties.

Specific commands and examples for viewing the status of a **job** can be viewed through the `HELP SHOW ROUTINE LOAD;` command.
Specific commands and examples for viewing the running status of a **task** can be viewed through the `HELP SHOW ROUTINE LOAD TASK;` command.
Users can modify jobs that have already been created. Specific instructions can be viewed through the `HELP ALTER ROUTINE LOAD;` command or see ALTER ROUTINE LOAD.

The user can control the stop, pause and resume of a job through the three commands `STOP`/`PAUSE`/`RESUME`.
Help and examples can be viewed through the `HELP STOP ROUTINE LOAD;`, `HELP PAUSE ROUTINE LOAD;` and `HELP RESUME ROUTINE LOAD;` commands.
If the `kafka_topic` declared by the user in the create routine load statement does not exist in the Kafka cluster:

- If the broker of the user's Kafka cluster has `auto.create.topics.enable = true` set, then `kafka_topic` will be created automatically first, and the number of partitions created automatically is determined by the broker configuration `num.partitions` in the user's Kafka cluster. The routine job will keep reading data from this topic as normal.
- If the broker of the user's Kafka cluster has `auto.create.topics.enable = false` set, then the topic will not be created automatically, and the routine job will be paused before any data is read, with a status of `PAUSED`.

Therefore, if you want the Kafka topic to be created automatically by the routine job when it doesn't exist, just set `auto.create.topics.enable = true` for the broker in the user's Kafka cluster.

If `advertised.listeners` is configured in the Kafka cluster, the addresses in `advertised.listeners` must be accessible to the Doris service.

To control which partitions are consumed and from which offsets, use the following properties:

- `kafka_partitions`: specify the list of partitions to be consumed, such as "0, 1, 2, 3".
- `kafka_offsets`: specify the starting offset of each partition, which must correspond to the number of entries in `kafka_partitions`, e.g. "1000, 1000, 2000, 2000".
- `property.kafka_default_offsets`: specify the default starting offset of the partitions.

The combinations of these three properties behave as follows (an example follows the table):

| Combination | kafka_partitions | kafka_offsets | property.kafka_default_offsets | Behavior |
| ----------- | ---------------- | ------------- | ------------------------------ | -------- |
| 1 | No | No | No | The system will automatically find all partitions corresponding to the topic and start consuming them from OFFSET_END |
| 2 | No | No | Yes | The system will automatically find all partitions of the topic and start consuming from the position specified by the default offset |
| 3 | Yes | No | No | The system will start consuming from OFFSET_END of the specified partitions |
| 4 | Yes | Yes | No | The system will start consuming from the specified offsets of the specified partitions |
| 5 | Yes | No | Yes | The system will start consuming from the position specified by the default offset for the specified partitions |
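For instance, combination 5 maps to a statement like the following (a minimal sketch; broker, topic and table names are placeholders):

```sql
CREATE ROUTINE LOAD example_db.offset_demo ON example_tbl
PROPERTIES
(
    "desired_concurrent_number" = "1"
)
FROM KAFKA
(
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_topic",
    -- only these partitions are consumed, each starting from the default offset
    "kafka_partitions" = "0,1,2,3",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```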
The following walks through a complete example using TPC-H lineitem data. The source rows look like this (fields are tab-separated):

```text
1992-01-02 3273383 3 2508983 508984 42 83658.12 0.06 0.07 R F 1992-03-19 1992-01-18 COLLECT COD TRUCK the carefully ironic accounts hin
1992-01-02 26696039 2 8782384 532409 50 73297.5 0.02 0.07 R F 1992-01-31 1992-01-13 DELIVER IN PERSON REG AIR . furiously regul
1992-01-02 47726080 1 5950048 950049 24 26346 0.02 0.02 A F 1992-02-23 1992-01-16 DELIVER IN PERSON RAIL l pearls. spec
1992-01-02 77918949 2 7276493 526501 23 33789.99 0.06 0 R F 1992-03-28 1992-01-26 NONE TRUCK y express requests. fin
1992-01-02 87026306 4 9061545 811573 11 16566.99 0.08 0.06 R F 1992-02-09 1992-01-21 TAKE BACK RETURN AIR ly regular instructions s
1992-01-02 135925030 4 14097984 598013 30 59438.4 0.02 0.08 R F 1992-02-24 1992-01-06 COLLECT COD REG AIR es boost regular platelets. reg
1992-01-02 189122402 2 13648194 398234 5 5707.55 0.1 0.08 R F 1992-03-24 1992-01-06 COLLECT COD FOB aggle caref
1992-01-02 235359552 4 8148971 898996 29 58567.53 0.1 0.07 A F 1992-02-10 1992-02-01 NONE AIR furiously ironic p
1992-01-02 298717351 3 4078182 78183 42 48719.16 0.09 0.07 R F 1992-03-08 1992-01-26 DELIVER IN PERSON MAIL hely regular accounts. blithely i
1992-01-02 305288709 3 14997743 747786 33 60720 0.04 0.06 A F 1992-03-25 1992-01-04 TAKE BACK RETURN TRUCK counts engage across the
```
Create the target table:

```sql
CREATE TABLE lineitem_rtload (
    l_shipdate date NOT NULL,
    l_orderkey bigint(20) NOT NULL,
    l_linenumber int(11) NOT NULL,
    l_partkey int(11) NOT NULL,
    l_suppkey int(11) NOT NULL,
    l_quantity decimalv3(15, 2) NOT NULL,
    l_extendedprice decimal(15, 2) NOT NULL,
    l_discount decimalv3(15, 2) NOT NULL,
    l_tax decimalv3(15, 2) NOT NULL,
    l_returnflag varchar(1) NOT NULL,
    l_linestatus varchar(1) NOT NULL,
    l_commitdate date NOT NULL,
    l_receiptdate date NOT NULL,
    l_shipinstruct varchar(25) NOT NULL,
    l_shipmode varchar(10) NOT NULL,
    l_comment varchar(44) NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(l_shipdate, l_orderkey)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
);
```
Create the routine load job:

```sql
CREATE ROUTINE LOAD tpch_100_d.rtl_20230809 ON lineitem_rtload
COLUMNS TERMINATED BY "\t"
PROPERTIES
(
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "max_error_number" = "100",
    "strict_mode" = "true"
)
FROM KAFKA
(
    "kafka_broker_list" = "10.0.1.138:9092",
    "kafka_topic" = "doris_routine_load_test",
    "property.group.id" = "routine_test",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
```
Check the job status with `SHOW ROUTINE LOAD`:

```text
MySQL [tpch_100_d]> show routine load\G
*************************** 1. row ***************************
                  Id: 21619
                Name: rtl_20230809
          CreateTime: 2023-08-09 19:17:16
           PauseTime: NULL
             EndTime: NULL
              DbName: default_cluster:tpch_100_d
           TableName: lineitem_rtload
               State: RUNNING
      DataSourceType: KAFKA
      CurrentTaskNum: 3
       JobProperties: {"timezone":"Asia/Shanghai","send_batch_parallelism":"1","columnSeparator":"'\t'","load_to_single_tablet":"false","lineDelimiter":"\n","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"true","jsonpaths":"","currentTaskConcurrentNum":"3","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"","maxBatchIntervalS":"20","whereExpr":"","precedingFilter":"","mergeType":"APPEND","format":"csv","json_root":"","deleteCondition":"","desireTaskConcurrentNum":"3","maxErrorNum":"100","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"doris_routine_load_test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.1.138:9092"}
    CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"routine_test"}
           Statistic: {"receivedBytes":568128,"runningTxns":[],"errorRows":0,"committedTaskNum":31,"loadedRows":4400,"loadRowsRate":7,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":4400,"unselectedRows":0,"receivedBytesRate":905,"taskExecuteTimeMs":627757}
            Progress: {"0":"1599","1":"1316","2":"1482"}
                 Lag: {"0":0,"1":0,"2":0}
ReasonOfStateChanged:
        ErrorLogUrls:
            OtherMsg:
```
The fields in the output have the following meanings:

- Id: job ID
- Name: job name
- CreateTime: job creation time
- PauseTime: the most recent time the job was paused
- EndTime: job end time
- DbName: corresponding database name
- TableName: corresponding table name
- State: job running status
- DataSourceType: data source type (KAFKA)
- CurrentTaskNum: current number of subtasks
- JobProperties: job configuration details
- DataSourceProperties: data source configuration details
- CustomProperties: custom configuration
- Statistic: job execution statistics
- Progress: job progress
- Lag: job delay status
- ReasonOfStateChanged: reason for the job state change
- ErrorLogUrls: viewing address of the filtered data that failed quality checks
- OtherMsg: other error information
Progress: {"0":"2061","1":"2135","2":"2254"} Progress: {"0":"2279","1":"2293","2":"2321"}
Jobs that have been created can be modified with `ALTER ROUTINE LOAD`:

```sql
ALTER ROUTINE LOAD FOR [db.]job_name
[job_properties]
FROM data_source
[data_source_properties]
```
Only jobs in the PAUSED state can be modified, so pause the job first:

```sql
PAUSE [ALL] ROUTINE LOAD FOR job_name
```
```text
MySQL [tpch_100_d]> PAUSE ROUTINE LOAD FOR rtl_20230809;
Query OK, 0 rows affected (0.00 sec)

MySQL [tpch_100_d]> show ROUTINE LOAD FOR rtl_20230809\G
*************************** 1. row ***************************
                  Id: 21619
                Name: rtl_20230809
          CreateTime: 2023-08-09 19:17:16
           PauseTime: 2023-08-09 21:03:21
             EndTime: NULL
              DbName: default_cluster:tpch_100_d
           TableName: lineitem_rtload
               State: PAUSED
      DataSourceType: KAFKA
      CurrentTaskNum: 0
       JobProperties: {"timezone":"Asia/Shanghai","send_batch_parallelism":"1","columnSeparator":"'\t'","load_to_single_tablet":"false","lineDelimiter":"\n","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"true","jsonpaths":"","currentTaskConcurrentNum":"3","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"","maxBatchIntervalS":"20","whereExpr":"","precedingFilter":"","mergeType":"APPEND","format":"csv","json_root":"","deleteCondition":"","desireTaskConcurrentNum":"3","maxErrorNum":"100","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"doris_routine_load_test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.1.138:9092"}
    CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"routine_test"}
           Statistic: {"receivedBytes":1123678,"runningTxns":[],"errorRows":0,"committedTaskNum":114,"loadedRows":8703,"loadRowsRate":3,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":8703,"unselectedRows":0,"receivedBytesRate":486,"taskExecuteTimeMs":2310792}
            Progress: {"0":"2917","1":"2754","2":"3029"}
                 Lag: {"0":0,"1":0,"2":0}
ReasonOfStateChanged: ErrorReason{code=errCode = 100, msg='User root pauses routine load job'}
        ErrorLogUrls:
            OtherMsg:
1 row in set (0.00 sec)
```
Then modify the job; here the batch scheduling interval is shortened to 10 seconds:

```sql
ALTER ROUTINE LOAD FOR tpch_100_d.rtl_20230809
PROPERTIES
(
    "max_batch_interval" = "10"
);
```
Finally, resume the job:

```sql
RESUME ROUTINE LOAD FOR rtl_20230809;
```
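After resuming, you can confirm the change took effect. A minimal sketch; the output format follows the earlier SHOW example:

```sql
-- State should return to RUNNING and JobProperties should now show
-- "maxBatchIntervalS":"10".
SHOW ROUTINE LOAD FOR rtl_20230809\G
```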
More descriptions and examples can be viewed through the `HELP ROUTINE LOAD` command in the MySQL client.