The load feature imports users' raw data into Doris. After a successful import, users can query the data through the MySQL client. Doris supports multiple import methods. It is recommended to read this document in full first, and then consult the detailed documentation for the import method you choose.
Concepts
1. Frontend (FE): Metadata and scheduler node of the Doris system, primarily responsible for generating plans and scheduling import tasks during the import process.
2. Backend (BE): The computing and storage nodes of the Doris system, mainly responsible for data ETL and storage during the import process.
3. Broker: The broker is a standalone, stateless process. It encapsulates the file system interface, enabling Doris to read files from remote storage systems.
4. Load job: The load job reads the user's submitted raw data, transforms or cleanses it, and then imports the data into the Doris system. Once the import completes, the data is available for user queries.
5. Label: All import jobs have a label. The label is unique within a database, can be specified by the user or automatically generated by the system, and is used to identify an import job. The same label can only be used for one successful import job.
6. MySQL protocol/HTTP protocol: Doris provides two protocol interfaces, MySQL and HTTP. Some import methods submit jobs through the MySQL protocol interface, while others use the HTTP protocol interface.
Import Methods
To meet different data import needs, the Doris system provides six import methods. Each import method supports different data sources and has its own usage mode (asynchronous or synchronous). All import methods support the CSV data format; in addition, Broker Load also supports the Parquet and ORC data formats.
Please refer to the individual import methods' operation manuals for further information on each import method.
Broker Load
Broker Load accesses, reads, and imports data from external data sources (such as HDFS) into Doris through the broker process. Users submit import jobs through the MySQL protocol; the jobs are executed asynchronously, and the `SHOW LOAD` command is used to view the import results.
Stream Load
Users submit a request through the HTTP protocol, carrying the raw data in the request, to create an import. This method is mainly used to quickly import data from local files or data streams into Doris. The import command returns the import result synchronously.
Insert
Similar to the INSERT statement in MySQL, Doris provides the `INSERT INTO tbl SELECT ...;` method to read data from one Doris table and import it into another. Alternatively, you can use `INSERT INTO tbl VALUES(...);` to insert a single row.
Multi Load
Users submit multiple import jobs through the HTTP protocol. Multi Load guarantees that multiple import jobs take effect atomically.
Routine load
Users submit routine import jobs through the MySQL protocol to create a resident thread, which continuously reads data from the data source (like Kafka) and imports it into Doris.
Direct Import via S3 Protocol
Users import data directly through the S3 protocol; the usage is similar to Broker Load.
Basic Principles
Importing Execution Process
+---------+ +---------+ +----------+ +-----------+
| | | | | | | |
| PENDING +----->+ ETL +----->+ LOADING +----->+ FINISHED |
| | | | | | | |
+---------+ +---+-----+ +----+-----+ +-----------+
| | |
| | |
| | |
| | | +-----------+
| | | | |
+---------------+-----------------+------------> CANCELLED |
| |
+-----------+
As shown in the picture above, an import job mainly goes through the four stages illustrated.
PENDING (optional): This stage is unique to Broker Load. After the user submits a Broker Load job, it stays in this stage until it is picked up by the scheduler in FE. Scheduling occurs at 5-second intervals.
ETL (optional): This stage exists only in versions prior to 0.10.0 and is mainly used to transform the raw data according to the user's declaration and to filter out raw data that does not meet the conditions. Since version 0.10.0, the ETL stage no longer exists and its data transformation work has been merged into the LOADING stage.
LOADING: In versions prior to 0.10.0, this stage is mainly used to push the transformed data into the corresponding BE storage. Since version 0.10.0, this stage first cleanses and transforms the data and then sends it to BE storage. After all data has been sent, the job waits for the data to take effect; during this wait, the load job status is still LOADING.
FINISHED: After all the data involved in the load job has taken effect, the job status becomes FINISHED. Once a job is FINISHED, its imported data can be queried.
CANCELLED: Before a job reaches FINISHED, it may be canceled and enter the CANCELLED state, for example, if the user cancels it manually or an error occurs during import. CANCELLED is also a final state of a load job; a cancelled job cannot be executed again.
Except for the transition from PENDING to LOADING, which is driven by the scheduler's polling, transitions between the other stages are implemented through a callback mechanism.
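The stage transitions described above can be summarized as a small state table. The following is an illustrative sketch, not Doris source code; the transition set is inferred from the diagram and the descriptions above:

```python
# Allowed load-job state transitions, following the diagram above.
# PENDING exists only for Broker Load; ETL exists only before 0.10.0,
# so in later versions PENDING moves straight to LOADING.
TRANSITIONS = {
    "PENDING": {"ETL", "LOADING", "CANCELLED"},
    "ETL": {"LOADING", "CANCELLED"},
    "LOADING": {"FINISHED", "CANCELLED"},
    "FINISHED": set(),    # terminal state
    "CANCELLED": set(),   # terminal state
}

def can_transition(src: str, dst: str) -> bool:
    """Return True if a load job may move from state src to state dst."""
    return dst in TRANSITIONS.get(src, set())
```

Both FINISHED and CANCELLED have empty transition sets, reflecting that they are final states from which a job never moves again.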
Label and Atomicity
Doris guarantees atomicity for all import methods: the data within one import job takes effect atomically, so there is no situation where only part of the data is imported.
At the same time, every import job has a label, either specified by the user or generated automatically by the system. Labels are unique within a database. Once an import job with a given label succeeds, that label cannot be reused to submit another import job; if the job fails, the label can be reused.
Users can use the label mechanism to ensure that data corresponding to a label is imported at-most-once.
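As a sketch of a label generation strategy, one option is to derive the label deterministically from the batch's identity, so that any retry of the same batch reuses the same label. The table name, date, and batch number used here are illustrative components, not a Doris requirement:

```python
import hashlib

def make_label(table: str, batch_date: str, batch_id: int) -> str:
    """Build a deterministic label for one batch: the same batch always
    yields the same label, so a retry cannot create a second import."""
    raw = f"{table}_{batch_date}_{batch_id}"
    # Hash the components to keep the label short and free of
    # characters a label may not allow.
    digest = hashlib.md5(raw.encode()).hexdigest()[:8]
    return f"load_{table}_{batch_date}_{digest}"

# The same batch always maps to the same label...
assert make_label("orders", "20240101", 7) == make_label("orders", "20240101", 7)
# ...while different batches map to different labels.
assert make_label("orders", "20240101", 7) != make_label("orders", "20240101", 8)
```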
Synchronous and Asynchronous
The current Doris import methods are divided into two types: synchronous and asynchronous. If an external program uses Doris's import feature, it needs to determine which type the chosen import method is before designing the access logic.
Sync
Synchronous import means that the user creates an import task, Doris executes the import synchronously, and the import result is returned after completion. The user can judge whether the import succeeded directly from the result returned by the create-import command.
The synchronous import methods are Stream Load and Insert.
Directions:
1. User (external system) creates an import task.
2. Doris returns the import results.
3. User (external system) judges the import results, and if it fails, they can resubmit the import task.
Note
If the user's import method is synchronous and the imported data is large, it may take a long time for the create import request to return a result.
Asynchronous
Asynchronous import means that after the user creates an import task, Doris immediately returns whether the creation succeeded. A successful creation does not mean the data has been imported; the import task is executed asynchronously. After a successful creation, the user needs to poll the job status with the show command. If the creation fails, the user can judge from the error message whether the task needs to be re-created.
The asynchronous import methods are Broker Load and Multi Load.
Directions:
1. User (external system) creates an import task.
2. Doris returns the import creation results.
3. User (external system) judges the import creation result. If creation succeeded, go to step 4; if it failed, go back to step 1 and retry creating the import.
4. User (external system) polls and views the import job until its status changes to FINISHED or CANCELLED.
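The polling in step 4 can be sketched as follows. `show_load` is a hypothetical callback supplied by the caller that returns the job state as a `SHOW LOAD` query would report it; here a stub stands in for a real Doris connection:

```python
import time

def wait_for_load(show_load, label, poll_interval=0.0, timeout=60):
    """Poll the job state until it reaches a terminal state,
    FINISHED or CANCELLED, or until the timeout expires."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = show_load(label)
        if state in ("FINISHED", "CANCELLED"):
            return state
        time.sleep(poll_interval)
    raise TimeoutError(f"load job {label} did not finish in time")

# A stub standing in for a real SHOW LOAD query against Doris:
# the job is scheduled, loads for a while, then finishes.
states = iter(["PENDING", "LOADING", "LOADING", "FINISHED"])
result = wait_for_load(lambda label: next(states), "batch_20240101")
# result is "FINISHED"
```

In a real integration, `poll_interval` would be on the order of seconds, and a CANCELLED result would trigger the error handling described in the notes below.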
Notes
Whether the import type is synchronous or asynchronous, the external system should not retry endlessly when Doris reports an import failure or an import creation failure. After a limited number of failed retries, the external system should preserve the failure information; most problems that persist after several retries are caused by incorrect usage or issues with the data itself.
Memory Limit
Users can limit the memory usage of a single import job through parameter settings to prevent the import from using too much memory and causing the system to run out of memory (OOM).
The way to limit memory differs slightly depending on the import method. For more information, see each import manual.
An import job is usually executed across multiple backends, and the memory limit applies to a single backend's memory usage for one import job, not the memory usage across the whole cluster.
Also, each Backend will set a total memory upper limit for importing. For specific configuration, see the following common system configuration section. This configuration limits the overall memory usage of all import tasks running on that Backend.
A lower memory limit may reduce import efficiency, as the import process may frequently flush data to disk when memory reaches the limit. Conversely, an excessive memory limit may cause the system to go OOM when the import concurrency is high. Therefore, the import memory limit should be set appropriately for the workload.
Best Practice
When integrating with Doris for import, users generally access Doris programmatically so that data is imported periodically. The following mainly describes best practices for programmatic access.
1. Choose the appropriate import method: Select the import method based on the location of the data source. For example, if the raw data is stored in HDFS, use Broker load for import.
2. Define the protocol for the import method: If the Broker load import method is chosen, the external system must be able to use the MySQL protocol to regularly submit and view import jobs.
3. Determine the type of the import method: The import method can be synchronous or asynchronous. For example, if Broker load is an asynchronous import method, the external system must call the viewing import command after submitting the creation import, and determine whether the import is successful based on the results of the viewing import command.
4. Formulate a label generation strategy: The label for each batch of data must be unique and fixed, so that Doris can guarantee At-Most-Once.
5. The program itself ensures At-Least-Once: The external system needs to guarantee At-Least-Once on its own; combined with Doris's At-Most-Once labels, this achieves Exactly-Once for the import process.
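The interplay of points 4 and 5 can be illustrated with a small simulation. `FakeDoris` is a stand-in for Doris's label bookkeeping, not real client code: retrying a batch with the same label (At-Least-Once on the caller's side), combined with Doris rejecting a label that has already succeeded (At-Most-Once), yields Exactly-Once:

```python
class FakeDoris:
    """Minimal stand-in for Doris's label bookkeeping: a label that
    has already succeeded is rejected on resubmission."""
    def __init__(self):
        self.succeeded = set()

    def submit(self, label, fail_first_attempt):
        if label in self.succeeded:
            return "label already used"   # At-Most-Once on Doris's side
        if fail_first_attempt.pop(label, False):
            return "failed"               # e.g. a transient network error
        self.succeeded.add(label)
        return "ok"

doris = FakeDoris()
flaky = {"batch_1": True}  # the first attempt of batch_1 fails

# At-Least-Once on the caller's side: retry with the SAME label.
outcomes = [doris.submit("batch_1", flaky) for _ in range(3)]
# First attempt fails, the retry succeeds, and a further retry is
# rejected by the label check: the batch is imported exactly once.
```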
General System Configuration
The following primarily explains several system-level configurations common to all import methods.
FE configuration
The following are FE system configurations, which can be changed by modifying the FE configuration file `fe.conf`.
max_load_timeout_second and min_load_timeout_second
These two configurations define the maximum and minimum import timeouts, in seconds. The default maximum timeout is three days and the default minimum is one second. A user-defined import timeout must fall within this range. These parameters apply to all import methods.
desired_max_waiting_jobs
This indicates the maximum number of import jobs in the waiting queue; the default is 100. This configuration applies only to asynchronously executed imports: when the number of PENDING (waiting to be executed) import jobs in FE exceeds this value, new import requests are rejected.
max_running_txn_num_per_db
This configuration limits the maximum number of running import jobs in each database (all import types counted together). The default is 100. When the number of running imports in a database exceeds this value, subsequent imports are not executed: a synchronous import job is rejected, while an asynchronous import job waits in the queue.
BE configuration
The following are BE system configurations, which can be changed by modifying the BE configuration file `be.conf`.
push_write_mbytes_per_sec
This is the write rate limit for a single tablet on a BE. The default is 10, i.e., 10 MB/s. The actual maximum write speed of a BE to a single tablet usually varies between 10 and 30 MB/s, depending on the schema and the system. You can adjust this parameter to control the import speed.
write_buffer_size
When importing data, BE first writes it to a memtable, which is flushed to disk only when it reaches a threshold. The default threshold is 100 MB. Too small a threshold may produce a large number of small files on BE, so it can be increased to reduce the file count. However, too large a threshold may cause RPC timeouts, as explained under the next configuration item.
tablet_writer_rpc_timeout_sec
This is the RPC timeout for sending one batch (1024 rows) during import. The default is 600 seconds. Because this RPC may involve flushing multiple memtables to disk, it can time out on disk writes; you can adjust this timeout to reduce timeout errors (such as the `send batch fail` error). If you increase `write_buffer_size`, this parameter also needs to be increased accordingly.
streaming_load_rpc_max_alive_time_sec
During the import process, Doris opens a writer for each tablet to receive data and write. This parameter specifies the waiting timeout for the writer. If the writer does not receive any data within this time, the writer will be automatically destroyed. When the system processing speed is slow, the writer may not receive the next batch of data for a long time, resulting in an import error: TabletWriter add batch with unknown id
. In this case, you can appropriately increase this configuration. The default is 600 seconds.
load_process_max_memory_limit_bytes and load_process_max_memory_limit_percent
These two parameters limit the maximum memory that import tasks may use on a single Backend, as an absolute amount and as a percentage respectively. `load_process_max_memory_limit_percent` defaults to 80, i.e., 80% of the Backend's total memory limit (`mem_limit`, which itself defaults to 80% of physical memory). In other words, if the physical memory is M, the default import memory limit is M × 80% × 80%. `load_process_max_memory_limit_bytes` defaults to 100 GB. The system takes the smaller of the two as the effective import memory limit on that Backend.
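The effective per-Backend import memory limit can be expressed as a small calculation. This is an illustrative sketch; the function name and the 64 GB example are assumptions, while the defaults follow the description above:

```python
def be_import_memory_limit(physical_bytes,
                           mem_limit_percent=80,
                           load_max_percent=80,
                           load_max_bytes=100 * 1024**3):
    """Effective per-BE import memory limit: the smaller of
    load_process_max_memory_limit_bytes and
    mem_limit * load_process_max_memory_limit_percent."""
    by_percent = physical_bytes * mem_limit_percent // 100 * load_max_percent // 100
    return min(by_percent, load_max_bytes)

# With 64 GB of physical memory, the percentage bound (64 GB x 80% x 80%,
# about 41 GB) is smaller than the 100 GB byte bound and therefore wins:
limit = be_import_memory_limit(64 * 1024**3)
```

On a machine with much more memory (say 1 TB), the 100 GB byte bound takes over instead.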
label_keep_max_second
This setting determines how long completed (FINISHED or CANCELLED) import records are kept in the Doris system. The default is 3 days. This parameter applies to all types of import jobs.
Column Mapping
Suppose the imported data is `1,2,3` and the table has three columns `c1,c2,c3`. To import the data into the table directly, you can use the statement `COLUMNS(c1,c2,c3)`, which is equivalent to `COLUMNS(tmp_c1,tmp_c2,tmp_c3,c1=tmp_c1,c2=tmp_c2,c3=tmp_c3)`.
If you want to transform the data during import or use temporary variables, the transformations and temporary variables must be specified in order of use. For example, the statement `COLUMNS(tmp_c1,tmp_c2,tmp_c3, c1 = tmp_c1 + 1, c2 = c1 + 1, c3 = c2 + 1)` is equivalent to `COLUMNS(tmp_c1,tmp_c2,tmp_c3, c1 = tmp_c1 + 1, c2 = tmp_c1 + 1 + 1, c3 = tmp_c1 + 1 + 1 + 1)`.
Any expression must be defined before it is used. For example, the following statement is illegal because `temp` is referenced by `c2` before `temp` is defined: `COLUMNS(tmp_c1,tmp_c2,tmp_c3, c1 = c1+1, c2 = temp + 1, temp = tmp_c1 + 1, c3 = c2 + 1)`.
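The left-to-right resolution rule can be illustrated with a small sketch in Python (this is a simulation for illustration, not Doris code): each expression is evaluated against an environment containing only the columns defined before it, so a reference to a not-yet-defined column fails:

```python
def eval_columns(mapping, row):
    """Evaluate column expressions strictly left to right, the way the
    COLUMNS clause resolves references: each expression may only use
    raw columns and columns already defined to its left."""
    env = dict(row)  # start from the raw (tmp_) columns
    for name, expr in mapping:
        env[name] = expr(env)
    return env

row = {"tmp_c1": 1, "tmp_c2": 2, "tmp_c3": 3}
# COLUMNS(tmp_c1, tmp_c2, tmp_c3, c1 = tmp_c1 + 1, c2 = c1 + 1, c3 = c2 + 1)
result = eval_columns(
    [("c1", lambda e: e["tmp_c1"] + 1),
     ("c2", lambda e: e["c1"] + 1),
     ("c3", lambda e: e["c2"] + 1)],
    row,
)
# c1 = 2, c2 = 3, c3 = 4 -- matching c2 = tmp_c1 + 1 + 1 after expansion

# Referencing a column before it is defined fails, mirroring the
# illegal COLUMNS example above:
try:
    eval_columns([("c2", lambda e: e["temp"] + 1),
                  ("temp", lambda e: e["tmp_c1"] + 1)], row)
except KeyError:
    pass  # "temp" is not yet defined when c2 is evaluated
```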