Spark load uses Spark to preprocess the data to be imported, which improves the performance of importing large volumes of data into Doris and saves computing resources of the Doris cluster. It is mainly used for initial migration and other scenarios where a large amount of data is imported into Doris.
Spark load uses the resources of the Spark cluster to sort the data to be imported, and Doris BE writes the result files directly. This greatly reduces the resource usage and load of the Doris cluster, which makes it well suited for migrating large amounts of historical data.
If users do not have Spark cluster resources and want to conveniently and quickly migrate historical data from external storage, they can use Broker Load (HDFS Data) instead. Compared with Spark load, Broker load consumes more resources on the Doris cluster. Spark load is an asynchronous import method: users create a Spark import task through the MySQL protocol and view the import result with the SHOW LOAD command.
Applicable Scenario
The source data is located in a storage system that Spark has access to, such as HDFS.
Data size ranges from tens of GB to TB.
Basic Principles
Users submit a Spark type import task through the MySQL client; the FE records the metadata and returns a message indicating that the submission was successful.
The execution of a Spark load task mainly involves the following five stages.
1. FE schedules and submits the ETL task to the Spark cluster for execution.
2. The Spark cluster executes the ETL task to preprocess the load data. This includes building global dictionaries (for Bitmap type columns), partitioning, sorting, and aggregation.
3. After the ETL task is completed, FE obtains the path of each preprocessed tablet and schedules the related BEs to execute the push tasks.
4. The BEs read the data through the Broker and transform it into Doris's underlying storage format.
5. FE schedules the activation of versions to complete the import task.
         +
         | 0. User create spark load job
    +----v----+
    |   FE    |---------------------------------+
    +----+----+                                 |
         | 3. FE send push tasks                |
         | 5. FE publish version                |
    +----+-------+------------+                 |
    |            |            |                 |
+---v---+    +---v---+    +---v---+             |
|  BE   |    |  BE   |    |  BE   |             | 1. FE submit Spark ETL job
+---^---+    +---^---+    +---^---+             |
    | 4. BE push with broker                    |
+---+---+    +---+---+    +---+---+             |
|Broker |    |Broker |    |Broker |             |
+---^---+    +---^---+    +---^---+             |
    |            |            |                 |
+---+------------+------------+---+  2.ETL  +---v-------------------------+
|              HDFS               +-------->|        Spark cluster        |
|                                 |<--------+                             |
+---------------------------------+         +-----------------------------+
Configuring Environment
Before using Spark load, configure the environment on the FE Follower nodes. Only the Follower nodes need to be configured; the Observer nodes do not.
Note:
Some operations in this configuration section require support from Tencent Cloud technical personnel. Please submit a ticket to contact us as needed.
Connecting the Network from Doris to the EMR Cluster
It is best to ensure that Doris and the EMR cluster are in the same VPC, subnet, and security group to minimize network communication issues. If this is not possible, contact Tencent Cloud network operations for a solution, and finally use ping/telnet to confirm connectivity.
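For example, a minimal connectivity check from the FE Follower node might look like the following; <rm1_host> and <nn1_host> are placeholders for EMR node addresses, and the ports shown are the common Tencent Cloud EMR ports noted later in this document:
# Verify basic network reachability from the Doris FE node to the EMR cluster
ping -c 3 <rm1_host>
# Verify that the key service ports are reachable (5000 for ResourceManager, 4007 for NameNode RPC on Tencent Cloud EMR)
telnet <rm1_host> 5000
telnet <nn1_host> 4007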
Configuring the Hadoop Client
FE uses the Yarn command to obtain the status of running applications and to kill applications, so the Yarn client needs to be configured for FE. Version 2.5.2 or above is recommended.
Customers need to provide the Hadoop client of the Yarn cluster to be connected. This ensures that the client is consistent with the EMR cluster version, avoiding compatibility problems, and that it includes the necessary configuration files such as hdfs-site.xml, yarn-site.xml, and core-site.xml.
After obtaining the Hadoop client, place it under /usr/local/service and rename it to hadoop, or create a soft link named hadoop in this directory pointing to it.
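For example, assuming the client was unpacked to a hypothetical path /data/emr/hadoop (not part of the official steps; adjust to your actual location), the soft link could be created as follows:
# Create a soft link named hadoop under /usr/local/service pointing to the unpacked client
ln -s /data/emr/hadoop /usr/local/service/hadoop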
Modify the permissions of the path /usr/local/service/hadoop/etc/hadoop to ensure that the Doris FE startup user has write permissions:
chown -R doris:doris /usr/local/service/hadoop/etc/hadoop
Note:
If you are using Spark in Tencent Cloud EMR, please submit a ticket to contact EMR ops and have the IP of the Follower FE added to the client allowlist.
Run the hdfs dfs -ls / command as the Doris FE startup user (root for versions below 1.2, doris for 1.2 and later) to verify whether the Hadoop client is installed successfully.
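For example, on Doris 1.2 or later, where the FE startup user is doris, the check might look like this (the full path is used in case the Hadoop bin directory is not on the user's PATH):
# Switch to the FE startup user and list the HDFS root directory
su - doris
/usr/local/service/hadoop/bin/hdfs dfs -ls /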
Configuring the Spark Client
FE uses the spark-submit command to submit Spark tasks, so the Spark client needs to be configured for FE. Spark 2.4.5 or above (Spark 2.x) is recommended.
Customers need to provide the Spark client of the Yarn cluster to be used. This ensures version consistency and that the necessary configuration files such as spark-defaults.conf and hive-site.xml are included.
After obtaining the Spark client, place it under /usr/local/service and rename it to spark, or create a soft link named spark in this directory pointing to it.
Then execute the following command:
cd /usr/local/service/spark/jars/
zip spark_jars.zip *.jar
Change the Spark log level (log4j.rootLogger) to INFO.
vim /usr/local/service/spark/conf/log4j.properties
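After the change, the root logger line would typically look like the following; the appender name shown here (console) is an assumption, so keep whatever appenders your log4j.properties already defines:
# In /usr/local/service/spark/conf/log4j.properties
log4j.rootLogger=INFO, console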
Run the following command as the Doris FE startup user (root for versions below 1.2, doris for 1.2 and later) to verify whether the Hadoop and Spark clients are installed successfully.
spark-submit --queue default --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi /usr/local/service/spark/examples/jars/spark-examples_*.jar 10
Configuring Doris
Configure the log directory for Spark load tasks. Each task generates a log file in this directory, which Doris monitors to obtain the task ID and status.
mkdir -p /usr/local/service/doris/log/spark_launcher_log;
chmod 777 /usr/local/service/doris/log/spark_launcher_log
Add the following configurations to fe.conf:
spark_home_default_dir=/usr/local/service/spark
spark_resource_path=/usr/local/service/spark/jars/spark_jars.zip
yarn_client_path=/usr/local/service/hadoop/bin/yarn
yarn_config_dir=/usr/local/service/hadoop/etc/hadoop
spark_dpp_version=1.2-SNAPSHOT
After completing the configuration, restart the Master FE.
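A minimal restart sketch, assuming the FE is deployed under /usr/local/service/doris (adjust the path to your actual deployment):
# Restart the Master FE so that the new fe.conf settings take effect
/usr/local/service/doris/fe/bin/stop_fe.sh
/usr/local/service/doris/fe/bin/start_fe.sh --daemon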
Note:
If assistance from Tencent Cloud technical personnel is needed while adding the configuration, please contact us by submitting a ticket.
Using Spark Load
Creating Spark Resource
A typical creation statement template is as follows:
CREATE EXTERNAL RESOURCE spark_resource_xxx
PROPERTIES
(
"type" = "spark",
"spark.master" = "yarn",
"spark.submit.deployMode" = "cluster",
"spark.yarn.queue" = "<xxx_queue>",
"spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
"spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
"spark.hadoop.yarn.resourcemanager.address.rm1" = "<rm1_host>:<rm1_port>",
"spark.hadoop.yarn.resourcemanager.address.rm2" = "<rm2_host>:<rm2_port>",
"spark.hadoop.fs.defaultFS" = "hdfs://<hdfs_defaultFS>",
"spark.hadoop.dfs.nameservices" = "<hdfs_defaultFS>",
"spark.hadoop.dfs.ha.namenodes.<hdfs_defaultFS>" = "nn1,nn2",
"spark.hadoop.dfs.namenode.rpc-address.<hdfs_defaultFS>.nn1" = "<nn1_host>:<nn1_port>",
"spark.hadoop.dfs.namenode.rpc-address.<hdfs_defaultFS>.nn2" = "<nn2_host>:<nn2_port>",
"spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"working_dir" = "hdfs://<hdfs_defaultFS>/doris/spark_load",
"broker" = "<doris_broker_name>",
"broker.username" = "hadoop",
"broker.password" = "",
"broker.dfs.nameservices" = "<hdfs_defaultFS>",
"broker.dfs.ha.namenodes.<hdfs_defaultFS>" = "nn1, nn2",
"broker.dfs.namenode.rpc-address.HDFS4001273.nn1" = "<nn1_host>:<nn1_port>",
"broker.dfs.namenode.rpc-address.HDFS4001273.nn2" = "<nn2_host>:<nn2_port>",
"broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
);
For the specific meaning of each parameter, please see Spark Load. The above configuration is applicable to the vast majority of EMR clusters, i.e., clusters with both ResourceManager HA and HDFS HA enabled. Doris also supports non-HA clusters and clusters with Kerberos authentication enabled; for the specific configuration methods, please see Spark Load.
Note:
Common ports of Tencent Cloud EMR: <rm1_port>: 5000, <nn1_port>: 4007.
Authorization for the Use of Spark Resource
Common accounts cannot create resources and can only see resources for which they have the USAGE_PRIV permission. Therefore, if a common account needs to use a resource, it must be granted USAGE_PRIV on that resource. Authorization can also be revoked. The specific commands are as follows:
GRANT USAGE_PRIV ON RESOURCE "spark_resource_xxx" TO "user0"@"%";
GRANT USAGE_PRIV ON RESOURCE "spark_resource_xxx" TO ROLE "role0";
GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
REVOKE USAGE_PRIV ON RESOURCE "spark_resource_xxx" FROM "user0"@"%";
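After authorization, the granted account can check that the resource is visible to it, for example:
-- Run as the authorized account (e.g., user0) to confirm the resource is visible
SHOW RESOURCES;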
Executing Spark Load Tasks
LOAD LABEL test_label_01
(
DATA INFILE ("hdfs://HDFS4001234/warehouse/ods.db/user_events/ds=2023-04-15/*")
INTO TABLE user_events
FORMAT AS "parquet"
( event_time, user_id, op_code)
COLUMNS FROM PATH AS ( `ds` )
SET
(
ds = ds,
event_time = event_time,
user_id = user_id,
op_code = op_code
)
)
WITH RESOURCE 'spark_resource_xxx'
(
"spark.executor.memory" = "4g",
"spark.default.parallelism" = "400",
"spark.executor.cores" = '5',
"spark.executor.instances" = '10'
)
PROPERTIES
(
"timeout" = "259200"
);
The label, file path, Doris table, partition columns, column mapping, Spark task parameters, and other values need to be modified according to the actual situation. A label cannot be reused once the task with that label has succeeded.
Managing Job
First enter the database where the target table of the import is located: use xxx_db;
Viewing Task
show load where label='test_label_01';
Focus on the State and Progress columns in the results. The job state transition path is PENDING -> ETL -> LOADING -> FINISHED (or CANCELLED).
- State
The current stage of the import task. After the task is submitted, the state is PENDING. After the Spark ETL job is submitted, the state changes to ETL. After ETL is completed, FE schedules BE to perform the push operation, and the state changes to LOADING. After the push is completed and the version takes effect, the state changes to FINISHED.
There are two final states of the import task: CANCELLED and FINISHED. The load job is complete once it reaches either of these states: CANCELLED means the import failed, and FINISHED means the import succeeded.
- Progress
Description of the progress of the import task. There are two types of progress: ETL and LOAD, corresponding to the two stages of the import process ETL and LOADING.
The progress range of LOAD is: 0~100%.
`LOAD progress = Number of Tablets that have completed importing all Replicas / Total number of Tablets for this import task * 100%`
**If all tables have completed importing, the LOAD progress is 99%**, and the import enters the final effective stage. After the entire import is completed, the LOAD progress changes to 100%.
Import progress is not linear. Therefore, if the progress does not change for a while, it does not mean that the import is not being executed.
Canceling Task
If the task has not finished, execute the following command to terminate it: cancel load where label='test_label_01';
Troubleshooting
A Spark load task failure can be caused by a problem on the Doris side or on the external Yarn cluster being used, so both sides need to be investigated.
First troubleshoot from the Doris side:
1. Use the show load command to view load task information based on the label.
2. Each task generates a log file in the directory /usr/local/service/doris/log/spark_launcher_log. The log is kept for 3 days by default. After entering this directory, execute ls *<load_task_label>* for a fuzzy search.
3. If you can't find the task log file above, or the file is empty, view the Doris FE logs fe.log/fe.warn.log:
cd /data/cdw/doris/fe/log/
4. If the log shows that the Spark job execution failed, view the Spark job log to confirm the specific error.
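If the Spark job's Yarn application ID is known (it can usually be found in the spark_launcher_log file or in fe.log), a sketch for pulling the job log from Yarn is as follows; <application_id> is a placeholder:
# Fetch the aggregated logs of the failed Spark job from Yarn (requires Yarn log aggregation to be enabled)
/usr/local/service/hadoop/bin/yarn logs -applicationId <application_id>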