ES External Table

Last updated: 2024-06-27 11:13:37
    Note:
    The content in this document applies only to TCHouse-D v1.1 and earlier. For later versions, we recommend using the Multi-Catalog feature to connect to external data catalogs.
    Doris-On-ES combines the distributed query planning capability of Doris with the full-text search capability of ES to provide a more complete OLAP solution that supports:
    1. Distributed Join queries across multiple ES indexes.
    2. Joint queries across tables in Doris and ES, with more complex full-text search filtering.
    This document describes how the feature works and how to use it.

    Definitions

    Doris-related

    FE: Frontend, the front-end node of Doris, responsible for metadata management and request access.
    BE: Backend, the back-end node of Doris, responsible for query execution and data storage.

    ES-related

    DataNode: Data storage and computing node of ES.
    MasterNode: The master node of ES, which manages metadata, nodes, and data distribution.
    scroll: The built-in dataset cursor feature of ES, used to scan and filter data in a streaming fashion.
    _source: The original JSON document content passed in during import.
    doc_values: The columnar storage definition of fields in ES/Lucene.
    keyword: A string field type whose text content is not tokenized by ES/Lucene.
    text: A string field type whose text content is tokenized by ES/Lucene. The analyzer can be specified by the user; the standard analyzer is used by default.

    Usage

    Creating ES Index

    PUT test
    {
      "settings": {
        "index": {
          "number_of_shards": "1",
          "number_of_replicas": "0"
        }
      },
      "mappings": {
        "doc": { // ES 7.x and later do not require a type to be specified when creating an index. There is a single default `_doc` type
          "properties": {
            "k1": {
              "type": "long"
            },
            "k2": {
              "type": "date"
            },
            "k3": {
              "type": "keyword"
            },
            "k4": {
              "type": "text",
              "analyzer": "standard"
            },
            "k5": {
              "type": "float"
            }
          }
        }
      }
    }

    Importing data into the ES Index

    POST /_bulk
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Elasticsearch", "k4": "Trying out Elasticsearch", "k5": 10.0}
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "Trying out Doris", "k4": "Trying out Doris", "k5": 10.0}
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "Doris On ES", "k4": "Doris On ES", "k5": 10.0}
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "Doris", "k4": "Doris", "k5": 10.0}
    {"index":{"_index":"test","_type":"doc"}}
    { "k1" : 100, "k2": "2020-01-01", "k3": "ES", "k4": "ES", "k5": 10.0}

    Creating an ES External Table in Doris

    For more information on the syntax of creating a table, see CREATE TABLE.
    CREATE EXTERNAL TABLE `test` -- Schema is not specified; the ES mapping is pulled automatically to create the table
    ENGINE=ELASTICSEARCH
    PROPERTIES (
    "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
    "index" = "test",
    "type" = "doc",
    "user" = "root",
    "password" = "root"
    );
    
    CREATE EXTERNAL TABLE `test` (
    `k1` bigint(20) COMMENT "",
    `k2` datetime COMMENT "",
    `k3` varchar(20) COMMENT "",
    `k4` varchar(100) COMMENT "",
    `k5` float COMMENT ""
    ) ENGINE=ELASTICSEARCH -- The ENGINE must be Elasticsearch
    PROPERTIES (
    "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
    "index" = "test",
    "type" = "doc",
    "user" = "root",
    "password" = "root"
    );
    Parameter description:
    | Parameter | Description |
    |-----------|-------------|
    | hosts | ES cluster address. Can be one or more addresses, or the address of the ES load balancer. |
    | index | Name of the corresponding ES index. Aliases are supported, but if doc_value is used, the real index name is required. |
    | type | Index type. Not required for ES 7.x and later. |
    | user | Username for the ES cluster. |
    | password | Password of the corresponding user. |
    For clusters earlier than ES 7.x, make sure to select the correct index type when creating the table.
    Currently, only HTTP Basic authentication is supported. The user must have access to paths such as /_cluster/state/ and _nodes/http, as well as read permission on the index. If security authentication is not enabled for the cluster, the username and password do not need to be set.
    The column names in the Doris table must exactly match the field names in ES, and the field types should be consistent.
    The ENGINE must be Elasticsearch.

    Filter Condition Pushdown

    One significant feature of Doris On ES is the pushdown of filter conditions: the conditions are pushed down to ES, so only data that truly meets the criteria is returned, significantly improving query performance and reducing the CPU, memory, and IO usage of both Doris and Elasticsearch.
    enable_new_es_dsl indicates whether to use the new DSL generation logic. All subsequent bug fixes and iterations are developed in the new DSL. The value is true by default, and can be modified in fe.conf.
    The following operators are optimized into the following ES queries:
    | SQL syntax | ES 5.x+ syntax |
    |------------|----------------|
    | = | term query |
    | in | terms query |
    | > , < , >= , <= | range query |
    | and | bool.filter |
    | or | bool.should |
    | not | bool.must_not |
    | not in | bool.must_not + terms query |
    | is_not_null | exists query |
    | is_null | bool.must_not + exists query |
    | esquery | ES native JSON-format Query DSL |
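
    The mapping in the table above can be illustrated with a small Python sketch. The helper names (predicate_to_dsl, sql_and, sql_or, sql_not) are hypothetical and exist only for this illustration; this is not the DSL generator that Doris FE actually uses, just one plausible way to express the same operator-to-query correspondence.

```python
import json

# Hypothetical helper names. This only mirrors the mapping table above and is
# not the DSL generator that Doris FE actually uses.
RANGE_OPS = {">": "gt", "<": "lt", ">=": "gte", "<=": "lte"}


def predicate_to_dsl(column, op, value=None):
    """Translate one simple SQL predicate into an ES query fragment."""
    if op == "=":
        return {"term": {column: value}}
    if op == "in":
        return {"terms": {column: list(value)}}
    if op in RANGE_OPS:
        return {"range": {column: {RANGE_OPS[op]: value}}}
    if op == "not in":
        return {"bool": {"must_not": [{"terms": {column: list(value)}}]}}
    if op == "is_not_null":
        return {"exists": {"field": column}}
    if op == "is_null":
        return {"bool": {"must_not": [{"exists": {"field": column}}]}}
    raise ValueError(f"operator not covered by this sketch: {op}")


def sql_and(*clauses):
    # SQL "and" corresponds to bool.filter
    return {"bool": {"filter": list(clauses)}}


def sql_or(*clauses):
    # SQL "or" corresponds to bool.should
    return {"bool": {"should": list(clauses)}}


def sql_not(clause):
    # SQL "not" corresponds to bool.must_not
    return {"bool": {"must_not": [clause]}}


# WHERE k1 > 1000 AND k3 = 'term'
dsl = sql_and(predicate_to_dsl("k1", ">", 1000), predicate_to_dsl("k3", "=", "term"))
print(json.dumps(dsl, indent=2))
```

    Conditions that have no counterpart in the table are simply rejected by this sketch.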

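    Data Type Mapping

    √ indicates a supported mapping; - indicates an unsupported one.
    | Doris \ ES | byte | short | integer | long | float | double | keyword | text | date |
    |------------|------|-------|---------|------|-------|--------|---------|------|------|
    | tinyint | √ | - | - | - | - | - | - | - | - |
    | smallint | √ | √ | - | - | - | - | - | - | - |
    | int | √ | √ | √ | - | - | - | - | - | - |
    | bigint | √ | √ | √ | √ | - | - | - | - | - |
    | float | - | - | - | - | √ | - | - | - | - |
    | double | - | - | - | - | - | √ | - | - | - |
    | char | - | - | - | - | - | - | √ | √ | - |
    | varchar | - | - | - | - | - | - | √ | √ | - |
    | date | - | - | - | - | - | - | - | - | √ |
    | datetime | - | - | - | - | - | - | - | - | √ |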

    Enable Columnar Scan for Faster Queries (enable_docvalue_scan=true)

    CREATE EXTERNAL TABLE `test` (
    `k1` bigint(20) COMMENT "",
    `k2` datetime COMMENT "",
    `k3` varchar(20) COMMENT "",
    `k4` varchar(100) COMMENT "",
    `k5` float COMMENT ""
    ) ENGINE=ELASTICSEARCH
    PROPERTIES (
    "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
    "index" = "test",
    "user" = "root",
    "password" = "root",
    "enable_docvalue_scan" = "true"
    );
    Parameter description:
    | Parameter | Description |
    |-----------|-------------|
    | enable_docvalue_scan | Whether to obtain the values of queried fields from ES/Lucene columnar storage. The default value is false. |
    After it is enabled, Doris obtains data from ES according to the following two principles:
    Try and see: Doris automatically checks whether columnar storage is enabled for all fields to be read (doc_values: true). If it is, Doris obtains all field values from columnar storage.
    Automatic downgrade: If any field to be read does not have columnar storage, all field values are parsed from the row storage _source.
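
    The two principles can be sketched in Python as follows. This is an illustration only, not the Doris implementation: the function names are hypothetical, the mapping is the one from the `test` index created earlier in this document, and the sketch assumes that non-text fields have doc_values enabled unless the mapping turns it off.

```python
# Field mapping of the `test` index created earlier in this document.
TEST_MAPPING = {
    "k1": {"type": "long"},
    "k2": {"type": "date"},
    "k3": {"type": "keyword"},
    "k4": {"type": "text", "analyzer": "standard"},
    "k5": {"type": "float"},
}


def has_doc_values(field_mapping):
    """Return True if the field can be read from columnar storage."""
    # text fields have no columnar storage; for other types this sketch
    # assumes doc_values is on unless the mapping explicitly disables it.
    if field_mapping["type"] == "text":
        return False
    return field_mapping.get("doc_values", True)


def choose_fetch_source(mapping, columns):
    """Hypothetical helper: decide where all requested columns are read from."""
    # "Try and see": use docvalue only if every requested field supports it.
    if all(has_doc_values(mapping[col]) for col in columns):
        return "docvalue"
    # "Automatic downgrade": one unsupported field sends everything to _source.
    return "_source"


print(choose_fetch_source(TEST_MAPPING, ["k1", "k3"]))  # docvalue
print(choose_fetch_source(TEST_MAPPING, ["k1", "k4"]))  # _source
```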

    Benefits

    By default, Doris On ES obtains all required columns from _source, which is row-based storage in JSON format and is slower than columnar storage for batch reads. In particular, when only a small number of columns need to be read, docvalue can be about a dozen times faster than _source.

    Note

    1. Fields of text type in ES do not have columnar storage, so if any field to be read is of text type, Doris automatically downgrades to reading from _source.
    2. When a large number of fields are read (>= 25), reading field values from docvalue performs basically the same as reading them from _source.

    Sniff Keyword Field (enable_keyword_sniff=true)

    CREATE EXTERNAL TABLE `test` (
    `k1` bigint(20) COMMENT "",
    `k2` datetime COMMENT "",
    `k3` varchar(20) COMMENT "",
    `k4` varchar(100) COMMENT "",
    `k5` float COMMENT ""
    ) ENGINE=ELASTICSEARCH
    PROPERTIES (
    "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
    "index" = "test",
    "user" = "root",
    "password" = "root",
    "enable_keyword_sniff" = "true"
    );
    Parameter description:
    | Parameter | Description |
    |-----------|-------------|
    | enable_keyword_sniff | Whether to sniff tokenized string fields (text) in ES and obtain the additional non-tokenized (keyword) field name (multi-fields mechanism). |
    ES allows data to be imported without creating an index in advance; in that case, ES automatically creates a new index. For string fields, ES creates a field with both text and keyword types, which is the multi-fields feature of ES. The mapping is as follows:
    "k4": {
      "type": "text",
      "fields": {
        "keyword": {
          "type": "keyword",
          "ignore_above": 256
        }
      }
    }
    When a filter condition such as = is applied to k4, Doris On ES converts it to an ES term query. The SQL filter condition:
    k4 = "Doris On ES"
    The converted ES query DSL is:
    "term" : {
      "k4": "Doris On ES"
    }
    Because the primary type of k4 is text, the value is tokenized at ingestion by the analyzer configured for k4 (or by the standard analyzer if none is configured). As a result, it is split into three terms: doris, on, and es. You can verify this as follows:
    POST /_analyze
    {
      "analyzer": "standard",
      "text": "Doris On ES"
    }
    The tokenization result is:
    {
      "tokens": [
        {
          "token": "doris",
          "start_offset": 0,
          "end_offset": 5,
          "type": "<ALPHANUM>",
          "position": 0
        },
        {
          "token": "on",
          "start_offset": 6,
          "end_offset": 8,
          "type": "<ALPHANUM>",
          "position": 1
        },
        {
          "token": "es",
          "start_offset": 9,
          "end_offset": 11,
          "type": "<ALPHANUM>",
          "position": 2
        }
      ]
    }
    The query actually sent is:
    "term" : {
      "k4": "Doris On ES"
    }
    The term Doris On ES does not match any term in the dictionary, so no results are returned. With enable_keyword_sniff set to true, k4 = "Doris On ES" is automatically converted to k4.keyword = "Doris On ES" to fully match SQL semantics. The converted ES query DSL is:
    "term" : {
      "k4.keyword": "Doris On ES"
    }
    k4.keyword is of type keyword, so the value is written to ES as a single complete term and can be matched.
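
    The difference between the two term queries can be reproduced with a toy model in Python. The analyzer below is only a rough stand-in for the ES standard analyzer (split on non-alphanumeric characters, then lowercase), and all function names are hypothetical; it does not talk to ES.

```python
import re


def standard_like_analyze(text):
    """Rough stand-in for the ES standard analyzer: split and lowercase."""
    return [token.lower() for token in re.findall(r"[A-Za-z0-9]+", text)]


def index_document(value):
    """Model how one k4 value is stored under the multi-fields mapping."""
    return {
        "k4": set(standard_like_analyze(value)),  # text: tokenized terms
        "k4.keyword": {value},                    # keyword: one complete term
    }


def term_query(indexed, field, value):
    """A term query matches only if the exact value is one of the stored terms."""
    return value in indexed[field]


doc = index_document("Doris On ES")
print(sorted(doc["k4"]))                              # ['doris', 'es', 'on']
print(term_query(doc, "k4", "Doris On ES"))           # False
print(term_query(doc, "k4.keyword", "Doris On ES"))   # True
```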

    Auto Node Discovery

    Set to true by default (nodes_discovery=true).
    CREATE EXTERNAL TABLE `test` (
    `k1` bigint(20) COMMENT "",
    `k2` datetime COMMENT "",
    `k3` varchar(20) COMMENT "",
    `k4` varchar(100) COMMENT "",
    `k5` float COMMENT ""
    ) ENGINE=ELASTICSEARCH
    PROPERTIES (
    "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
    "index" = "test",
    "user" = "root",
    "password" = "root",
    "nodes_discovery" = "true"
    );
    Parameter description:
    | Parameter | Description |
    |-----------|-------------|
    | nodes_discovery | Whether to enable ES node discovery. The default value is true. |
    When set to true, Doris discovers all available data nodes in ES on which the relevant shards are allocated. Set it to false if the ES data node addresses are not reachable from the Doris BE, for example when the ES cluster is deployed in a private network isolated from the public internet and users access it through a proxy.

    Whether HTTPS Access Is Enabled for the ES Cluster

    Set to true if HTTPS is enabled (http_ssl_enabled=true). The default value is false.
    CREATE EXTERNAL TABLE `test` (
    `k1` bigint(20) COMMENT "",
    `k2` datetime COMMENT "",
    `k3` varchar(20) COMMENT "",
    `k4` varchar(100) COMMENT "",
    `k5` float COMMENT ""
    ) ENGINE=ELASTICSEARCH
    PROPERTIES (
    "hosts" = "http://192.168.0.1:8200,http://192.168.0.2:8200",
    "index" = "test",
    "user" = "root",
    "password" = "root",
    "http_ssl_enabled" = "true"
    );
    Parameter description:
    | Parameter | Description |
    |-----------|-------------|
    | http_ssl_enabled | Whether HTTPS access is enabled for the ES cluster. |
    As a temporary solution, FE/BE currently trust all certificates. User-configured certificates will be supported in the future.

    Query Usage

    You can use the ES external tables in Doris the same way as using Doris internal tables, except that the Doris data models (Rollup, Pre-Aggregation, and Materialized Views) cannot be used.

    Basic query

    select * from es_table where k1 > 1000 and k3 ='term' or k4 like 'fu*z_'

    Extended esquery(field, QueryDSL)

    The esquery(field, QueryDSL) function can be used to push queries that cannot be expressed in SQL, such as match_phrase and geo_shape, down to ES for filtering. The first parameter of esquery is used to associate with the index, and the second parameter is the JSON expression of the basic ES Query DSL, enclosed in {}. The JSON must have exactly one root key, such as match_phrase, geo_shape, or bool.
    A match_phrase query:
    select * from es_table where esquery(k4, ' {
      "match_phrase": {
        "k4": "doris on es"
      }
    }');
    A geo related query:
    select * from es_table where esquery(k4, ' {
      "geo_shape": {
        "location": {
          "shape": {
            "type": "envelope",
            "coordinates": [[13, 53], [14, 52]]
          },
          "relation": "within"
        }
      }
    }');
    A bool query:
    select * from es_table where esquery(k4, ' {
      "bool": {
        "must": [
          {
            "terms": {
              "k1": [11, 12]
            }
          },
          {
            "terms": {
              "k2": [100]
            }
          }
        ]
      }
    }');
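
    When such statements are generated by a program, the Query DSL can be built as a dictionary and serialized, which also makes it easy to check the single-root-key rule. The following Python sketch is an assumption-laden illustration: build_esquery is a hypothetical helper, not part of Doris, and it deliberately refuses values that would need SQL string escaping rather than guessing at escaping rules.

```python
import json


def build_esquery(column, query_dsl):
    """Hypothetical helper: render an esquery(...) predicate from a dict."""
    # The Query DSL passed to esquery must have exactly one root key.
    if len(query_dsl) != 1:
        raise ValueError("the query DSL must have exactly one root key")
    body = json.dumps(query_dsl)
    # SQL string escaping is out of scope for this sketch, so such input is rejected.
    if "'" in body or "\\" in body:
        raise ValueError("value needs SQL string escaping, not handled here")
    return f"esquery({column}, '{body}')"


predicate = build_esquery("k4", {"match_phrase": {"k4": "doris on es"}})
print(f"select * from es_table where {predicate};")
```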

    Principles

    +----------------------------------------------+
    |                                              |
    | Doris      +------------------+              |
    |            |       FE         +--------------+-------+
    |            |                  |  Request Shard Location
    |            +--+-------------+-+              |       |
    |               ^             ^                |       |
    |               |             |                |       |
    |  +-------------------+ +------------------+  |       |
    |  |            |      | |    |             |  |       |
    |  | +----------+----+ | | +--+-----------+ |  |       |
    |  | |      BE       | | | |      BE      | |  |       |
    |  | +---------------+ | | +--------------+ |  |       |
    +----------------------------------------------+       |
       |        |          | |        |         |          |
       |        |          | |        |         |          |
       |    HTTP SCROLL    | |    HTTP SCROLL   |          |
    +-----------+---------------------+------------+       |
    |  |        v          | |        v         |  |       |
    |  | +------+--------+ | | +------+-------+ |  |       |
    |  | |               | | | |              | |  |       |
    |  | |   DataNode    | | | |   DataNode   +<-----------+
    |  | |               | | | |              | |  |       |
    |  | |               +<--------------------------------+
    |  | +---------------+ | | |--------------| |  |       |
    |  +-------------------+ +------------------+  |       |
    |   Same Physical Node                         |       |
    |                                              |       |
    |           +-----------------------+          |       |
    |           |                       |          |       |
    |           |      MasterNode       +<-----------------+
    | ES        |                       |          |
    |           +-----------------------+          |
    +----------------------------------------------+
    1. After the ES external table is created, the FE sends a request to the specified host to obtain the HTTP port information and the shard allocation of the index. If the request fails, the FE tries the hosts in the list in sequence until one succeeds or all fail.
    2. At query time, the FE generates a query plan based on the node information and index metadata it obtained, and sends it to the corresponding BE nodes.
    3. Following the principle of proximity, each BE node sends requests to the locally deployed ES node and concurrently fetches data from _source or docvalue for each shard of the ES index via HTTP Scroll.
    4. After Doris finishes computing the result, it returns the result to the user.
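
    The host traversal in step 1 follows a common "first successful host" pattern, sketched below in Python. This is not the FE code: first_successful and fake_fetch are hypothetical names, and fake_fetch stands in for the real HTTP request so that the example runs without a cluster.

```python
def first_successful(hosts, fetch):
    """Call fetch(host) for each host in order and return the first result."""
    failures = []
    for host in hosts:
        try:
            return fetch(host)
        except OSError as exc:  # connection refused, timeout, and so on
            failures.append(f"{host}: {exc}")
    # Every host in the list failed.
    raise ConnectionError("all hosts failed: " + "; ".join(failures))


def fake_fetch(host):
    """Stand-in for the real request; the first host is pretended to be down."""
    if host == "http://192.168.0.1:8200":
        raise OSError("connection refused")
    return {"host": host, "shards": [0]}


hosts = ["http://192.168.0.1:8200", "http://192.168.0.2:8200"]
result = first_successful(hosts, fake_fetch)
print(result["host"])  # http://192.168.0.2:8200
```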

    Best Practice

    Recommended usage of time type fields

    In ES, time fields can be used very flexibly, but in Doris On ES an improperly configured time field type causes filter condition pushdown to fail. When creating an index, it is recommended to give time fields the broadest possible format compatibility:
    "dt": {
    "type": "date",
    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
    }
    When creating this field in Doris, it is recommended to set its type to date or datetime (or varchar). You can use the following SQL statements to push the filter conditions down to ES:
    select * from doe where k2 > '2020-06-21';
    
    select * from doe where k2 < '2020-06-21 12:00:00';
    
    select * from doe where k2 < 1593497011;
    
    select * from doe where k2 < now();
    
    select * from doe where k2 < date_format(now(), '%Y-%m-%d');
    Note that if no format is set for the time field in ES, the default format is:
    strict_date_optional_time||epoch_millis
    Timestamps imported into ES must be converted to milliseconds, which is the format ES uses internally. Otherwise, errors will occur in the ES external table.
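
    As a minimal illustration of the conversion, the Python sketch below turns a second-precision timestamp or a timezone-aware datetime into epoch milliseconds before the value is written to ES. The function names are hypothetical, and the sample value is the timestamp used in the SQL example above.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def seconds_to_millis(epoch_seconds):
    """Convert a second-precision epoch timestamp to milliseconds."""
    return int(epoch_seconds) * 1000


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to integer epoch milliseconds."""
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


print(seconds_to_millis(1593497011))  # 1593497011000
print(to_epoch_millis(datetime(2020, 6, 30, 6, 3, 31, tzinfo=timezone.utc)))  # 1593497011000
```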

    Obtain ES Metadata Field _id

    Each imported document that is not given an _id is assigned a globally unique _id by ES, which serves as its primary key. Users can also assign an _id with business meaning to a document during import. To obtain this field value in Doris On ES, add an _id field of varchar type when creating the table.
    CREATE EXTERNAL TABLE `doe` (
    `_id` varchar COMMENT "",
    `city` varchar COMMENT ""
    ) ENGINE=ELASTICSEARCH
    PROPERTIES (
    "hosts" = "http://127.0.0.1:8200",
    "user" = "root",
    "password" = "root",
    "index" = "doe"
    );
    Note
    The _id field only supports = and in filtering.
    The _id field must be of varchar type.

    FAQs

    1. What ES versions does Doris On ES require? The ES major version must be 5 or later. Data is scanned differently before ES 2.x and after ES 5.x, and currently only ES 5.x and later versions are supported.
    2. Are ES clusters with X-Pack authentication supported? All ES clusters that use HTTP Basic authentication are supported.
    3. Why are some queries slower than querying ES directly? For queries such as _count, ES can directly read metadata about the number of matching documents instead of processing the actual data.
    4. Can aggregation operations be pushed down? Currently, Doris On ES does not support pushdown of aggregation operations such as sum, avg, and min/max. For such operations, Doris fetches all documents that meet the specified conditions from ES and then performs the computation internally.