Spark Load

Last updated: 2024-06-27 10:56:24
    Spark load uses Spark to preprocess the data to be imported, which improves the performance of importing large volumes of data into Doris and saves computing resources of the Doris cluster. It is mainly used for initial migration and other scenarios where a large amount of data needs to be imported into Doris.
    Spark load uses the resources of the Spark cluster to sort the data to be imported, and the Doris BEs write the resulting files directly. This greatly reduces the resource usage and load of the Doris cluster, which makes it well suited for migrating massive amounts of historical data.
    If you do not have Spark cluster resources and want to conveniently and quickly migrate historical data from external storage, you can use Broker Load (HDFS Data). Compared with Spark load, Broker load consumes more resources on the Doris cluster.
    Spark load is an asynchronous import method. You need to create a Spark load task through the MySQL protocol and check the import result with the SHOW LOAD command.
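    For example, the load task can be submitted from any MySQL client that can reach the FE. The host and account below are placeholders, and the FE MySQL protocol port is typically 9030:
    # connect to the Doris FE over the MySQL protocol (placeholder host and user)
    mysql -h <fe_host> -P 9030 -u <doris_user> -p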

    Applicable Scenario

    The source data is located in a storage system that Spark has access to, such as HDFS.
    Data size ranges from tens of GB to TB.

    Basic Principles

    Users submit a Spark-type import task through the MySQL client; the FE records the metadata and returns a message indicating that the submission was successful.
    The execution of a Spark load task mainly involves the following five stages:
    1. FE schedules and submits the ETL task to the Spark cluster for execution.
    2. The Spark cluster executes the ETL job to preprocess the data to be loaded, including building global dictionaries (for the Bitmap type), partitioning, sorting, and aggregation.
    3. After the ETL task is completed, FE gets the path of each tablet that has been preprocessed and schedules the related BEs to execute push tasks.
    4. BE reads the data through Broker and transforms it into Doris's underlying storage format.
    5. FE schedules the version to take effect and completes the import task.
                  +
                  | 0. User create spark load job
             +----v----+
             |   FE    |----------------------------------+
             +----+----+                                  |
                  | 3. FE send push tasks                 |
                  | 5. FE publish version                 |
        +---------+---------+                             |
        |         |         |                             |
    +---v---+ +---v---+ +---v---+                         |
    |  BE   | |  BE   | |  BE   |                         | 1. FE submit Spark ETL job
    +---^---+ +---^---+ +---^---+                         |
        | 4. BE push with broker                          |
    +---+---+ +---+---+ +---+---+                         |
    |Broker | |Broker | |Broker |                         |
    +---^---+ +---^---+ +---^---+                         |
        |         |         |                             |
    +---+---------+---------+----+ 2. ETL   +-------------v------------+
    |            HDFS            +--------->|      Spark cluster       |
    |                            <---------+                           |
    +----------------------------+          +--------------------------+

    Configuring Environment

    Before using Spark load, make sure the environment of the FE Follower nodes is configured. Only Follower nodes need to be configured; Observer nodes do not.
    Note:
    Some operations in this section require support from Tencent Cloud technical personnel. Please submit a ticket to contact us if needed.

    Connect the network from Doris to the EMR cluster

    It is best to ensure that the Doris cluster and the EMR cluster are in the same VPC, subnet, and security group to minimize network communication issues. If this is not the case, contact Tencent Cloud network operations for a solution, and finally use ping/telnet to confirm connectivity.
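    For example, connectivity can be checked from an FE Follower node with commands like the following; the hostnames and ports are placeholders to be replaced with your EMR ResourceManager and NameNode addresses:
    # check basic reachability from the FE Follower to the EMR nodes
    ping -c 3 <emr_master_host>
    # check that the Yarn ResourceManager and HDFS NameNode ports are reachable
    telnet <rm_host> <rm_port>
    telnet <nn_host> <nn_port>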

    Configuring Hadoop Client

    FE uses the yarn command to obtain the status of running applications and to kill applications, so a Yarn client needs to be configured for the FE. It's recommended to use version 2.5.2 or above.
    Customers need to provide the Hadoop client of the Yarn cluster they want to connect to. This ensures that it is consistent with the EMR cluster version, avoids compatibility problems, and includes the necessary configuration files such as hdfs-site.xml, yarn-site.xml, and core-site.xml.
    After obtaining the Hadoop client, place it under /usr/local/service and rename it to hadoop, or create a soft link named hadoop to it in this directory.
    Modify the permissions of the path /usr/local/service/hadoop/etc/hadoop to ensure that the Doris FE startup user has write permissions: chown -R doris:doris /usr/local/service/hadoop/etc/hadoop
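    A minimal sketch of these steps, assuming the client was unpacked to a hypothetical /path/to/hadoop-client directory and that the FE startup user is doris:
    # link the Hadoop client into the expected location
    ln -s /path/to/hadoop-client /usr/local/service/hadoop
    # make the configuration directory writable by the FE startup user
    chown -R doris:doris /usr/local/service/hadoop/etc/hadoop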
    Note:
    If you are using Spark in Tencent Cloud EMR, please submit a ticket to contact EMR ops and have the IP of the Follower FE added to the client allowlist.
    Run the hdfs dfs -ls / command as the Doris FE startup user (root for versions earlier than 1.2, doris for 1.2 and later) to verify that the Hadoop client is installed successfully.

    Configuring the Spark Client

    FE uses the spark-submit command to submit Spark tasks, so a Spark client needs to be configured for the FE. It is recommended to use Spark 2.4.5 or a later Spark 2 version.
    It is best to use the Spark client of the Yarn cluster to be connected to. This ensures version consistency and that the necessary configuration files such as spark-defaults.conf and hive-site.xml are included.
    After obtaining the Spark client, place it under /usr/local/service and rename it to spark, or create a soft link named spark to it in this directory.
    Then execute the following command:
    cd /usr/local/service/spark/jars/
    zip spark_jars.zip *.jar
    Change the Spark log level (log4j.rootLogger) to INFO:
    vim /usr/local/service/spark/conf/log4j.properties
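    The relevant line in log4j.properties would look roughly as follows; the appender name after the level is an assumption and may differ in your client:
    # set the Spark root logger level to INFO (appender name may differ)
    log4j.rootLogger=INFO, console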
    Run the following command as the Doris FE startup user (root for versions earlier than 1.2, doris for 1.2 and later) to verify that the Hadoop and Spark clients are installed successfully.
    spark-submit --queue default --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /usr/local/service/spark/examples/jars/spark-examples_*.jar 10

    Configuring Doris

    Configure the log directory for Spark load tasks. Each task generates a log file in this directory, and Doris monitors the log file to obtain the task ID and status.
    mkdir -p /usr/local/service/doris/log/spark_launcher_log;
    chmod 777 /usr/local/service/doris/log/spark_launcher_log
    Add the following configurations to fe.conf:
    spark_home_default_dir=/usr/local/service/spark
    spark_resource_path=/usr/local/service/spark/jars/spark_jars.zip
    yarn_client_path=/usr/local/service/hadoop/bin/yarn
    yarn_config_dir=/usr/local/service/hadoop/etc/hadoop
    spark_dpp_version=1.2-SNAPSHOT
    After completing the configuration, restart the Master FE.
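    A minimal sketch of the restart, assuming a self-managed FE deployed under a hypothetical /path/to/doris/fe directory (on Tencent Cloud CDW Doris the restart is usually performed from the console or by the Ops team):
    # restart the Master FE so that the new fe.conf settings take effect
    cd /path/to/doris/fe
    sh bin/stop_fe.sh
    sh bin/start_fe.sh --daemon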
    Note:
    If assistance from Tencent Cloud technical personnel is needed while adding the configuration, please contact us by submitting a ticket.

    Using Spark Load

    Creating Spark Resource

    A typical creation statement template is as follows:
    CREATE EXTERNAL RESOURCE spark_resource_xxx
    PROPERTIES
    (
    "type" = "spark",
    "spark.master" = "yarn",
    "spark.submit.deployMode" = "cluster",
    "spark.yarn.queue" = "<xxx_queue>",
    "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
    "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
    "spark.hadoop.yarn.resourcemanager.address.rm1" = "<rm1_host>:<rm1_port>",
    "spark.hadoop.yarn.resourcemanager.address.rm2" = "<rm2_host>:<rm2_port>",
    "spark.hadoop.fs.defaultFS" = "hdfs://<hdfs_defaultFS>",
    "spark.hadoop.dfs.nameservices" = "<hdfs_defaultFS>",
    "spark.hadoop.dfs.ha.namenodes.<hdfs_defaultFS>" = "nn1,nn2",
    "spark.hadoop.dfs.namenode.rpc-address.<hdfs_defaultFS>.nn1" = "<nn1_host>:<nn1_port>",
    "spark.hadoop.dfs.namenode.rpc-address.<hdfs_defaultFS>.nn2" = "<nn2_host>:<nn2_port>",
    "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
    "working_dir" = "hdfs://<hdfs_defaultFS>/doris/spark_load",
    "broker" = "<doris_broker_name>",
    "broker.username" = "hadoop",
    "broker.password" = "",
    "broker.dfs.nameservices" = "<hdfs_defaultFS>",
    "broker.dfs.ha.namenodes.<hdfs_defaultFS>" = "nn1, nn2",
    "broker.dfs.namenode.rpc-address.HDFS4001273.nn1" = "<nn1_host>:<nn1_port>",
    "broker.dfs.namenode.rpc-address.HDFS4001273.nn2" = "<nn2_host>:<nn2_port>",
    "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
    For the specific meaning of each parameter, please see Spark Load.
    The above configuration is applicable to the vast majority of EMR clusters, i.e., clusters with RM HA and HDFS HA enabled. Doris also supports the use of non-HA clusters or clusters with Kerberos authentication enabled. For specific configuration methods, please see Spark Load.
    Note:
    Common ports of Tencent Cloud EMR: <rm1_port>: 5000, <nn1_port>: 4007.
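    After the resource is created, you can check that it is visible with a statement like the following (a quick sanity check; the output lists the resources the current account can see):
    -- list visible resources; the newly created spark resource should appear
    SHOW RESOURCES;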

    Authorization for the Use of Spark Resource

    Common accounts cannot create resources and can only see the resources for which they have the USAGE_PRIV permission. Therefore, if a common account needs to use a certain resource, it must be granted that permission first. Authorization can also be revoked. The specific commands are as follows:
    -- Grant user0 the usage rights to the spark_resource_xxx resource
    GRANT USAGE_PRIV ON RESOURCE "spark_resource_xxx" TO "user0"@"%";
    
    -- Grant role0 the usage rights to the spark_resource_xxx resource
    GRANT USAGE_PRIV ON RESOURCE "spark_resource_xxx" TO ROLE "role0";
    
    -- Grant all resources usage rights to user0
    GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
    
    -- Grant all resources usage rights to role role0
    GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
    
    -- Revoke user0's usage rights to the spark_resource_xxx resource
    REVOKE USAGE_PRIV ON RESOURCE "spark_resource_xxx" FROM "user0"@"%";
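    To confirm that a grant has taken effect, the privileges of the account can be inspected, for example (user name as in the examples above):
    -- view the privileges of user0, including resource usage privileges
    SHOW GRANTS FOR 'user0'@'%';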

    Executing Spark Load Tasks

    LOAD LABEL test_label_01 --label
    (
    DATA INFILE ("hdfs://HDFS4001234/warehouse/ods.db/user_events/ds=2023-04-15/*" --file path)
    INTO TABLE user_events --doris table
    FORMAT AS "parquet" --data format
    ( event_time, user_id, op_code) --columns in file
    COLUMNS FROM PATH AS ( `ds` ) --partition column
    SET
    ( --column mapping
    ds = ds,
    event_time = event_time,
    user_id = user_id,
    op_code = op_code
    )
    )
    WITH RESOURCE 'spark_resource_xxx'
    ( --spark job params
    "spark.executor.memory" = "4g",
    "spark.default.parallelism" = "400",
    "spark.executor.cores" = '5',
    "spark.executor.instances" = '10'
    )
    PROPERTIES
    ( --doris load task params
    "timeout" = "259200"
    );
    The label, file path, Doris table, partition columns, column mappings, Spark task parameters, and other values need to be modified according to the actual situation. A label cannot be reused once the task has succeeded.

    Managing Job

    First switch to the database where the target table is located: use xxx_db;.

    Viewing Task

    show load where label='test_label_01';.
    Focus on the State and Progress columns in the result. They are described as follows:
    - State
    The current stage of the import task. After the task is submitted, the state is PENDING. After submitting Spark ETL, the state changes to ETL. After ETL is completed, FE schedules BE to perform the push operation and the state changes to LOADING. After the push is completed and the version takes effect, the state changes to FINISHED.
    There are two final states for an import task: CANCELLED and FINISHED. The load job is complete when it reaches either of these states: CANCELLED means the import failed, and FINISHED means it succeeded.
    - Progress
    Description of the progress of the import task. There are two types of progress: ETL and LOAD, corresponding to the two stages of the import process ETL and LOADING.
    The progress range of LOAD is: 0~100%.
    LOAD progress = Number of tablets that have completed importing on all replicas / Total number of tablets in this import task * 100%
    If all tables have completed importing, the LOAD progress is 99% and the import enters the final effective stage. After the entire import is completed, the LOAD progress changes to 100%.
    Import progress is not linear. Therefore, if the progress does not change for a while, it does not mean that the import is not being executed.
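    As an illustrative example with made-up numbers: if an import involves 200 tablets and 198 of them have finished importing on all replicas, the LOAD progress is 198 / 200 * 100% = 99%; the last percentage point is only reached once the final version takes effect.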

    Canceling Task

    If the task has not finished, execute the command to terminate it: cancel load where label='test_label_01';.

    Troubleshooting

    A Spark load task failure can be caused by a problem on the Doris side or on the external Yarn cluster side, so both need to be investigated.
    First troubleshoot from the Doris side:
    1. Use the show load command to view load task information based on the label.
    2. Each task will generate a log file in the directory /usr/local/service/doris/log/spark_launcher_log. The log is kept for 3 days by default. After entering this directory, execute ls *<load_task_label>* for a fuzzy search.
    3. If you can't find the above task log file or the file is empty, you need to view the Doris FE log fe.log/fe.warn.log: cd /data/cdw/doris/fe/log/.
    4. If the log shows that the Spark job execution failed, the user needs to view the Spark job log to confirm the specific error.
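    On the Yarn side, the Spark job log can usually be retrieved with the yarn CLI once the application ID is known (the ID below is a placeholder; it can be found in the spark_launcher_log file or in the Yarn ResourceManager UI):
    # fetch the aggregated logs of the Spark ETL application from Yarn
    yarn logs -applicationId application_1234567890123_0001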