Spark Connector (Real-time or Batch Data Processing with Spark)

Last updated: 2024-06-27 11:03:08
    Spark Load uses external Spark computing resources to preprocess the imported data, which improves import performance for large data volumes and saves computing resources in the Doris cluster. It is mainly used for initial migration and large-scale data imports into Doris.
    Spark Load uses the Spark cluster to sort the data to be imported, and the Doris BEs write the resulting files directly. This significantly reduces resource usage on the Doris cluster and is well suited for migrating massive amounts of historical data while keeping the resource usage and load of the Doris cluster low.
    If you do not have Spark cluster resources but still want to migrate historical data from external storage conveniently and quickly, you can use Broker Load (HDFS data). Compared with Spark Load, a Broker Load import consumes more resources on the Doris cluster.
    Spark Load is an asynchronous import method. Users create a Spark-type import job through the MySQL protocol and view the import result with SHOW LOAD.

    Applicable Scenario

    The source data is in a storage system that Spark can access, such as HDFS.
    The data volume ranges from tens of GB to the TB level.

    Basic Principles

    Basic Process

    Users submit a Spark-type import job through the MySQL client. FE records the metadata and returns a message that the submission was successful. The execution of a Spark Load task is mainly divided into the following 5 stages.
    1. FE schedules and submits the ETL task to the Spark cluster for execution.
    2. The Spark cluster executes the ETL to preprocess the imported data. This includes building the global dictionary (for the BITMAP type), partitioning, sorting, aggregating, and so on.
    3. After the ETL task is completed, FE obtains the path of each preprocessed tablet and schedules the related BEs to execute the push tasks.
    4. The BEs read the data through the Broker and convert it into the Doris underlying storage format.
    5. FE schedules the version to take effect and completes the import job.
                         +
                         | 0. User create spark load job
                    +----v----+
                    |   FE    |---------------------------------+
                    +----+----+                                 |
                         | 3. FE send push tasks                |
                         | 5. FE publish version                |
            +------------+------------+                         |
            |            |            |                         |
        +---v---+    +---v---+    +---v---+                     |
        |  BE   |    |  BE   |    |  BE   |                     |1. FE submit Spark ETL job
        +---^---+    +---^---+    +---^---+                     |
            |4. BE push with broker   |                         |
        +---+---+    +---+---+    +---+---+                     |
        |Broker |    |Broker |    |Broker |                     |
        +---^---+    +---^---+    +---^---+                     |
            |            |            |                         |
        +---+------------+------------+---+  2. ETL  +----------v----------+
        |             HDFS                +--------->|    Spark cluster    |
        |                                 |<---------+                     |
        +---------------------------------+          +---------------------+

    Global Dictionary

    Applicable Scenario

    Currently, bitmap columns in Doris are implemented using the RoaringBitmap library, and RoaringBitmap only accepts integer input. Therefore, to precompute bitmap columns during import, the input data must be converted to integers. In the existing Doris import process, the data structure of the global dictionary is based on a Hive table, which stores the mapping from original values to encoded integer values.

    Building Process

    1. Read data from the upstream data source and generate a temporary Hive table, recorded as hive_table.
    2. Extract the distinct values of the fields to be deduplicated from hive_table and generate a new Hive table, recorded as distinct_value_table.
    3. Create a new global dictionary table, recorded as dict_table, with one column for the original value and another for the encoded value.
    4. Perform a left join between distinct_value_table and dict_table to find the newly added distinct values, then encode this set with a window function. At this point, each original value in the deduplication column has an additional encoded value. Finally, the data of these two columns is written back to dict_table.
    5. Join dict_table with hive_table to replace the original values in hive_table with the integer encoded values.
    6. hive_table is read by the subsequent data preprocessing step and, after computation, imported into Doris. The Hive SQL sketch below illustrates steps 4 and 5.
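    To make steps 4 and 5 concrete, the following Hive SQL is a minimal sketch of the dictionary-building logic. The table and column names (hive_table, distinct_value_table, dict_table, hive_table_encoded, orig_value, encoded_value, k1/k2/k3/uuid) are illustrative assumptions; the actual statements are generated internally by Spark Load and may differ.
    -- Step 4 (sketch): assign new integer codes to distinct values not yet in dict_table
    INSERT INTO TABLE dict_table
    SELECT
        t.orig_value,
        ROW_NUMBER() OVER (ORDER BY t.orig_value) + m.max_code AS encoded_value
    FROM (
        SELECT d.orig_value
        FROM distinct_value_table d
        LEFT JOIN dict_table dict ON d.orig_value = dict.orig_value
        WHERE dict.orig_value IS NULL              -- keep only values not yet encoded
    ) t
    CROSS JOIN (
        SELECT COALESCE(MAX(encoded_value), 0) AS max_code FROM dict_table
    ) m;
    
    -- Step 5 (sketch): replace the original value with its integer code;
    -- an intermediate table is used here purely for illustration
    CREATE TABLE hive_table_encoded AS
    SELECT h.k1, h.k2, h.k3, dict.encoded_value AS uuid
    FROM hive_table h
    JOIN dict_table dict ON h.uuid = dict.orig_value;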

    Data Preprocessing (DPP)

    Basic Process

    1. Read data from the data source. The upstream data source can be HDFS files or a Hive table.
    2. Map the read data, compute the expressions, and generate the bucket field bucket_id based on the partition information.
    3. Generate a RollupTree based on the rollup metadata of the Doris table.
    4. Traverse the RollupTree to perform hierarchical aggregation. The rollup of the next level can be calculated from the previous level.
    5. After each aggregation calculation, the data is divided into buckets according to bucket_id and written to HDFS.
    6. Afterwards, the Broker pulls the files from HDFS and imports them into the Doris BE.

    Hive Bitmap UDF

    Spark Load supports loading bitmap data generated in Hive directly into Doris.

    Basic Operations

    Configuring ETL Cluster

    In Doris, Spark is used as an external computing resource to complete ETL work. In the future, other external resources may also be used in Doris, such as Spark/GPU for queries, HDFS/S3 for external storage, and MapReduce for ETL. Therefore, resource management is introduced to manage these external resources used by Doris. Before submitting a Spark import job, you need to configure the Spark cluster that will perform the ETL tasks.
    -- create spark resource
    CREATE EXTERNAL RESOURCE resource_name
    PROPERTIES
    (
    type = spark,
    spark_conf_key = spark_conf_value,
    working_dir = path,
    broker = broker_name,
    broker.property_key = property_value,
    hadoop.security.authentication = kerberos,
    kerberos_principal = doris@YOUR.COM,
    kerberos_keytab = /home/doris/my.keytab
    kerberos_keytab_content = ASDOWHDLAWIDJHWLDKSALDJSDIWALD
    )
    
    -- drop spark resource
    DROP RESOURCE resource_name
    
    -- show resources
    SHOW RESOURCES
    SHOW PROC "/resources"
    
    -- privileges
    GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
    GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name
    
    REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
    REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name

    Creating resources

    resource_name is the name of the Spark resource configured in Doris. PROPERTIES are the parameters related to the Spark resource, as follows:
    type: Resource type, required. Currently, only spark is supported.
    Below are the Spark-related parameters:
    spark.master: Required. Currently supports yarn and spark://host:port.
    spark.submit.deployMode: The deployment mode of the Spark program. Required; supports both cluster and client.
    spark.hadoop.yarn.resourcemanager.address: Required when the master is yarn.
    spark.hadoop.fs.defaultFS: Required when the master is yarn.
    Other parameters are optional, see Spark Configuration.
    working_dir: Directory used by the ETL. Required when Spark is used as an ETL resource. For example: hdfs://host:port/tmp/doris.
    hadoop.security.authentication: Specifies the authentication method as Kerberos.
    kerberos_principal: Specifies the Kerberos principal.
    kerberos_keytab: Specifies the keytab file path for Kerberos. It must be the absolute path of the keytab file on the server hosting the Broker process and must be accessible to the Broker process.
    kerberos_keytab_content: Specifies the base64-encoded content of the Kerberos keytab file. This is mutually exclusive with the kerberos_keytab configuration.
    broker: Broker name. Required when Spark is used as the ETL resource. You need to configure it in advance with the ALTER SYSTEM ADD BROKER command.
    broker.property_key: Authentication information and other properties that the Broker needs to specify when reading the intermediate files generated by the ETL.
    Example:
    -- yarn cluster mode
    CREATE EXTERNAL RESOURCE "spark0"
    PROPERTIES
    (
    "type" = "spark",
    "spark.master" = "yarn",
    "spark.submit.deployMode" = "cluster",
    "spark.jars" = "xxx.jar,yyy.jar",
    "spark.files" = "/tmp/aaa,/tmp/bbb",
    "spark.executor.memory" = "1g",
    "spark.yarn.queue" = "queue0",
    "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
    "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
    "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
    "broker" = "broker0",
    "broker.username" = "user0",
    "broker.password" = "password0"
    );
    
    -- spark standalone client mode
    CREATE EXTERNAL RESOURCE "spark1"
    PROPERTIES
    (
    "type" = "spark",
    "spark.master" = "spark://127.0.0.1:7777",
    "spark.submit.deployMode" = "client",
    "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
    "broker" = "broker1"
    );

    Spark Load Supporting Kerberos Authentication

    If Spark Load needs to access Hadoop cluster resources that use Kerberos authentication, you only need to specify the following parameters when creating the Spark resource:
    hadoop.security.authentication: Specifies the authentication method as Kerberos.
    kerberos_principal: Specifies the Kerberos principal.
    kerberos_keytab: Specifies the keytab file path for Kerberos. It must be the absolute path of the keytab file on the server hosting the Broker process and must be accessible to the Broker process.
    kerberos_keytab_content: Specifies the base64-encoded content of the Kerberos keytab file. This is mutually exclusive with the kerberos_keytab configuration.
    Example:
    CREATE EXTERNAL RESOURCE "spark_on_kerberos"
    PROPERTIES
    (
    "type" = "spark",
    "spark.master" = "yarn",
    "spark.submit.deployMode" = "cluster",
    "spark.jars" = "xxx.jar,yyy.jar",
    "spark.files" = "/tmp/aaa,/tmp/bbb",
    "spark.executor.memory" = "1g",
    "spark.yarn.queue" = "queue0",
    "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
    "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
    "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
    "broker" = "broker0",
    "hadoop.security.authentication" = "kerberos",
    "kerberos_principal" = "doris@YOUR.COM",
    "kerberos_keytab" = "/home/doris/my.keytab"
    );

    Viewing Resources

    Ordinary users can only see the resources that they have USAGE_PRIV permission to use.
    root and admin accounts can see all resources.
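    For example, a user who has been granted USAGE_PRIV on the spark0 resource can check it as follows; the optional WHERE NAME filter is assumed to be available in your Doris version, and omitting it lists all visible resources.
    -- show only the spark0 resource
    SHOW RESOURCES WHERE NAME = "spark0";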

    Resources Permissions

    Resource permissions are managed through GRANT and REVOKE. Currently, only the USAGE_PRIV permission is supported. The USAGE_PRIV permission can be granted to a specific user or to a role, and roles are used in the same way as before.
    -- Grant permission to the spark0 resource to user user0
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
    
    -- Grant permission to the spark0 resource to role role0
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
    
    -- Grant permission to all resources to user user0
    GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
    
    -- Grant permission to all resources to role role0
    GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
    
    -- Revoke the spark0 resource permission of user user0
    REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";

    Configuring SPARK Client

    FE submits Spark jobs by executing the spark-submit command, so a Spark client needs to be configured for FE. It is recommended to use official Spark 2.x version 2.4.5 or later, which can be downloaded from the Spark official site. After downloading, follow the steps below to complete the configuration.

    Configuring SPARK_HOME Environment Variable

    Place the Spark client in a directory on the same machine as FE and set spark_home_default_dir in the FE configuration file to that directory. This configuration item defaults to the lib/spark2x path under the FE root directory and cannot be empty.

    Configuring SPARK Dependency Package

    Package all jar files in the Spark client's jars folder into a zip archive and set spark_resource_path in the FE configuration file to the path of this zip file. If this configuration item is empty, FE will look for lib/spark2x/jars/spark-2x.zip under the FE root directory; if it is not found, a file-not-found error is reported. When a Spark Load job is submitted, the archived dependency files are uploaded to a remote repository. The default repository path is under the working_dir/{cluster_id} directory, in a folder named __spark_repository__{resource_name}, which means that one resource in the cluster corresponds to one remote repository. The directory structure of the remote repository is as follows:
    __spark_repository__spark0/
    |-__archive_1.0.0/
    | |-__lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp-1.0.0-jar-with-dependencies.jar
    | |-__lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip
    |-__archive_1.1.0/
    | |-__lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp-1.1.0-jar-with-dependencies.jar
    | |-__lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip
    |-__archive_1.2.0/
    | |-...
    In addition to the Spark dependencies (named spark-2x.zip by default), FE also uploads the DPP dependency package to the remote repository. If all the dependency files submitted by a Spark Load job already exist in the remote repository, the dependencies do not need to be uploaded again, saving the time of repeatedly uploading a large number of files.

    Configuring YARN Client

    FE obtains the status of the running application and terminates the application by executing YARN commands, so a YARN client needs to be configured for FE. It is recommended to use official Hadoop 2.x version 2.5.2 or later, which can be downloaded from the Hadoop official site. After downloading, follow the steps below to complete the configuration.

    Configuring YARN Executable Path

    Place the downloaded YARN client in a directory on the same machine as FE, and set yarn_client_path in the FE configuration file to the YARN executable, which defaults to lib/yarn-client/hadoop/bin/yarn under the FE root directory. (Optional) When FE obtains the application status or terminates the application through the YARN client, the configuration files required to execute the yarn command are generated by default in the lib/yarn-config path under the FE root directory. This path can be changed with the yarn_config_dir item in the FE configuration file. The generated configuration files currently include core-site.xml and yarn-site.xml.

    Creating import

    Syntax:
    LOAD LABEL load_label
    (data_desc, ...)
    WITH RESOURCE resource_name
    [resource_properties]
    [PROPERTIES (key1=value1, ... )]
    
    * load_label:
    db_name.label_name
    
    * data_desc:
    DATA INFILE ('file_path', ...)
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [COLUMNS TERMINATED BY separator ]
    [(col1, ...)]
    [COLUMNS FROM PATH AS (col2, ...)]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]
    
    DATA FROM TABLE hive_external_tbl
    [NEGATIVE]
    INTO TABLE tbl_name
    [PARTITION (p1, p2)]
    [SET (k1=f1(xx), k2=f2(xx))]
    [WHERE predicate]
    
    * resource_properties:
    (key2=value2, ...)
    Example 1: The upstream data source is an HDFS file
    LOAD LABEL db1.label1
    (
    DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
    INTO TABLE tbl1
    COLUMNS TERMINATED BY ","
    (tmp_c1,tmp_c2)
    SET
    (
    id=tmp_c2,
    name=tmp_c1
    ),
    DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
    INTO TABLE tbl2
    COLUMNS TERMINATED BY ","
    (col1, col2)
    where col1 > 1
    )
    WITH RESOURCE 'spark0'
    (
    "spark.executor.memory" = "2g",
    "spark.shuffle.compress" = "true"
    )
    PROPERTIES
    (
    "timeout" = "3600"
    );
    Example 2: upstream data source is a hive table
    Step 1: Create a new Hive external table
    CREATE EXTERNAL TABLE hive_t1
    (
    k1 INT,
    K2 SMALLINT,
    k3 varchar(50),
    uuid varchar(100)
    )
    ENGINE=hive
    properties
    (
    "database" = "tmp",
    "table" = "t1",
    "hive.metastore.uris" = "thrift://0.0.0.0:8080"
    );
    
    Step 2: Submit the load command. The columns in the Doris table being imported must exist in the Hive external table.
    LOAD LABEL db1.label1
    (
    DATA FROM TABLE hive_t1
    INTO TABLE tbl1
    SET
    (
    uuid=bitmap_dict(uuid)
    )
    )
    WITH RESOURCE 'spark0'
    (
    "spark.executor.memory" = "2g",
    "spark.shuffle.compress" = "true"
    )
    PROPERTIES
    (
    "timeout" = "3600"
    );
    Example 3: The upstream data source is a Hive table with a binary (bitmap) type column
    Step 1: Create a new Hive external table
    CREATE EXTERNAL TABLE hive_t1
    (
    k1 INT,
    K2 SMALLINT,
    k3 varchar(50),
    uuid varchar(100) -- the corresponding column in Hive is of binary type
    )
    ENGINE=hive
    properties
    (
    "database" = "tmp",
    "table" = "t1",
    "hive.metastore.uris" = "thrift://0.0.0.0:8080"
    );
    
    Step 2: Submit the load command. The columns in the Doris table being imported must exist in the Hive external table.
    LOAD LABEL db1.label1
    (
    DATA FROM TABLE hive_t1
    INTO TABLE tbl1
    SET
    (
    uuid=binary_bitmap(uuid)
    )
    )
    WITH RESOURCE 'spark0'
    (
    "spark.executor.memory" = "2g",
    "spark.shuffle.compress" = "true"
    )
    PROPERTIES
    (
    "timeout" = "3600"
    );
    Example 4: Importing data from a Hive partitioned table
    --hive table creation statement
    create table test_partition(
    id int,
    name string,
    age int
    )
    partitioned by (dt string)
    row format delimited fields terminated by ','
    stored as textfile;
    
    --doris table creation statement
    CREATE TABLE IF NOT EXISTS test_partition_04
    (
    dt date,
    id int,
    name string,
    age int
    )
    UNIQUE KEY(dt, id)
    DISTRIBUTED BY HASH(id) BUCKETS 1
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 1"
    );
    --spark load statement
    CREATE EXTERNAL RESOURCE "spark_resource"
    PROPERTIES
    (
    "type" = "spark",
    "spark.master" = "yarn",
    "spark.submit.deployMode" = "cluster",
    "spark.executor.memory" = "1g",
    "spark.yarn.queue" = "default",
    "spark.hadoop.yarn.resourcemanager.address" = "localhost:50056",
    "spark.hadoop.fs.defaultFS" = "hdfs://localhost:9000",
    "working_dir" = "hdfs://localhost:9000/tmp/doris",
    "broker" = "broker_01"
    );
    LOAD LABEL demo.test_hive_partition_table_18
    (
    DATA INFILE("hdfs://localhost:9000/user/hive/warehouse/demo.db/test/dt=2022-08-01/*")
    INTO TABLE test_partition_04
    COLUMNS TERMINATED BY ","
    FORMAT AS "csv"
    (id,name,age)
    COLUMNS FROM PATH AS (dt)
    SET
    (
    dt=dt,
    id=id,
    name=name,
    age=age
    )
    )
    WITH RESOURCE 'spark_resource'
    (
    "spark.executor.memory" = "1g",
    "spark.shuffle.compress" = "true"
    )
    PROPERTIES
    (
    "timeout" = "3600"
    );
    You can view the detailed syntax for creating a load job by executing HELP SPARK LOAD. This section mainly introduces the parameter meanings and precautions in the creation syntax of Spark Load.

    Label

    The identification of the import task. Each import task has a unique label within a single database. The specific rules are consistent with Broker Load.

    Data Description Parameters

    Currently supported data sources are CSV and Hive table. Other rules are consistent with Broker Load.

    Import Job Parameter

    Import job parameters mainly refer to the opt_properties section of the Spark Load statement. They apply to the entire import job. The rules are consistent with Broker Load.
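    For illustration, a job-level property block might look like the following; timeout and max_filter_ratio both appear elsewhere in this document, and the concrete values are placeholders rather than recommended settings.
    PROPERTIES
    (
        "timeout" = "3600",              -- job timeout in seconds
        "max_filter_ratio" = "0.1"       -- maximum tolerable ratio of filtered rows
    )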

    Spark Resource Parameters

    Spark resources need to be configured in the Doris system in advance, and the user must be granted USAGE_PRIV permission before using Spark Load. When users have temporary needs, such as adding resources for a task or modifying Spark configurations, they can set them here. The settings only take effect for this task and do not affect the existing configuration in the Doris cluster.
    WITH RESOURCE 'spark0'
    (
    "spark.driver.memory" = "1g",
    "spark.executor.memory" = "3g"
    )

    Importing When the Data Source is the Hive Table

    At present, if you want to use a Hive table as the data source in the import process, you need to create an external table of type Hive first, and then specify its name when submitting the import command.

    Building a Global Dictionary for the Import Process

    The applicable data type for the aggregate column of the Doris table is bitmap. In the load command, specify the field for which to build a global dictionary in the format: Doris field name = bitmap_dict(Hive table field name). Note that building a global dictionary is only supported when the upstream data source is a Hive table.

    Importing Hive Binary (Bitmap) Type Columns

    The applicable data type for the aggregate column of the Doris table is bitmap, and the data type of the corresponding column in the source Hive table is binary (serialized through the org.apache.doris.load.loadv2.dpp.BitmapValue class in FE's spark-dpp module). There is no need to build a global dictionary; just specify the corresponding field in the load command in the format: Doris field name = binary_bitmap(Hive table field name). Likewise, importing binary (bitmap) data is currently only supported when the upstream data source is a Hive table.

    Viewing Import

    Spark Load imports are asynchronous, just like Broker Load, so the user must record the label of the load job and use it in the SHOW LOAD command to view the import result. The command for viewing imports is common to all import methods. The specific syntax can be viewed by executing HELP SHOW LOAD. Example:
    mysql> show load order by createtime desc limit 1\G
    ************************* 1. row *************************
    JobId: 76391
    Label: label1
    State: FINISHED
    Progress: ETL:100%; LOAD:100%
    Type: SPARK
    EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
    TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
    ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
    EtlStartTime: 2019-07-27 11:46:44
    EtlFinishTime: 2019-07-27 11:49:44
    LoadStartTime: 2019-07-27 11:49:44
    LoadFinishTime: 2019-07-27 11:50:16
    URL: http://1.1.1.1:8089/proxy/application_1586619723848_0035/
    JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
    The meaning of the parameters in the returned result set can be found in the Broker Load documentation. The differences are as follows:
    State: The stage that the import job is currently in. After the job is submitted, the status is PENDING. After the Spark ETL job is submitted, the status changes to ETL. After the ETL is completed, FE schedules the BEs to execute push operations and the status changes to LOADING. After the push is completed and the version takes effect, the status changes to FINISHED. There are two final states of the import job: CANCELLED and FINISHED. The import is complete when the load job is in either of these two states. CANCELLED means the import failed, and FINISHED means the import succeeded.
    Progress: The progress description of the import job. There are two kinds of progress: ETL and LOAD, corresponding to the two stages of the import process, ETL and LOADING. The progress range of LOAD is 0~100%. LOAD progress = the number of tablets whose replicas have finished importing / the total number of tablets in this import job * 100%. When all tables have finished importing, the LOAD progress is 99% and the import enters the final effective stage; only after the entire import is completed does the LOAD progress change to 100%. The import progress is not linear, so if the progress does not change for a period of time, it does not mean that the import is not being executed.
    Type: The type of the import job. For Spark Load it is SPARK.
    CreateTime/EtlStartTime/EtlFinishTime/LoadStartTime/LoadFinishTime: These values represent the creation time of the import, the start time of the ETL stage, the completion time of the ETL stage, the start time of the LOADING stage, and the completion time of the entire import job.
    JobDetails: Displays detailed running status of the job, updated when the ETL ends. It includes the number of imported files, the total size (in bytes), the number of subtasks, the number of processed raw rows, and so on, for example: {"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}
    URL: You can copy this URL into a browser to go to the web interface of the corresponding application.

    Viewing the Spark Launcher Commit Log

    Sometimes users need to view the detailed logs generated during the Spark job submission process. By default, the logs are saved in the log/spark_launcher_log directory under the FE root directory and are named spark_launcher_{load_job_id}_{label}.log. The logs are kept in this directory for a period of time; when the import information in the FE metadata is cleaned up, the corresponding logs are also cleaned up. The default retention time is 3 days.

    Canceling Import

    When the status of a Spark Load job is not CANCELLED or FINISHED, it can be manually canceled by the user. To cancel, you need to specify the label of the load job. The syntax of the cancel import command can be viewed by executing HELP CANCEL LOAD.
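    As a minimal sketch, assuming a job created with the label db1.label1 from the earlier examples, the cancellation would look like this:
    -- cancel the Spark Load job identified by its label in database db1
    CANCEL LOAD
    FROM db1
    WHERE LABEL = "label1";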

    Related System Configuration

    FE configuration

    The following configurations belong to the system-level configuration of Spark Load, that is, they apply to all Spark Load import jobs. The configuration values are adjusted mainly by modifying fe.conf, as in the sketch after this list.
    enable_spark_load: Enables Spark Load and the create resource feature. The default is false, meaning the feature is disabled.
    spark_load_default_timeout_second: The default task timeout, 259200 seconds (3 days).
    spark_home_default_dir: Path of the Spark client (fe/lib/spark2x).
    spark_resource_path: Path of the packaged Spark dependency file (empty by default).
    spark_launcher_log_dir: Directory where the Spark client submission logs are stored (fe/log/spark_launcher_log).
    yarn_client_path: Path of the YARN executable file (fe/lib/yarn-client/hadoop/bin/yarn).
    yarn_config_dir: Path where the YARN configuration files are generated (fe/lib/yarn-config).
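    As a sketch, assuming an FE installed under /opt/doris/fe and default-style paths (all paths below are placeholders), the corresponding fe.conf entries might look like this; only enable_spark_load is strictly required to turn the feature on.
    # enable the Spark Load / create resource feature (off by default)
    enable_spark_load = true
    spark_load_default_timeout_second = 259200
    spark_home_default_dir = /opt/doris/fe/lib/spark2x
    spark_resource_path = /opt/doris/fe/lib/spark2x/jars/spark-2x.zip
    spark_launcher_log_dir = /opt/doris/fe/log/spark_launcher_log
    yarn_client_path = /opt/doris/fe/lib/yarn-client/hadoop/bin/yarn
    yarn_config_dir = /opt/doris/fe/lib/yarn-config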

    Best Practice

    Application Scenario

    The most suitable scenario for Spark Load is when the raw data is in a file system (HDFS) and the data volume ranges from tens of GB to the TB level. For smaller amounts of data, it is recommended to use Stream Load or Broker Load.

    FAQs

    Currently, Spark Load does not support importing into Doris tables with String type fields. If your table contains String fields, change them to varchar; otherwise the import will fail with type:ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel.
    When using Spark Load, if the spark-env.sh of the Spark client does not set the HADOOP_CONF_DIR environment variable, the error When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment. is reported.
    When using Spark Load, if spark_home_default_dir is not configured correctly, the spark-submit command used to submit the Spark job cannot be found and the error Cannot run program "xxx/bin/spark-submit": error=2, No such file or directory is reported.
    When using Spark Load, if spark_resource_path does not point to the packaged zip file, the error File xxx/jars/spark-2x.zip does not exist is reported.
    When using Spark Load, if yarn_client_path does not point to the YARN executable file, the error yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn is reported.
    When using Spark Load, if the hadoop-config.sh in the YARN client does not set the JAVA_HOME environment variable, the error yarn application kill failed. app id: xxx, load job id: xxx, msg: which: no xxx/lib/yarn-client/hadoop/bin/yarn in ((null)) Error: JAVA_HOME is not set and could not be found is reported.

    More help

    For more details on using Spark Load syntax, enter HELP SPARK LOAD in the MySQL client command line to get more help information.