Stream Load (local file)

Last updated: 2024-06-27 10:55:41
    Stream load is a synchronous data import method. Users import local files or data streams into Doris by sending requests over the HTTP protocol. Stream load executes the import synchronously and returns the result, so users can determine whether the import succeeded directly from the response body of the request. Stream load is mainly suitable for importing local files, or data from data streams, through a program.

    Basic Principles

    The following figure displays the main flow of stream load, omitting some import details.
                                 ^      +
                                 |      |
                                 |      | 1A. User submit load to FE
                                 |      |
                                 |   +--v-----------+
                                 |   | FE           |
        5. Return result to user |   +--+-----------+
                                 |      |
                                 |      | 2. Redirect to BE
                                 |      |
                                 |   +--v-----------+
                                 +---+Coordinator BE| 1B. User submit load to BE
                                     +-+-----+----+-+
                                       |     |    |
                                 +-----+     |    +-----+
                                 |           |          | 3. Distribute data
                                 |           |          |
                               +-v-+       +-v-+      +-v-+
                               |BE |       |BE |      |BE |
                               +---+       +---+      +---+
    In stream load, Doris selects a node as the coordinator node. This node is responsible for receiving data and distributing it to the other data nodes. Users submit the import command through the HTTP protocol. If it is submitted to an FE, the FE forwards the request to a BE through an HTTP redirect. Users can also submit the import command directly to a specified BE. The final result of the import is returned to the user by the coordinator BE. Therefore, the machine initiating the import request needs permission to access the HTTP service port of the BE node.
    The default HTTP service port of the FE node is 8030, and the default HTTP service port of the BE node is 8040 (both can be viewed through statements such as show frontends; and show backends;).
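    For example, the same load can be submitted either through the FE or directly to a BE (a minimal sketch; fe_host, be_host, {db} and {table} are placeholders):
    # Submit via the FE HTTP port (default 8030); --location-trusted lets curl resend credentials when the FE redirects (307) the request to a BE.
    curl --location-trusted -u user:passwd -T data.file http://fe_host:8030/api/{db}/{table}/_stream_load
    # Or submit directly to a BE HTTP port (default 8040), skipping the redirect.
    curl --location-trusted -u user:passwd -T data.file http://be_host:8040/api/{db}/{table}/_stream_load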

    Supported Data Formats

    Currently, stream load supports two data formats: CSV (text) and JSON.

    Basic Operations

    Creating import

    Stream load submits and transfers data through the HTTP protocol. Here the curl command is used to show how to submit an import. Users can also use other HTTP clients for operations.
    curl --location-trusted -u user:passwd [-H "key1:value1" ...] -T data.file -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load
    
    # For supported properties in Header, see 'Import Task Parameters' below.
    # The format is: -H "key1:value1"
    Example:
    curl --location-trusted -u root -T date -H "label:123" http://abc.com:8030/api/test/date/_stream_load
    For detailed syntax help on creating an import, execute HELP STREAM LOAD. The following mainly introduces the meaning of some parameters used when creating a stream load.
    Signature Parameter
    user/passwd Stream load creates the import over the HTTP protocol and signs the request using HTTP basic access authentication. The Doris system verifies user identity and import permissions according to this signature.
    Import task parameters Since stream load uses the HTTP protocol, all parameters related to the import task are set in the header. The following mainly introduces the meaning of some of these parameters.
    Label The identity of the import task. Each import task has a label that is unique within a single database. The label is a user-defined name in the import command. With this label, users can view the execution status of the corresponding import task. Another function of the label is to prevent users from re-importing the same data. It is strongly recommended to use the same label for the same batch of data: repeated requests for the same batch of data will then only be accepted once, guaranteeing At-Most-Once semantics. When the import operation corresponding to a label is in the CANCELLED state, the label can be reused.
    column_separator Used to specify the column separator in the import file, which is \t by default. If the separator is an invisible character, it needs to be prefixed with \x and expressed in hexadecimal. For example, the \x01 separator of a Hive file needs to be specified as -H "column_separator:\x01". A combination of multiple characters can be used as the column separator.
    line_delimiter Used to specify the line delimiter in the import file, which is \n by default. A combination of multiple characters can be used as the line delimiter.
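    For example, a file exported from Hive with \x01 column separators could be loaded as follows (a minimal sketch; host, database, and table names are placeholders):
    curl --location-trusted -u user:passwd -H "label:hive_file_1" -H "column_separator:\x01" -H "line_delimiter:\n" -T hive_export.data http://fe_host:8030/api/test/tbl/_stream_load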
    max_filter_ratio The maximum tolerable error rate of the import task. The default value is 0 and the range of values is 0~1. If the error rate of the import exceeds this value, the import fails. Users who want to ignore erroneous rows can set this parameter greater than 0 so the import can succeed. The import fails when: (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL)) > max_filter_ratio. dpp.abnorm.ALL denotes the number of rows whose data quality is not up to standard, for example type mismatches, column count mismatches, length mismatches, and so on. dpp.norm.ALL is the number of correct rows during the import process. The amount of correct data for the import task can be queried through the SHOW LOAD command. Number of rows in the original file = dpp.abnorm.ALL + dpp.norm.ALL.
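    For example, if a file contains 1000 rows and 5 of them fail the quality checks, the error ratio is 5 / (5 + 995) = 0.005, so a setting such as the following would still allow the load to succeed (a minimal sketch; host, table, and the 0.01 value are illustrative):
    curl --location-trusted -u user:passwd -H "label:load_with_tolerance" -H "max_filter_ratio:0.01" -T data.file http://fe_host:8030/api/test/tbl/_stream_load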
    where The filter conditions specified for the import task. Stream load supports filtering the raw data with a specified where clause. The filtered data is not imported and does not participate in the filter ratio calculation, but it is counted in num_rows_unselected.
    Partitions The Partition information for the table to be imported. If the data to be imported does not belong to the specified partition, it will not be imported. This data will be included in dpp.abnorm.ALL.
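    For example, the two headers above can be combined to load only the rows matching a filter into specific partitions (a minimal sketch; the column name k1, the partition names, and the host and table names are illustrative):
    curl --location-trusted -u user:passwd -H "label:filtered_load" -H "where: k1 > 100" -H "partitions: p2024_01, p2024_02" -T data.file http://fe_host:8030/api/test/tbl/_stream_load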
    columns The function transformation configuration for the data to be imported. It supports column reordering and expression transformation; the expression transformation syntax is the same as in a query statement.
    Example of column order transformation: There are three columns of original data (src_c1, src_c2, src_c3), the current Doris table also has three columns (dst_c1, dst_c2, dst_c3)
    
    If the src_c1 column of the original table corresponds to the dst_c1 column of the target table, the src_c2 column of the original table corresponds to the dst_c2 column of the target table, and the src_c3 column of the original table corresponds to the dst_c3 column of the target, then the format is as follows:
    columns: dst_c1, dst_c2, dst_c3
    
    If the src_c1 column of the original table corresponds to the dst_c2 column of the target table, the src_c2 column of the original table corresponds to the dst_c3 column of the target table, and the src_c3 column of the original table corresponds to the dst_c1 column of the target, then the format is as follows:
    columns: dst_c2, dst_c3, dst_c1
    
    Example of expression transformation: There are two columns in the original file, and two columns in the target table (c1, c2). However, both columns in the original file need to be transformed by functions to correspond to the two columns in the target table. The format is as follows:
    columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)
    tmp_* is a placeholder, representing the two original columns in the original file.
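    Passed as a request header, the expression transformation above would look like this (a minimal sketch; host, database, and table names are placeholders):
    curl --location-trusted -u user:passwd -H "label:transform_load" -H "columns: tmp_c1, tmp_c2, c1 = year(tmp_c1), c2 = month(tmp_c2)" -T data.file http://fe_host:8030/api/test/tbl/_stream_load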
    exec_mem_limit The import memory limit. The default value is 2GB, and the unit is bytes.
    strict_mode Strict mode can be enabled in stream load by declaring strict_mode=true in the header. Strict mode is disabled by default. The meaning of strict mode is that column type transformations are strictly filtered during the import process. The strict filtering strategy is as follows:
    1. For column type transformations, if strict mode is true, erroneous data will be filtered. Here, erroneous data refers to values that are not null in the original data but become null after the column type transformation.
    2. Strict mode does not affect a column of the imported data when it is generated by function transformation.
    3. For columns in the imported data that have range limits, if the original data can pass the type transformation but falls outside the range limit, strict mode does not affect it either. For example, if the type is decimal(1,0) and the original value is 10, the value passes the type conversion but is outside the range declared for the column; strict mode does not affect such data.
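    For example, strict mode can be enabled together with a larger memory limit in the request headers (a minimal sketch; the host, table name, and the 4GB value 4294967296 are illustrative):
    curl --location-trusted -u user:passwd -H "label:strict_load" -H "strict_mode:true" -H "exec_mem_limit:4294967296" -T data.file http://fe_host:8030/api/test/tbl/_stream_load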
    merge_type The data merge type. Three types are supported: APPEND, DELETE, and MERGE. APPEND is the default, indicating that this batch of data needs to be appended to the existing data. DELETE means that all rows with the same keys as this batch of data will be deleted. MERGE semantics must be used together with the delete condition: data that meets the delete condition is processed with DELETE semantics, and the rest is processed with APPEND semantics.
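    For example, a MERGE load might look like the following (a minimal sketch; it assumes the delete condition is passed in a delete header and that a flag column exists in the data, both of which are illustrative assumptions):
    curl --location-trusted -u user:passwd -H "label:merge_example" -H "merge_type: MERGE" -H "delete: flag = 1" -T data.file http://fe_host:8030/api/test/tbl/_stream_load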
    two_phase_commit Stream load can enable a two-phase transaction commit mode. In this mode, the result is returned to the user as soon as the data is written; at this point the data is not yet visible and the transaction status is PRECOMMITTED. The data becomes visible only after the user manually triggers a commit operation. Two-phase commit is disabled by default.
    Note
    How to enable: set disable_stream_load_2pc=false in be.conf and declare two_phase_commit=true in the header.
    Example:
    1. Initiate a stream load pre-commit operation:
    curl --location-trusted -u user:passwd -H "two_phase_commit:true" -T test.txt http://fe_host:http_port/api/{db}/{table}/_stream_load
    {
    "TxnId": 18036,
    "Label": "55c8ffc9-1c40-4d51-b75e-f2265b3602ef",
    "TwoPhaseCommit": "true",
    "Status": "Success",
    "Message": "OK",
    "NumberTotalRows": 100,
    "NumberLoadedRows": 100,
    "NumberFilteredRows": 0,
    "NumberUnselectedRows": 0,
    "LoadBytes": 1031,
    "LoadTimeMs": 77,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 1,
    "ReadDataTimeMs": 0,
    "WriteDataTimeMs": 58,
    "CommitAndPublishTimeMs": 0
    }
    2. Trigger a commit operation on the transaction:
    curl -X PUT --location-trusted -u user:passwd -H "txn_id:18036" -H "txn_operation:commit" http://fe_host:http_port/api/{db}/_stream_load_2pc
    {
    "status": "Success",
    "msg": "transaction [18036] commit successfully."
    }
    3. Trigger an abort operation on the transaction:
    curl -X PUT --location-trusted -u user:passwd -H "txn_id:18037" -H "txn_operation:abort" http://fe_host:http_port/api/{db}/_stream_load_2pc
    {
    "status": "Success",
    "msg": "transaction [18037] abort successfully."
    }

    Returned result

    Because stream load is a synchronous import method, the import result is returned to the user directly in the return value of the request that created the import. Example:
    {
    "TxnId": 1003,
    "Label": "b6f3bc78-0d2c-45d9-9e4c-faa0a0149bee",
    "Status": "Success",
    "ExistingJobStatus": "FINISHED", // optional
    "Message": "OK",
    "NumberTotalRows": 1000000,
    "NumberLoadedRows": 1000000,
    "NumberFilteredRows": 1,
    "NumberUnselectedRows": 0,
    "LoadBytes": 40888898,
    "LoadTimeMs": 2144,
    "BeginTxnTimeMs": 1,
    "StreamLoadPutTimeMs": 2,
    "ReadDataTimeMs": 325,
    "WriteDataTimeMs": 1933,
    "CommitAndPublishTimeMs": 106,
    "ErrorURL": "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5-abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005"
    }
    The following mainly explains the parameters of the stream load import result:
    TxnId: The transaction ID of the import. Users generally do not need to be aware of it.
    Label: Import Label. Specified by the user or automatically generated by the system.
    Status: Import completion status.
    "Success": It means the import was successful.
    "Publish Timeout": This status also indicates that the import is completed, except that the data may be delayed and visible without retrying.
    "Label Already Exists": The Label is duplicated. You need to replace the Label.
    "Fail": The import failed.
    ExistingJobStatus: The status of the import job corresponding to the existing label. This field is displayed only when the status is "Label Already Exists". Users can learn about the status of the import job corresponding to the existing label through this status. "RUNNING" means the job is still executing, and "FINISHED" means the job is successful.
    Message: Import error information.
    NumberTotalRows: The total number of rows processed by the import.
    NumberLoadedRows: The number of successfully imported rows.
    NumberFilteredRows: The number of rows that do not meet data quality standards.
    NumberUnselectedRows: The number of rows filtered by the where condition.
    LoadBytes: The number of bytes imported.
    LoadTimeMs: The time taken to complete the import. Unit: milliseconds.
    BeginTxnTimeMs: The time it takes to RPC to FE to begin a transaction. Unit: milliseconds.
    StreamLoadPutTimeMs: The time it takes to RPC to FE to get the stream load plan. Unit: milliseconds.
    ReadDataTimeMs: The time it takes to read the data. Unit: milliseconds.
    WriteDataTimeMs: The time it takes to execute the data writing operation. Unit: milliseconds.
    CommitAndPublishTimeMs: The time it takes to RPC to FE to commit and publish the transaction. Unit: milliseconds.
    ErrorURL: If there are data quality problems, you can view the specific error lines by accessing this URL.
    Note
    Since stream load is a synchronous import method, import information is not recorded in the Doris system, and users cannot view stream load jobs asynchronously through the import commands. To get the import result, you need to read the return value of the request that created the import.

    Canceling Import

    Users cannot manually cancel stream load. Stream load will be automatically canceled by the system after a timeout or import error.

    Viewing Stream Load

    Users can view completed stream load tasks via show stream load. By default, BE does not record stream load information. To view these records, you need to enable recording on BE with the configuration parameter enable_stream_load_record=true. See the BE configuration items for details.
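    For example, recording could be enabled as follows (a minimal sketch), after which completed tasks can be listed with show stream load on the MySQL client:
    # In be.conf on each BE node (recording is disabled by default):
    enable_stream_load_record = true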

    Related System Configuration

    FE configuration

    stream_load_default_timeout_second The timeout of the import task, in seconds. An import task that does not complete within the set timeout is cancelled by the system and its status becomes CANCELLED. The default timeout is 600 seconds. If the source file cannot be imported within this time, the FE parameter stream_load_default_timeout_second needs to be adjusted.

    BE configuration

    streaming_load_max_mb The maximum import size of stream load. The default value is 10G, and the parameter unit is MB. If the user's source file exceeds this value, the BE parameter streaming_load_max_mb needs to be adjusted.

    Best Practice

    Application Scenario

    The most suitable scenario for using stream load is when the original file is in memory or on disk. Secondly, since stream load is a synchronous import method, users who want to synchronously obtain the import results can also use this import method.

    Data Volume

    Since in stream load the coordinator BE receives and distributes the data, the recommended import size is between 1G and 10G. Because the default maximum stream load import size is 10G, importing files larger than 10G requires modifying the BE configuration streaming_load_max_mb.
    For example, the size of the file to be imported is 15G
    Modify the BE configuration streaming_load_max_mb to 16000.
    The default timeout of stream load is 300 seconds. Given Doris's current maximum import speed limit, the default import task timeout needs to be modified for files larger than about 3G.
    Import task timeout = import data volume / 10M/s (the actual average import speed needs to be calculated by the user based on their own cluster conditions)
    For example, to import a 10GB file:
    timeout = 1000s (10G / 10M/s)

    Complete Example

    Data Situation: The data is on the local disk of the machine sending the import request, at path /home/store_sales; the data volume is about 15G; and it needs to be imported into the store_sales table of the bj_sales database. Cluster Situation: The concurrency of stream load is not affected by the cluster size.
    Step 1: Check if the import file size exceeds the default maximum import size of 10G.
    Modify BE config
    streaming_load_max_mb = 16000
    Step 2: Calculate whether the approximate import time exceeds the default timeout value.
    Import Time ≈ 15000 / 10 = 1500s
    Since this exceeds the default timeout, the FE configuration needs to be modified:
    stream_load_default_timeout_second = 1500
    Step 3: Create the import task.
    curl --location-trusted -u user:password -T /home/store_sales -H "label:abc" http://abc.com:8000/api/bj_sales/store_sales/_stream_load

    FAQs

    How to solve the problem when 'Label Already Exists' appears?

    The troubleshooting steps for duplicate labels in stream load are as follows:
    1. Whether the label conflicts with a label already used by another import method: since import labels in the Doris system do not distinguish between import methods, other import methods may have used the same label. Through SHOW LOAD WHERE LABEL = "xxx", where xxx is the duplicated label string, check whether a FINISHED import with the same label as the one created by the user already exists.
    2. Whether the same stream load job was submitted repeatedly: since stream load creates import tasks over the HTTP protocol, HTTP clients in various languages usually have their own request retry logic. After the Doris system receives the first request, it starts processing the stream load, but because the result is not returned to the client in time, the client retries and creates the same request again. At this point, because the Doris system is already processing the first request, the second request reports Label Already Exists. To check for this, search the FE Master's log for the label and see whether there are two redirect load action to destination= entries for the same label. If so, the request was submitted repeatedly by the client. It is recommended that users estimate the approximate import time based on the amount of data in the current request and increase the client-side request timeout to a value greater than the import timeout, to avoid multiple submissions of the request by the client (see the curl example after this list).
    3. Connection reset exception: in version 0.14.0 and earlier, a connection reset exception could occur after HTTP V2 was enabled, because the built-in web container was Tomcat and its implementation of the 307 redirect had a problem. So when stream load was used to import large amounts of data, a connection reset exception might occur: Tomcat started the data transmission before the 307 redirect, which caused the BE to receive the data request without authentication information. Changing the built-in container to Jetty later solved this problem. If you encounter this issue, please upgrade your version or disable HTTP V2 (enable_http_server_v2=false). After upgrading, also upgrade the HTTP client version of your program to 4.5.13 by introducing the following dependency in your pom.xml file:
    <dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.13</version>
    </dependency>
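    For long-running imports, one way to reduce client-side retries is to raise both the per-job import timeout and the client's own timeout, for example with curl (a minimal sketch; it assumes the timeout header is used to raise the per-job import timeout in seconds, and the values 3600 and 4000 as well as the host and table names are illustrative):
    curl --location-trusted -u user:passwd --max-time 4000 -H "label:big_file_load" -H "timeout:3600" -T big_data.file http://fe_host:8030/api/test/tbl/_stream_load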

    More help

    For more detailed information, you can enter HELP STREAM LOAD on the MySQL client command line.