Routine Load (Kafka Data)

Last updated: 2024-06-27 10:56:42
    The Routine Load feature allows users to submit a resident import job that continuously reads data from a specified data source and imports it into Doris, for example, automatically and continuously importing data from Kafka. This document mainly introduces the implementation principle, usage, and best practices of this feature.

    Basic Principles

                 +---------+
                 |  Client |
                 +----+----+
                      |
        +-----------------------------+
        | FE          |               |
        | +-----------v------------+  |
        | |                        |  |
        | |   Routine Load Job     |  |
        | |                        |  |
        | +---+--------+--------+--+  |
        |     |        |        |     |
        | +---v--+ +---v--+ +---v--+  |
        | | task | | task | | task |  |
        | +--+---+ +---+--+ +---+--+  |
        |    |         |        |     |
        +-----------------------------+
             |         |        |
             v         v        v
         +---+--+  +---+--+  +--+---+
         |  BE  |  |  BE  |  |  BE  |
         +------+  +------+  +------+
    As shown in the figure above, the client submits a routine load job to the FE.
    1. FE splits an import job into several tasks through the JobScheduler. Each task is responsible for importing a specified part of the data. Tasks are allocated to a specified BE for execution by the TaskScheduler.
    2. On the BE, a task is regarded as an ordinary import task and is imported through the stream load import mechanism. After the import is completed, it reports to the FE.
    3. The JobScheduler in the FE generates subsequent new tasks based on the report results or retries the failed tasks.
    4. The entire routine load job completes the uninterrupted data import by continuously generating new tasks.

    Kafka Routine Import

    At present, we only support routine import from Kafka. This part details the use and best practices of Kafka routine import.

    Use Limits

    1. Supports Kafka clusters without authentication as well as Kafka clusters authenticated through SSL.
    2. The supported message formats are CSV and JSON. Each CSV message is a single line, and the end of the line does not contain a line break.
    3. Kafka 0.10.0.0 (inclusive) or above is supported by default. If you want to use a Kafka version below 0.10.0.0 (0.9.0, 0.8.2, 0.8.1, 0.8.0), you need to modify the BE configuration, setting kafka_broker_version_fallback to a value compatible with the old version, or set property.broker.version.fallback to a compatible value when creating the routine load job (see the sketch below). The cost of using an old version is that some new features of routine load may not be available, such as setting the offset of a Kafka partition by time.
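    For example, a minimal sketch of setting the compatibility property when creating a job against an older Kafka cluster (the job name, table, topic, broker address, and fallback version here are illustrative, not taken from the examples below):
    CREATE ROUTINE LOAD example_db.test_old_kafka ON example_tbl
    PROPERTIES
    (
    "desired_concurrent_number"="1"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092",
    "kafka_topic" = "my_topic",
    "property.broker.version.fallback" = "0.9.0"
    );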

    Creating Task

    The detailed syntax for creating a routine import task can be viewed in the CREATE ROUTINE LOAD command manual, or by executing HELP ROUTINE LOAD; after connecting to Doris. Below, we illustrate how to create routine load tasks with several examples:
    1. Create a Kafka routine import task named test1 for example_tbl of example_db. Specify the column separator, group.id, and client.id, automatically consume all partitions by default, and start subscribing from the position where data is available (OFFSET_BEGINNING).
    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    COLUMNS TERMINATED BY ",",
    COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "property.group.id" = "xxx",
    "property.client.id" = "xxx",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
    2. Create a Kafka routine import task named test1 for example_tbl of example_db, with strict mode enabled.
    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100),
    WHERE k1 > 100 and k2 like "%doris%"
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "true"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "kafka_offsets" = "101,0,0,200"
    );
    
    3. Example of importing data in JSON format. Routine load only supports the following two JSON formats. The first has only one record and is a JSON object:
    {"category":"a9jadhx","author":"test","price":895}
    The second type is a Json array that can contain multiple records:
    [
    {"category":"11",
    "title":"SayingsoftheCentury",
    "price":895,
    "timestamp":1589191587
    },
    {
    "category":"22",
    "author":"2avc",
    "price":895,
    "timestamp":1589191487
    },
    {
    "category":"33",
    "author":"3avc",
    "title":"SayingsoftheCentury",
    "timestamp":1589191387
    }
    ]
    Creating the Doris data table to be imported:
    CREATE TABLE `example_tbl` (
    `category` varchar(24) NULL COMMENT "",
    `author` varchar(24) NULL COMMENT "",
    `timestamp` bigint(20) NULL COMMENT "",
    `dt` int(11) NULL COMMENT "",
    `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
    PARTITION p0 VALUES [("-2147483648"), ("20200509")),
    PARTITION p20200509 VALUES [("20200509"), ("20200510")),
    PARTITION p20200510 VALUES [("20200510"), ("20200511")),
    PARTITION p20200511 VALUES [("20200511"), ("20200512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4
    PROPERTIES (
    "replication_num" = "1"
    );
    Importing Json data with a simple mode:
    CREATE ROUTINE LOAD example_db.test_json_label_1 ON table1
    COLUMNS(category,price,author)
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
    );
    Accurate import of data in Json format:
    CREATE ROUTINE LOAD example_db.test1 ON example_tbl
    COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "strict_mode" = "false",
    "format" = "json",
    "jsonpaths" = "[\\"$.category\\",\\"$.author\\",\\"$.price\\",\\"$.timestamp\\"]",
    "strip_outer_array" = "true"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2",
    "kafka_offsets" = "0,0,0"
    );
    The relationship between strict mode and the source data during import. Here we take the column type TinyInt as an example, assuming the column in the table allows importing null values:
    | source data | source data example | string to int | strict_mode   | result                  |
    |-------------|---------------------|---------------|---------------|-------------------------|
    | Null value  | \N                  | N/A           | true or false | NULL                    |
    | not null    | aaa or 2000         | NULL          | true          | invalid data (filtered) |
    | not null    | aaa                 | NULL          | false         | NULL                    |
    | not null    | 1                   | 1             | true or false | correct data            |
    Here we take Decimal(1,0) as an example, assuming the column in the table allows importing null values:
    | source data | source data example | string to int | strict_mode   | result                  |
    |-------------|---------------------|---------------|---------------|-------------------------|
    | Null value  | \N                  | N/A           | true or false | NULL                    |
    | not null    | aaa                 | NULL          | true          | invalid data (filtered) |
    | not null    | aaa                 | NULL          | false         | NULL                    |
    | not null    | 1 or 10             | 1             | true or false | correct data            |
    Note
    Although 10 is an out-of-range value, it is not affected by strict mode because its type satisfies the decimal requirement. 10 will eventually be filtered in other ETL processing flows, but it will not be filtered by strict mode.

    Accessing an SSL-Authenticated Kafka Cluster

    Accessing an SSL-authenticated Kafka cluster requires the user to provide the certificate file (ca.pem) used to authenticate the Kafka broker public key. If the Kafka cluster also has client authentication enabled, the client public key (client.pem), key file (client.key), and key password are also required. The required files must be uploaded to Doris in advance with the catalog name "kafka" via the CREATE FILE command. For specific help, see HELP CREATE FILE;. Here is an example:
    1. Uploading Files
    CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
    CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
    CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
    2. Create Routine Import Job
    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES
    (
    "desired_concurrent_number"="1"
    )
    FROM KAFKA
    (
    "kafka_broker_list"= "broker1:9091,broker2:9091",
    "kafka_topic" = "my_topic",
    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"
    );
    Note
    Doris accesses the Kafka cluster through librdkafka, Kafka's C++ API. For more information on the parameters supported by librdkafka, see Configuration properties.

    Viewing Job Status

    The specific command and examples for viewing job status can be found with the HELP SHOW ROUTINE LOAD; command.
    The specific command and examples for viewing task running status can be found with the HELP SHOW ROUTINE LOAD TASK; command.
    Only running tasks can be viewed; tasks that have ended or have not yet started cannot be viewed.
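    For example, a minimal sketch of viewing a job and its tasks, reusing the test1 job from the earlier examples (the WHERE filter on JobName is illustrative):
    SHOW ROUTINE LOAD FOR example_db.test1;
    SHOW ROUTINE LOAD TASK WHERE JobName = "test1";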

    Modify Job properties

    Users can modify jobs that have already been created. The specific instructions can be viewed with the HELP ALTER ROUTINE LOAD; command or in ALTER ROUTINE LOAD.
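    For example, a brief sketch of lowering the concurrency of the earlier test1 job (as described later, the job must be in the PAUSED state before it can be modified):
    ALTER ROUTINE LOAD FOR example_db.test1
    PROPERTIES
    (
    "desired_concurrent_number" = "1"
    );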

    Job Control

    Users can stop, pause, and resume a job with the STOP, PAUSE, and RESUME commands. Help and examples can be viewed with the HELP STOP ROUTINE LOAD;, HELP PAUSE ROUTINE LOAD;, and HELP RESUME ROUTINE LOAD; commands.
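    A minimal sketch of the three control commands, again using the test1 job from the earlier examples:
    PAUSE ROUTINE LOAD FOR example_db.test1;
    RESUME ROUTINE LOAD FOR example_db.test1;
    STOP ROUTINE LOAD FOR example_db.test1;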

    Other Descriptions

    1. The relationship between routine import Job and ALTER TABLE operation.
    Routine import does not block SCHEMA CHANGE and ROLLUP operations. However, note that if the column mapping relationships cannot be matched after the SCHEMA CHANGE completes, erroneous data will surge for the job and eventually cause it to pause. It is recommended to reduce this kind of problem by explicitly specifying the column mapping relationships in the routine import job and by adding nullable columns or columns with default values.
    Deleting a partition of a table may cause the imported data to fail to find the corresponding partition, leaving the job paused.
    2. The relationship between routine import job and other import jobs (LOAD, DELETE, INSERT).
    There is no conflict between routine importing and other LOAD jobs as well as INSERT operations.
    When the DELETE operation is executed, the corresponding table partition cannot have any ongoing import jobs. So before executing the DELETE operation, you may need to suspend the routine import job and wait until all the issued tasks are completed before executing DELETE.
    3. The relationship between routine import jobs and the DROP DATABASE/TABLE operation. Once the database or table corresponding to a routine import job is deleted, the job is automatically CANCELLED.
    4. The relationship between Kafka-type routine import job and Kafka topic.
    When the kafka_topic declared by the user in the create routine import statement does not exist in the Kafka cluster:
    If the broker of the user's Kafka cluster has auto.create.topics.enable = true set, then kafka_topic will be automatically created first, and the number of partitions created automatically is determined by the configuration of the broker num.partitions in the user's Kafka cluster. The routine Job will keep reading data from this topic as normal.
    If the broker of the user's Kafka cluster has auto.create.topics.enable = false set, then the topic will not be automatically created, and the routine Job will be paused before any data is read, with a status of PAUSED. Therefore, if you want the kafka topic to be automatically created by the routine when it doesn't exist, just set auto.create.topics.enable = true for the broker in the user's Kafka cluster.
    5. Potential issues may occur in network-isolated environments. In some environments, there are isolation measures for network segments and domain name resolution, so it is important to note:
    1. The Broker list specified when creating a Routine load task must be accessible by the Doris service.
    2. In Kafka, if advertised.listeners is configured, the address in advertised.listeners must be accessible to the Doris service.
    6. Specify the Partition and Offset for consumption. Doris supports specifying the Partition and Offset from which to start consumption. Newer versions also support specifying a point in time from which to consume. The configuration of the corresponding parameters is explained here. There are three related parameters:
    kafka_partitions: Specifies the list of partitions to be consumed, such as "0, 1, 2, 3".
    kafka_offsets: Specifies the starting offset of each partition, which must correspond to the number of kafka_partitions, e.g. "1000, 1000, 2000, 2000".
    property.kafka_default_offsets: Specifies the default starting offset of the partitions.
    When creating the import job, these three parameters can be combined as follows (see the FROM KAFKA sketch after the table):
    | Combination | kafka_partitions | kafka_offsets | property.kafka_default_offsets | Behavior |
    |---|---|---|---|---|
    | 1 | No  | No  | No  | The system will automatically find all partitions corresponding to the topic and start consuming them from OFFSET_END |
    | 2 | No  | No  | Yes | The system will automatically find all partitions of the topic and start consuming from the position specified by the default offset |
    | 3 | Yes | No  | No  | The system will start consuming from OFFSET_END of the specified partitions |
    | 4 | Yes | Yes | No  | The system will start consuming from the specified offsets of the specified partitions |
    | 5 | Yes | No  | Yes | The system will start consuming from the position specified by the default offset for the specified partitions |
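    For instance, a sketch of the FROM KAFKA clause for combination 5 (partitions specified, default offset specified); the broker list and topic reuse the earlier examples:
    FROM KAFKA
    (
    "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
    "kafka_topic" = "my_topic",
    "kafka_partitions" = "0,1,2,3",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );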
    7. The difference between STOP and PAUSE. The FE automatically and periodically cleans up routine load jobs in the STOPPED state, while jobs in the PAUSED state can be resumed.

    Parameter Description

    Some system configuration parameters will affect the use of routine import.
    1. max_routine_load_task_concurrent_num FE configuration item, defaults to 5, can be modified at runtime. This parameter limits the maximum number of concurrent subtasks for a routine import job. It's recommended to keep the default value. Setting it too large may lead to too many concurrent tasks and consume cluster resources.
    2. max_routine_load_task_num_per_be FE configuration item, defaults to 5, can be modified at runtime. This parameter limits the maximum number of tasks concurrently executed on each BE node. It's recommended to keep the default value. If the setting is too large, it can lead to too many concurrent tasks and consume cluster resources.
    3. max_routine_load_job_num FE configuration item, defaults to 100, can be modified at runtime. This parameter limits the total number of routine import jobs, including those in the NEED_SCHEDULE, RUNNING, and PAUSED states. After this number is reached, no new jobs can be submitted.
    4. max_consumer_num_per_group BE configuration item, defaults to 3. This parameter indicates the maximum number of consumers that can be generated in a sub-task for data consumption. For Kafka data source, a consumer may consume one or more kafka partitions. Suppose a task needs to consume 6 kafka partitions, then it will generate 3 consumers, each consumer consumes 2 partitions. If there are only 2 partitions, then only 2 consumers will be generated, each consumer consumes 1 partition.
    5. push_write_mbytes_per_sec BE configuration item. Default is 10, i.e., 10MB/s. This is a common import parameter, not limited to routine import job. This parameter limits the speed of importing data to disk. For high-performance storage devices such as SSD, this limit can be lifted moderately.
    6. max_tolerable_backend_down_num FE configuration item, default value is 0. Under certain conditions, Doris can automatically reschedule PAUSED jobs back to RUNNING. A value of 0 means rescheduling is only allowed when all BE nodes are alive.
    7. period_of_auto_resume_min FE configuration item, default is 5 minutes. Doris attempts rescheduling at most 3 times within this 5-minute cycle. If all 3 attempts fail, the current job is locked and no further automatic scheduling is performed, but it can still be recovered manually with human intervention.
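    Several of the FE parameters above can be modified at runtime. As an illustration, a sketch of checking and adjusting one of them from a MySQL client (the new value 10 is only an example, not a recommendation):
    ADMIN SHOW FRONTEND CONFIG LIKE "max_routine_load_task_concurrent_num";
    ADMIN SET FRONTEND CONFIG ("max_routine_load_task_concurrent_num" = "10");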

    Best Practice

    This section walks through a best practice of using Routine Load to import TPC-H lineitem table data from Kafka.

    Determine the Kafka Message Format and Target Table Structure

    The format of the messages written to Kafka is as follows (one record per line):
    1992-01-02 3273383 3 2508983 508984 42 83658.12 0.06 0.07 R F 1992-03-19 1992-01-18 COLLECT COD TRUCK the carefully ironic accounts hin
    1992-01-02 26696039 2 8782384 532409 50 73297.5 0.02 0.07 R F 1992-01-31 1992-01-13 DELIVER IN PERSON REG AIR . furiously regul
    1992-01-02 47726080 1 5950048 950049 24 26346 0.02 0.02 A F 1992-02-23 1992-01-16 DELIVER IN PERSON RAIL l pearls. spec
    1992-01-02 77918949 2 7276493 526501 23 33789.99 0.06 0 R F 1992-03-28 1992-01-26 NONE TRUCK y express requests. fin
    1992-01-02 87026306 4 9061545 811573 11 16566.99 0.08 0.06 R F 1992-02-09 1992-01-21 TAKE BACK RETURN AIR ly regular instructions s
    1992-01-02 135925030 4 14097984 598013 30 59438.4 0.02 0.08 R F 1992-02-24 1992-01-06 COLLECT COD REG AIR es boost regular platelets. reg
    1992-01-02 189122402 2 13648194 398234 5 5707.55 0.1 0.08 R F 1992-03-24 1992-01-06 COLLECT COD FOB aggle caref
    1992-01-02 235359552 4 8148971 898996 29 58567.53 0.1 0.07 A F 1992-02-10 1992-02-01 NONE AIR furiously ironic p
    1992-01-02 298717351 3 4078182 78183 42 48719.16 0.09 0.07 R F 1992-03-08 1992-01-26 DELIVER IN PERSON MAIL hely regular accounts. blithely i
    1992-01-02 305288709 3 14997743 747786 33 60720 0.04 0.06 A F 1992-03-25 1992-01-04 TAKE BACK RETURN TRUCK counts engage across the
    The table structure of routine target table lineitem in Doris. The actual table name in Doris is: lineitem_rtload.
    CREATE TABLE lineitem_rtload (
    l_shipdate date NOT NULL,
    l_orderkey bigint(20) NOT NULL,
    l_linenumber int(11) NOT NULL,
    l_partkey int(11) NOT NULL,
    l_suppkey int(11) NOT NULL,
    l_quantity decimalv3(15, 2) NOT NULL,
    l_extendedprice decimal(15, 2) NOT NULL,
    l_discount decimalv3(15, 2) NOT NULL,
    l_tax decimalv3(15, 2) NOT NULL,
    l_returnflag varchar(1) NOT NULL,
    l_linestatus varchar(1) NOT NULL,
    l_commitdate date NOT NULL,
    l_receiptdate date NOT NULL,
    l_shipinstruct varchar(25) NOT NULL,
    l_shipmode varchar(10) NOT NULL,
    l_comment varchar(44) NOT NULL
    ) ENGINE=OLAP
    DUPLICATE KEY(l_shipdate, l_orderkey)
    COMMENT 'OLAP'
    DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 1",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
    );
    Based on the write format and target table information, create a routine load task.
    CREATE ROUTINE LOAD tpch_100_d.rtl_20230809 ON lineitem_rtload
    COLUMNS TERMINATED BY "\t"
    PROPERTIES
    (
    "desired_concurrent_number"="3",
    "max_batch_interval" = "20",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "max_error_number" = "100",
    "strict_mode" = "true"
    )
    FROM KAFKA
    (
    "kafka_broker_list" = "10.0.1.138:9092",
    "kafka_topic" = "doris_routine_load_test",
    "property.group.id" = "routine_test",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
    COLUMNS TERMINATED BY "\t" specifies the field separator in the Kafka messages; the default is "\t".
    max_batch_interval/max_batch_rows/max_batch_size
    These three parameters respectively represent:
    The maximum execution time of each sub-task, in seconds. Value range: 5 to 60. Default value: 10.
    The maximum number of rows read by each sub-task. Must be greater than or equal to 200000. Default value: 200000.
    The maximum number of bytes read by each sub-task. Unit: bytes. Value range: 100 MB to 1 GB. Default value: 100 MB.
    These three parameters are used to control the execution time and processing volume of a sub-task. When any one of them reaches its threshold, the task ends.
    max_error_number
    Maximum number of error lines allowed within the sampling window. Must be greater than or equal to 0. Default value: 0, meaning no error lines are permitted.
    The sampling window is max_batch_rows * 10 (for example, with max_batch_rows = 300000 as above, the window is 3,000,000 rows). If the number of error lines within the sampling window exceeds max_error_number, the routine job is paused and manual intervention is required to check for data quality problems.
    Rows filtered out by the where condition are not counted as error lines.
    strict_mode
    Whether to enable strict mode; it is disabled by default. When strict mode is enabled, if the column type conversion of a non-null original value results in NULL, the data is filtered. It is specified as:
    "strict_mode" = "true"
    The meaning of strict mode is that column type conversions are strictly filtered during the import process. The strict filtering strategy is as follows:
    For column type conversions, if strict mode is true, erroneous data is filtered. Erroneous data here refers to values that are not null in the original data but become NULL after the column type conversion.
    When an imported column is generated by a function transformation, strict mode has no effect on it.
    For an imported column type with a range restriction, if the original data can pass the type conversion but fails the range restriction, strict mode has no effect on it. For example, if the type is decimal(1,0) and the original data is 10, the value can pass the type conversion but is outside the range declared by the column; strict mode does not affect such data.

    Viewing Task Operation

    After creating a routine load task, you can view the running routine tasks with the show routine load command. If you cannot find the corresponding routine task in the output of show routine load, it may have been stopped or paused because the task failed or produced too many errors. Use show all routine load to view all routine tasks in all states.
    MySQL [tpch_100_d]> show routine load\G;
    *************************** 1. row ***************************
    Id: 21619
    Name: rtl_20230809
    CreateTime: 2023-08-09 19:17:16
    PauseTime: NULL
    EndTime: NULL
    DbName: default_cluster:tpch_100_d
    TableName: lineitem_rtload
    State: RUNNING
    DataSourceType: KAFKA
    CurrentTaskNum: 3
    JobProperties: {"timezone":"Asia/Shanghai","send_batch_parallelism":"1","columnSeparator":"'\t'","load_to_single_tablet":"false","lineDelimiter":"\n","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"true","jsonpaths":"","currentTaskConcurrentNum":"3","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"","maxBatchIntervalS":"20","whereExpr":"","precedingFilter":"","mergeType":"APPEND","format":"csv","json_root":"","deleteCondition":"","desireTaskConcurrentNum":"3","maxErrorNum":"100","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"}
    DataSourceProperties: {"topic":"doris_routine_load_test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.1.138:9092"}
    CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"ryanzryu_routine_test"}
    Statistic: {"receivedBytes":568128,"runningTxns":[],"errorRows":0,"committedTaskNum":31,"loadedRows":4400,"loadRowsRate":7,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":4400,"unselectedRows":0,"receivedBytesRate":905,"taskExecuteTimeMs":627757}
    Progress: {"0":"1599","1":"1316","2":"1482"}
    Lag: {"0":0,"1":0,"2":0}
    ReasonOfStateChanged:
    ErrorLogUrls:
    OtherMsg:
    The descriptions of each field are as follows:
    Id: Job ID
    Name: Job name
    CreateTime: Job creation time
    PauseTime: Recent pause time of job
    EndTime: Job end time
    DbName: Corresponding database name
    TableName: Corresponding table name
    State: Job running status
    DataSourceType: Data source type: KAFKA
    CurrentTaskNum: Current number of subtasks
    JobProperties: Job configuration details
    DataSourceProperties: Data source configuration details
    CustomProperties: Custom configuration
    Statistic: Job execution statistics
    Progress: Job progress
    Lag: Job delay status
    ReasonOfStateChanged: Reasons for job state changes
    ErrorLogUrls: Address for viewing filtered, substandard data
    OtherMsg: Other error information
    State has 5 possible values:
    NEED_SCHEDULE: Job is waiting to be scheduled
    RUNNING: Job is running
    PAUSED: Job is paused
    STOPPED: Job has ended
    CANCELLED: Job has been cancelled

    Start Writing Data to Kafka

    A script is used here to write to Kafka: 200 messages are written every 5 seconds. The consumption progress of the three Kafka partitions can then be seen in the Progress field of the show routine load output.
    Progress: {"0":"2061","1":"2135","2":"2254"}
    Progress: {"0":"2279","1":"2293","2":"2321"}
    To modify the routine load, the following SQL can be used:
    ALTER ROUTINE LOAD FOR [db.]job_name [job_properties] FROM data_source [data_source_properties]
    Note:
    Only a job in the PAUSED state can be modified.
    For example, you need to modify max_batch_interval to 10s.
    First, pause the corresponding job using the following command:
    PAUSE [ALL] ROUTINE LOAD FOR job_name
    MySQL [tpch_100_d]> PAUSE ROUTINE LOAD FOR rtl_20230809;
    Query OK, 0 rows affected (0.00 sec)
    MySQL [tpch_100_d]> show ROUTINE LOAD FOR rtl_20230809\G;
    *************************** 1. row ***************************
    Id: 21619
    Name: rtl_20230809
    CreateTime: 2023-08-09 19:17:16
    PauseTime: 2023-08-09 21:03:21
    EndTime: NULL
    DbName: default_cluster:tpch_100_d
    TableName: lineitem_rtload
    State: PAUSED
    DataSourceType: KAFKA
    CurrentTaskNum: 0
    JobProperties: {"timezone":"Asia/Shanghai","send_batch_parallelism":"1","columnSeparator":"'\t'","load_to_single_tablet":"false","lineDelimiter":"\n","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"true","jsonpaths":"","currentTaskConcurrentNum":"3","fuzzy_parse":"false","partitions":"","columnToColumnExpr":"","maxBatchIntervalS":"20","whereExpr":"","precedingFilter":"","mergeType":"APPEND","format":"csv","json_root":"","deleteCondition":"","desireTaskConcurrentNum":"3","maxErrorNum":"100","strip_outer_array":"false","execMemLimit":"2147483648","num_as_string":"false","maxBatchRows":"300000"}
    DataSourceProperties: {"topic":"doris_routine_load_test","currentKafkaPartitions":"0,1,2","brokerList":"10.0.1.138:9092"}
    CustomProperties: {"kafka_default_offsets":"OFFSET_BEGINNING","group.id":"ryanzryu_routine_test"}
    Statistic: {"receivedBytes":1123678,"runningTxns":[],"errorRows":0,"committedTaskNum":114,"loadedRows":8703,"loadRowsRate":3,"abortedTaskNum":0,"errorRowsAfterResumed":0,"totalRows":8703,"unselectedRows":0,"receivedBytesRate":486,"taskExecuteTimeMs":2310792}
    Progress: {"0":"2917","1":"2754","2":"3029"}
    Lag: {"0":0,"1":0,"2":0}
    ReasonOfStateChanged: ErrorReason{code=errCode = 100, msg='User root pauses routine load job'}
    ErrorLogUrls:
    OtherMsg:
    1 row in set (0.00 sec)
    You can see that the status is paused: State: PAUSED

    Modify task parameters

    ALTER ROUTINE LOAD FOR tpch_100_d.rtl_20230809 PROPERTIES ( "max_batch_interval" = "10" );

    Restarting Task

    RESUME ROUTINE LOAD FOR rtl_20230809;
    By using the show ROUTINE LOAD FOR rtl_20230809\G; command, you can see that the task status changes to RUNNING, and the refresh rate of the Progress field is about 5s, which shows that the modification has taken effect.

    More help

    For more detailed syntax related to the use of routine load, you can enter the HELP ROUTINE LOAD command in the MySQL client.