Broker Load (HDFS Data)

Last updated: 2024-06-27 10:55:53
    Broker load is an asynchronous import method. The data sources it supports depend on the data sources supported by the Broker process: there are generally Brokers for the community edition of HDFS and Brokers for S3-protocol Cloud Object Storage. This document explains how to import HDFS data with Broker load.
    Because the data in a Doris table is stored in sorted order, Broker load uses the Doris cluster's own resources to sort the data during import. Compared with Spark load for migrating massive historical data, it consumes more Doris cluster resources, so it is mainly used when no external compute resources such as Spark are available. If Spark compute resources are available, Spark load is recommended.
    Users create a Broker load job through the MySQL protocol and check the import result with the import query command (SHOW LOAD).

    Applicable Scenario

    The source data is in the storage system that Broker can access, such as HDFS.
    The data volume is at the level of tens to hundreds of GB.

    Basic Principles

    After the user submits an import task, the FE generates the corresponding execution plan and, based on the current number of BEs and the size of the file, distributes the plan to multiple BEs for execution; each BE imports a portion of the data. During execution, a BE pulls data from the Broker, transforms it, and imports it into the system. After all BEs have finished, the FE makes the final decision on whether the import succeeded.
                         +
                         | 1. user create broker load
                         v
                    +----+----+
                    |         |
                    |   FE    |
                    |         |
                    +----+----+
                         |
                         | 2. BE etl and load the data
            +------------+------------+
            |            |            |
        +---v---+    +---v---+    +---v---+
        |       |    |       |    |       |
        |  BE   |    |  BE   |    |  BE   |
        |       |    |       |    |       |
        +---+-^-+    +---+-^-+    +---+-^-+
            | |          | |          | |
            | |          | |          | |  3. pull data from broker
        +---v-+-+    +---v-+-+    +---v-+-+
        |       |    |       |    |       |
        |Broker |    |Broker |    |Broker |
        |       |    |       |    |       |
        +---+-^-+    +---+-^-+    +---+-^-+
            | |          | |          | |
        +---v-+----------v-+----------v-+--+
        |       HDFS/BOS/AFS cluster       |
        |                                  |
        +----------------------------------+
    

    Start Import

    Let's look at the use of Broker Load through a few practical scenario examples.

    Importing Data from Hive Partition Table

    1. Create a Hive table.
    ##The data format is: default, partition field is: day
    CREATE TABLE `ods_demo_detail`(
    `id` string,
    `store_id` string,
    `company_id` string,
    `tower_id` string,
    `commodity_id` string,
    `commodity_name` string,
    `commodity_price` double,
    `member_price` double,
    `cost_price` double,
    `unit` string,
    `quantity` double,
    `actual_price` double
    )
    PARTITIONED BY (day string)
    row format delimited fields terminated by ','
    lines terminated by '\n'
    Use Hive's load command to import your data into the Hive table:
    load data local inpath '/opt/custorm' into table ods_demo_detail;
    2. Create a Doris table. For the specific table creation syntax, see CREATE TABLE.
    CREATE TABLE `doris_ods_test_detail` (
    `rq` date NULL,
    `id` varchar(32) NOT NULL,
    `store_id` varchar(32) NULL,
    `company_id` varchar(32) NULL,
    `tower_id` varchar(32) NULL,
    `commodity_id` varchar(32) NULL,
    `commodity_name` varchar(500) NULL,
    `commodity_price` decimal(10, 2) NULL,
    `member_price` decimal(10, 2) NULL,
    `cost_price` decimal(10, 2) NULL,
    `unit` varchar(50) NULL,
    `quantity` int(11) NULL,
    `actual_price` decimal(10, 2) NULL
    ) ENGINE=OLAP
    UNIQUE KEY(`rq`, `id`, `store_id`)
    PARTITION BY RANGE(`rq`)
    (
    PARTITION P_202204 VALUES [('2022-04-01'), ('2022-05-01')))
    DISTRIBUTED BY HASH(`store_id`) BUCKETS 1
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "MONTH",
    "dynamic_partition.start" = "-2147483648",
    "dynamic_partition.end" = "2",
    "dynamic_partition.prefix" = "P_",
    "dynamic_partition.buckets" = "1",
    "in_memory" = "false",
    "storage_format" = "V2"
    );
    3. Start importing data. For the specific syntax, see Broker Load.
    LOAD LABEL broker_load_2022_03_23
    (
    DATA INFILE("hdfs://192.168.20.123:8020/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
    INTO TABLE doris_ods_test_detail
    COLUMNS TERMINATED BY ","
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    COLUMNS FROM PATH AS (`day`)
    SET
    (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
    )
    WITH BROKER "broker_name_1"
    (
    "username" = "hdfs",
    "password" = ""
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );
    Note
    192.168.20.123:8020 is the IP and port of the active namenode for the HDFS cluster used by the Hive table.
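    If the HDFS cluster used by the Hive table has NameNode HA enabled, you can pass the nameservice configuration to the Broker instead of hard-coding the address of the active NameNode. The following is only a minimal sketch: the nameservice name my_ha, the NameNode IDs my_namenode1/my_namenode2, the second NameNode address 192.168.20.124:8020, and the label are assumptions and must be replaced with the values from your own hdfs-site.xml.
    LOAD LABEL broker_load_ha_demo
    (
    DATA INFILE("hdfs://my_ha/user/hive/warehouse/ods.db/ods_demo_detail/*/*")
    INTO TABLE doris_ods_test_detail
    COLUMNS TERMINATED BY ","
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    COLUMNS FROM PATH AS (`day`)
    SET
    (rq = str_to_date(`day`,'%Y-%m-%d'))
    )
    WITH BROKER "broker_name_1"
    (
    "username" = "hdfs",
    "password" = "",
    -- HA-related properties, using the standard HDFS client configuration keys
    "dfs.nameservices" = "my_ha",
    "dfs.ha.namenodes.my_ha" = "my_namenode1,my_namenode2",
    "dfs.namenode.rpc-address.my_ha.my_namenode1" = "192.168.20.123:8020",
    "dfs.namenode.rpc-address.my_ha.my_namenode2" = "192.168.20.124:8020",
    "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );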

    Hive Partition Table Import (ORC Format)

    1. Creating a Hive Partition Table in ORC format.
    #Data format: ORC; partition field: day
    CREATE TABLE `ods_demo_orc_detail`(
    `id` string,
    `store_id` string,
    `company_id` string,
    `tower_id` string,
    `commodity_id` string,
    `commodity_name` string,
    `commodity_price` double,
    `member_price` double,
    `cost_price` double,
    `unit` string,
    `quantity` double,
    `actual_price` double
    )
    PARTITIONED BY (day string)
    row format delimited fields terminated by ','
    lines terminated by '\n'
    STORED AS ORC
    2. Create a Doris table. The table creation statement is the same as the Doris CREATE TABLE statement above.
    3. Use Broker Load to import the data.
    LOAD LABEL dish_2022_03_23
    (
    DATA INFILE("hdfs://10.220.147.151:8020/user/hive/warehouse/ods.db/ods_demo_orc_detail/*/*")
    INTO TABLE doris_ods_test_detail
    COLUMNS TERMINATED BY ","
    FORMAT AS "orc"
    (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    COLUMNS FROM PATH AS (`day`)
    SET
    (rq = str_to_date(`day`,'%Y-%m-%d'),id=id,store_id=store_id,company_id=company_id,tower_id=tower_id,commodity_id=commodity_id,commodity_name=commodity_name,commodity_price=commodity_price,member_price=member_price,cost_price=cost_price,unit=unit,quantity=quantity,actual_price=actual_price)
    )
    WITH BROKER "broker_name_1"
    (
    "username" = "hdfs",
    "password" = ""
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );
    Note
    FORMAT AS "orc": Specifies the format of the data to be imported.
    SET: Defines the field mapping between the Hive table and the Doris table, as well as operations for converting fields.

    HDFS File System Data Import

    Let's continue with the Doris table created above as an example and demonstrate importing data from HDFS through Broker Load. The import job statement is as follows:
    LOAD LABEL demo.label_20220402
    (
    DATA INFILE("hdfs://10.220.147.151:8020/tmp/test_hdfs.txt")
    INTO TABLE `ods_dish_detail_test`
    COLUMNS TERMINATED BY "\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    )
    with HDFS (
    "fs.defaultFS"="hdfs://10.220.147.151:8020",
    "hdfs_user"="root"
    )
    PROPERTIES
    (
    "timeout"="1200",
    "max_filter_ratio"="0.1"
    );
    For the specific parameters, see Broker and Broker Load documents.

    Viewing the Import Status

    We can use the following command to view the status information of the above import task. For the specific syntax for viewing the import status, see SHOW LOAD.
    mysql> show load order by createtime desc limit 1\G
    *************************** 1. row ***************************
    JobId: 41326624
    Label: broker_load_2022_03_23
    State: FINISHED
    Progress: ETL:100%; LOAD:100%
    Type: BROKER
    EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=27
    TaskInfo: cluster:N/A; timeout(s):1200; max_filter_ratio:0.1
    ErrorMsg: NULL
    CreateTime: 2022-04-01 18:59:06
    EtlStartTime: 2022-04-01 18:59:11
    EtlFinishTime: 2022-04-01 18:59:11
    LoadStartTime: 2022-04-01 18:59:11
    LoadFinishTime: 2022-04-01 18:59:11
    URL: NULL
    JobDetails: {"Unfinished backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[]},"ScannedRows":27,"TaskNumber":1,"All backends":{"5072bde59b74b65-8d2c0ee5b029adc0":[36728051]},"FileNumber":1,"FileSize":5540}
    1 row in set (0.01 sec)

    Canceling Import

    A Broker load job can be manually canceled by the user as long as its status is neither CANCELLED nor FINISHED. To cancel a job, specify the label of the import task; see CANCEL LOAD for the syntax of the cancel command. For example, to cancel the import job labeled 'broker_load_2022_03_23' on the 'demo' database:
    CANCEL LOAD FROM demo WHERE LABEL = "broker_load_2022_03_23";

    Related System Configuration

    Broker parameters

    Broker load accesses remote storage through the Broker process. Different Brokers require different parameters. For details, see Broker.

    FE configuration

    The following configurations are system-level configurations for Broker load, i.e., those that apply to all Broker load import tasks. They can be adjusted by changing the fe.conf file.
    min_bytes_per_broker_scanner/max_bytes_per_broker_scanner/max_broker_concurrency
    The first two configurations limit the smallest and largest amount of data processed by a single BE node. The third configuration limits the maximum concurrency count for an import job. The minimum processed data volume, the maximum concurrency count, the size of the source file, and the number of BE nodes in the current cluster collectively determine the concurrency count for this import.
    The concurrency count for this import = Min(source file size / minimum processing volume, maximum concurrency, number of current BE nodes)
    The amount of data processed by a single BE for this import = source file size / the concurrency count of this import
    Typically, the maximum amount of data supported by a single import job is max_bytes_per_broker_scanner * BE node count. If you need to import larger amounts of data, you need to adjust the size of the max_bytes_per_broker_scanner parameter appropriately. Default configuration:
    Parameter: min_bytes_per_broker_scanner. Default value: 64MB. Unit: bytes.
    Parameter: max_broker_concurrency. Default value: 10.
    Parameter: max_bytes_per_broker_scanner. Default value: 3GB. Unit: bytes.
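    As a quick reference, the sketch below shows how these limits can be inspected and, for mutable items, adjusted from a MySQL client, together with a worked example of the concurrency formula. This is only a sketch: ADMIN SET FRONTEND CONFIG changes the value in memory only and applies only to configuration items marked as mutable; for a persistent change, edit fe.conf and restart the FE. The 1.2 GB file size and 3 BE nodes in the comments are illustrative assumptions.
    -- Inspect the current Broker load limits on the FE.
    ADMIN SHOW FRONTEND CONFIG LIKE 'max_bytes_per_broker_scanner';
    ADMIN SHOW FRONTEND CONFIG LIKE 'max_broker_concurrency';
    ADMIN SHOW FRONTEND CONFIG LIKE 'min_bytes_per_broker_scanner';

    -- Adjust a limit at runtime (in-memory only; if the item is not mutable,
    -- edit fe.conf and restart the FE instead).
    ADMIN SET FRONTEND CONFIG ("max_broker_concurrency" = "10");

    -- Worked example with the defaults, assuming a 1.2 GB source file and 3 BEs:
    --   concurrency count           = Min(1.2 GB / 64 MB, 10, 3) = 3
    --   data processed by single BE = 1.2 GB / 3 = 400 MB  (well under the 3 GB limit)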

    Best Practice

    Application Scenario

    The most suitable scenario for Broker load is when the original data is in a file system (HDFS, BOS, AFS). In addition, since Broker load is the only asynchronous import method among single-statement imports, it can be considered when large files need to be imported asynchronously.

    Data Volume

    The following discussion assumes a single BE. If the user's cluster has multiple BEs, the data volumes below should be multiplied by the number of BEs. For instance, with 3 BE nodes, the threshold of 3 GB (inclusive) becomes 9 GB (inclusive).
    Under (including) 3 GB: The user can directly submit a Broker load import request.
    Over 3 GB: Since the maximum amount of data processed by a single BE in one import is 3 GB, importing a file larger than 3 GB requires adjusting the Broker load import parameters, as follows.
    1.1 Modify the maximum scan volume of a single BE and the maximum concurrency, based on the number of current BE nodes and the size of the original file.
    Edit the configuration in fe.conf
    max_broker_concurrency = Number of BEs
    Data volume processed by a single BE for the current import task = Original file size / max_broker_concurrency
    max_bytes_per_broker_scanner >= Data volume processed by a single BE for the current import task
    
    For example, for a 100G file and a cluster of 10 BEs:
    max_broker_concurrency = 10
    max_bytes_per_broker_scanner >= 10G = 100G / 10
    After modification, all of the BEs will handle the import task concurrently, and each BE will handle a portion of the original file.
    Note
    The above two configurations in FE are system configurations, which means that their modifications apply to all Broker load tasks.
    1.2 Set the timeout time for the current import task upon creation.
    Data volume processed by a single BE for the current import task / User's slowest import speed for Doris cluster (MB/s) >= Timeout for current import task >= Data volume processed by a single BE for current import task / 10M/s
    
    For example, for a 100G file and a cluster of 10 BEs:
    timeout >= 1000s = 10G / 10M/s
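    The computed value is applied per job through the timeout property of the LOAD statement rather than in fe.conf. A minimal sketch, reusing the HDFS import example above with a new label and an assumed timeout of 3600 seconds (comfortably above the 1000s lower bound):
    LOAD LABEL demo.label_20220402_large_file
    (
    DATA INFILE("hdfs://10.220.147.151:8020/tmp/test_hdfs.txt")
    INTO TABLE `ods_dish_detail_test`
    COLUMNS TERMINATED BY "\t" (id,store_id,company_id,tower_id,commodity_id,commodity_name,commodity_price,member_price,cost_price,unit,quantity,actual_price)
    )
    with HDFS (
    "fs.defaultFS"="hdfs://10.220.147.151:8020",
    "hdfs_user"="root"
    )
    PROPERTIES
    (
    "timeout"="3600",
    "max_filter_ratio"="0.1"
    );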
    1.3 When the timeout calculated in the second step exceeds the system's default maximum import timeout of 4 hours, it is not recommended to simply raise the maximum import timeout. Instead, if a single import would exceed the default 4-hour maximum, it is better to split the file and import it in several jobs, mainly because retrying after a failure is very costly when a single import takes more than 4 hours. The expected maximum file volume that the Doris cluster can import can be calculated with the following formula:
    Expected maximum import file volume = 14400s * 10M/s * Number of BEs
    For example: The number of BEs in the cluster is 10
    Expected maximum import file volume = 14400s * 10M/s * 10 = 1440000M ≈ 1440G
    
    Note: Generally, the user's environment may not reach a speed of 10M/s, so it's recommended to split files larger than 500G and then import them.

    Job Scheduling

    The system restricts the number of Broker load jobs running in a cluster to prevent running too many load jobs simultaneously.
    First, the FE configuration parameter desired_max_waiting_jobs limits the number of Broker load jobs that have not started or are running (job status PENDING or LOADING) within a cluster. The default value is 100. If this threshold is exceeded, newly submitted jobs are rejected directly.
    A Broker load job will be divided into pending task and loading task stages where the pending task is responsible for obtaining the import file information, and the loading task is sent to BE to perform the specific import task.
    The FE configuration parameter async_pending_load_task_pool_size limits the number of pending tasks running at the same time, and therefore also controls the number of import jobs actually running. The default value is 10. That is, if the user submits 100 load jobs, only 10 jobs will enter the LOADING state and start executing at the same time, while the other jobs wait in the PENDING state.
    FE's configuration parameter async_loading_load_task_pool_size is used to limit the number of loading tasks running simultaneously. A Broker load job will have one pending task and multiple loading tasks (equal to the number of DATA INFILE clauses in the LOAD statement). Therefore, async_loading_load_task_pool_size should be greater than or equal to the async_pending_load_task_pool_size.
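    The sketch below shows how to check these scheduling-related items from a MySQL client. This is only a sketch; changes to the pool sizes may require editing fe.conf and restarting the FE to take effect.
    ADMIN SHOW FRONTEND CONFIG LIKE 'desired_max_waiting_jobs';
    ADMIN SHOW FRONTEND CONFIG LIKE 'async_pending_load_task_pool_size';
    ADMIN SHOW FRONTEND CONFIG LIKE 'async_loading_load_task_pool_size';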

    Performance Analysis

    Before submitting a LOAD job, you can execute set enable_profile=true to enable the session variable and then submit the task. Once the job is completed, you can view its profile in the Queries tab of the FE web page. See the SHOW LOAD PROFILE help document for more information. This profile can help analyze the running state of the import job. Currently, the profile can only be viewed after the job is completed.
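    A minimal sketch of this workflow from a MySQL client; see the SHOW LOAD PROFILE help document for the exact drill-down syntax in your version.
    -- Enable profile collection for the current session.
    set enable_profile=true;

    -- ... submit the Broker load job here, then wait for it to finish ...

    -- List the collected job profiles.
    SHOW LOAD PROFILE "/";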

    FAQs

    Import error: Scan bytes per broker scanner exceed limit: xxx. Please see the Best Practices section of the document and modify the FE configuration items max_bytes_per_broker_scanner and max_broker_concurrency.
    Import error: failed to send batch or TabletWriter add batch with unknown id. Appropriately adjust query_timeout and streaming_load_rpc_max_alive_time_sec. streaming_load_rpc_max_alive_time_sec: During the import process, Doris opens a Writer for each tablet to receive and write data; this parameter specifies the Writer's wait timeout. If a Writer does not receive any data within this time, it is destroyed automatically. When the system processes data slowly, a Writer may not receive the next batch of data for a long time, which causes the error TabletWriter add batch with unknown id. In that case, increase this configuration appropriately. The default is 600 seconds.
    Import error: LOAD_RUN_FAIL; msg: Invalid Column Name: xxx. If the data is in PARQUET or ORC format, the column names in the file header need to be consistent with the column names in the Doris table, such as:
    (tmp_c1,tmp_c2)
    SET
    (
    id=tmp_c2,
    name=tmp_c1
    )
    This means reading the columns named (tmp_c1, tmp_c2) from the Parquet or ORC file and mapping them to the (id, name) columns of the Doris table. If SET is not specified, the names in the column list are used directly for the mapping.
    Note
    If you use certain versions of Hive to generate ORC files directly, the column headers in the ORC file are not the Hive metadata field names but (_col0, _col1, _col2, ...). This may cause the Invalid Column Name error, in which case you need to use SET to map the columns, as sketched below.
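    The following is a hedged sketch of such a mapping for the ORC import statement shown earlier, assuming the Hive-generated ORC file exposes its 12 data columns as _col0 through _col11 in the original column order; replace the column list and SET clause of that LOAD statement accordingly:
    (_col0,_col1,_col2,_col3,_col4,_col5,_col6,_col7,_col8,_col9,_col10,_col11)
    COLUMNS FROM PATH AS (`day`)
    SET
    (
    rq = str_to_date(`day`,'%Y-%m-%d'),
    id = _col0,
    store_id = _col1,
    company_id = _col2,
    tower_id = _col3,
    commodity_id = _col4,
    commodity_name = _col5,
    commodity_price = _col6,
    member_price = _col7,
    cost_price = _col8,
    unit = _col9,
    quantity = _col10,
    actual_price = _col11
    )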
    