Tencent Cloud


Bucket Shuffle Join

Last updated: 2024-06-27 11:11:41
    Bucket Shuffle Join is an optimization that lets some Join queries be computed locally, reducing the time spent transferring data between nodes and speeding up queries. See ISSUE 4394 for its design, implementation, and results.

    Definitions

    FE: Frontend, the frontend node of Doris, responsible for metadata management and handling client requests.
    BE: Backend, the backend node of Doris, responsible for query execution and data storage.
    Left table: the table on the left side of a Join query, on which the probe operation is performed. Its position can be changed by Join Reorder.
    Right table: the table on the right side of a Join query, on which the build operation is performed. Its position can be changed by Join Reorder.

    Principles

    The distributed Join methods commonly used in Doris are Shuffle Join and Broadcast Join. Both can incur considerable network overhead. For example, consider a Join between Table A and Table B executed as a HashJoin. The overhead of each Join method is as follows:
    Broadcast Join: if, based on data distribution, the query plan runs 3 HashJoinNode instances for Table A, the entire Table B must be sent to all 3 HashJoinNodes, so the network overhead is 3B and the memory overhead is also 3B.
    Shuffle Join: Shuffle Join distributes the data of Tables A and B across the cluster nodes by hash, so the network overhead is A + B and the memory overhead is B.
    The data distribution information of every Doris table is stored in FE. If the Join condition hits a table's data distribution column, this information can be used to reduce the network and memory overhead of the Join. This is the idea behind Bucket Shuffle Join.
    
    The above image shows how Bucket Shuffle Join works. The query joins Table A with Table B, and the equality Join condition hits the data distribution column of Table A. Using Table A's data distribution information, Bucket Shuffle Join sends the data of Table B to the nodes that store and compute the corresponding data of Table A. The cost of Bucket Shuffle Join is as follows:
    Network overhead: B < min(3B, A + B)
    Memory overhead: B <= min(3B, B)
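The cost comparison above is simple arithmetic and can be checked directly. The following is a simplified model, not Doris code: it assumes overhead is measured purely in table sizes, with a and b standing for the sizes of Table A and Table B and instances for the number of HashJoinNode instances planned for Table A (3 in the example). The names join_overheads and Overhead are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Overhead:
    network: float  # data sent between nodes
    memory: float   # data held in hash tables, summed over all instances


def join_overheads(a: float, b: float, instances: int) -> dict[str, Overhead]:
    """Simplified cost model for A JOIN B, where A probes and B builds.

    a, b: sizes of the left table A and the right table B.
    instances: number of HashJoinNode instances planned for A.
    """
    return {
        # Every instance receives (and keeps) a full copy of B.
        "broadcast": Overhead(network=instances * b, memory=instances * b),
        # A and B are both re-hashed across the cluster; each B row is stored once.
        "shuffle": Overhead(network=a + b, memory=b),
        # A stays where it is; only B is sent to the nodes holding A's buckets.
        "bucket_shuffle": Overhead(network=b, memory=b),
    }


if __name__ == "__main__":
    for name, cost in join_overheads(a=100.0, b=10.0, instances=3).items():
        print(f"{name:15} network={cost.network:6.1f} memory={cost.memory:6.1f}")
```

With A = 100, B = 10 and 3 instances, the model gives network overheads of 30, 110 and 10 for Broadcast, Shuffle and Bucket Shuffle Join, and memory overheads of 30, 10 and 10, consistent with the two inequalities above.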
    Compared with Broadcast Join and Shuffle Join, Bucket Shuffle Join therefore has a clear performance advantage: it reduces the time spent transferring data between nodes and the memory used during the Join. Compared with the Join methods Doris originally offered, it has the following advantages:
    First, Bucket Shuffle Join reduces network and memory overhead and improves the performance of some Join queries, especially when FE can perform partition pruning and bucket pruning on the left table.
    Second, unlike Colocate Join, it places no requirements on how table data is distributed, so it is transparent to users and is less likely to cause data skew.
    Third, it gives Join Reorder more room for optimization.

    Usage

    Setting Session Variables

    Set the session variable enable_bucket_shuffle_join to true, and FE will automatically plan eligible queries as Bucket Shuffle Join.
    set enable_bucket_shuffle_join = true;
    In FE's distributed query planning, the priority order is Colocate Join > Bucket Shuffle Join > Broadcast Join > Shuffle Join. However, if the user explicitly specifies the Join type with a hint, for example:
    select * from test join [shuffle] baseall on test.k1 = baseall.k1;
    the priority order above does not apply. The session variable defaults to true starting from version 0.14; in version 0.13 it must be set to true manually.
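The priority rule and the effect of a hint can be summarized in a short sketch. It models only the ordering described in this section: which methods are eligible for a given Join is taken as an input rather than computed, and choose_join_method, PRIORITY and the method names are hypothetical labels, not FE identifiers.

```python
from typing import Optional

# Documented priority order, highest first (hypothetical labels).
PRIORITY = ("colocate", "bucket_shuffle", "broadcast", "shuffle")


def choose_join_method(eligible: set,
                       hint: Optional[str] = None,
                       enable_bucket_shuffle_join: bool = True) -> str:
    """Pick a distributed Join method following the documented priority.

    eligible: methods assumed to be applicable to this Join.
    hint: explicit Join hint (e.g. "shuffle"); if given, it bypasses the priority.
    enable_bucket_shuffle_join: mirrors the session variable of the same name.
    """
    if hint is not None:
        return hint
    for method in PRIORITY:
        if method == "bucket_shuffle" and not enable_bucket_shuffle_join:
            continue  # session variable switched off, so skip this method
        if method in eligible:
            return method
    raise ValueError("no applicable Join method")


if __name__ == "__main__":
    candidates = {"bucket_shuffle", "broadcast", "shuffle"}
    print(choose_join_method(candidates))                                    # bucket_shuffle
    print(choose_join_method(candidates, hint="shuffle"))                    # shuffle
    print(choose_join_method(candidates, enable_bucket_shuffle_join=False))  # broadcast
```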

    Check Join Type

    You can use the explain command to check whether a Join is planned as a Bucket Shuffle Join:
    | 2:HASH JOIN |
    | | join op: INNER JOIN (BUCKET_SHUFFLE) |
    | | hash predicates: |
    | | colocate: false, reason: table not in the same group |
    | | equal join conjunct: test.k1 = baseall.k1 |
    The Join type in the join op line is BUCKET_SHUFFLE, which indicates that Bucket Shuffle Join will be used.
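When checking many queries, for example in a regression script, the join op line can be scanned programmatically. The sketch below assumes the EXPLAIN output has already been fetched as text (for instance through a MySQL client) and that join op lines follow the format shown above; join_distributions and uses_bucket_shuffle are hypothetical helper names.

```python
import re

# Matches lines such as "| | join op: INNER JOIN (BUCKET_SHUFFLE) |".
JOIN_OP = re.compile(r"join op:\s*(?P<op>[A-Z ]+?JOIN)\s*\((?P<dist>[A-Z_]+)\)")


def join_distributions(explain_text: str) -> list:
    """Return (join op, distribution) pairs for every join op line found."""
    return [(m["op"], m["dist"]) for m in JOIN_OP.finditer(explain_text)]


def uses_bucket_shuffle(explain_text: str) -> bool:
    """True if any Join in the plan is marked BUCKET_SHUFFLE."""
    return any(dist == "BUCKET_SHUFFLE" for _, dist in join_distributions(explain_text))


if __name__ == "__main__":
    sample = "\n".join([
        "| 2:HASH JOIN |",
        "| | join op: INNER JOIN (BUCKET_SHUFFLE) |",
        "| | colocate: false, reason: table not in the same group |",
    ])
    print(join_distributions(sample))  # [('INNER JOIN', 'BUCKET_SHUFFLE')]
    print(uses_bucket_shuffle(sample))  # True
```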

    Planning Rules of Bucket Shuffle Join

    In most scenarios, users only need to keep the session variable enabled, as it is by default, to benefit transparently from this Join method. However, understanding the planning rules of Bucket Shuffle Join helps in writing more efficient SQL.
    Bucket Shuffle Join only works when the Join condition is an equality condition. As with Colocate Join, this is because both rely on hashing to determine the data distribution.
    The bucket columns of the two tables must appear in the equality Join condition. When the bucket column of the left table is part of an equality Join condition, the Join is very likely to be planned as a Bucket Shuffle Join.
    Because the same value hashes differently under different data types, Bucket Shuffle Join requires the type of the left table's bucket column to match the type of the right table's equality Join column; otherwise it cannot be planned.
    Bucket Shuffle Join only works on Doris native OLAP tables. External tables such as ODBC, MySQL, and ES tables cannot be planned as Bucket Shuffle Join when used as the left table.
    For partitioned tables, because each partition may follow different data distribution rules, Bucket Shuffle Join is only guaranteed to apply when the left table involves a single partition. Therefore, use WHERE conditions wherever possible so that partition pruning takes effect.
    If the left table is a Colocate table, the data distribution rules of all its partitions are fixed, so Bucket Shuffle Join performs better on Colocate tables.
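The rules above can be expressed as a pre-check that lists which conditions a given Join violates. This is a sketch under simplifying assumptions, not the FE planner's logic: every bucket column of the left table is assumed to need a matching equality condition, column types are compared as plain strings, and a Colocate left table is assumed to be exempt from the single-partition rule. Table and bucket_shuffle_blockers are hypothetical names. An empty result means none of these checks fails; as noted above, the Join is then likely, but not guaranteed, to be planned as Bucket Shuffle Join.

```python
from dataclasses import dataclass


@dataclass
class Table:
    """Hypothetical description of a table, for this sketch only."""
    name: str
    engine: str                    # "OLAP" for Doris native tables
    column_types: dict             # column name -> SQL type, e.g. "BIGINT"
    bucket_columns: list
    partitions_after_pruning: int = 1
    is_colocate: bool = False


def bucket_shuffle_blockers(left: Table, right: Table, eq_conditions: list) -> list:
    """Return the planning rules this Join violates (empty list = no blocker found).

    eq_conditions: equality Join conditions as (left column, right column) pairs.
    """
    blockers = []
    if left.engine != "OLAP":
        blockers.append("left table is not a Doris OLAP table")
    if not eq_conditions:
        blockers.append("no equality Join condition")
    # Map each left-table column to the right-table column it is equated with.
    partner = dict(eq_conditions)
    for col in left.bucket_columns:
        if col not in partner:
            blockers.append(f"bucket column {col} is not in an equality condition")
        elif left.column_types[col] != right.column_types[partner[col]]:
            blockers.append(
                f"type mismatch: {left.name}.{col} {left.column_types[col]} vs "
                f"{right.name}.{partner[col]} {right.column_types[partner[col]]}")
    # Assumption: a Colocate left table is exempt from the single-partition rule.
    if left.partitions_after_pruning > 1 and not left.is_colocate:
        blockers.append("left table spans multiple partitions after pruning")
    return blockers


if __name__ == "__main__":
    lineitem = Table("lineitem", "OLAP", {"l_orderkey": "BIGINT"}, ["l_orderkey"])
    orders = Table("orders", "OLAP", {"o_orderkey": "BIGINT"}, ["o_orderkey"])
    print(bucket_shuffle_blockers(lineitem, orders, [("l_orderkey", "o_orderkey")]))  # []
```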

    Best Practice

    Determine whether the existing SQL meets the conditions for Bucket Shuffle Join.
    Take query 3 (sql3) of the TPC-H benchmark as an example:
    select l_orderkey,
           sum(l_extendedprice * (1 - l_discount)) as revenue,
           o_orderdate,
           o_shippriority
    from (
        select l_orderkey, l_extendedprice, l_discount, o_orderdate, o_shippriority, o_custkey
        from lineitem
        join orders
        where l_orderkey = o_orderkey
          and o_orderdate < date '1995-03-15'
          and l_shipdate > date '1995-03-15'
    ) t1
    join customer c on c.c_custkey = t1.o_custkey
    where c_mktsegment = 'BUILDING'
    group by l_orderkey, o_orderdate, o_shippriority
    order by revenue desc, o_orderdate
    limit 10;
    In sql3, the subquery below joins lineitem and orders on an equality condition (l_orderkey = o_orderkey), so the equality requirement is already met. The remaining requirements, on bucket columns and their types, can be satisfied when the tables are created.
    select l_orderkey, l_extendedprice, l_discount, o_orderdate, o_shippriority, o_custkey
    from lineitem
    join orders
    where l_orderkey = o_orderkey
      and o_orderdate < date '1995-03-15'
      and l_shipdate > date '1995-03-15'

    Check that the session variable is enabled

    MySQL [tpch_100_d]> show variables like "%enable_bucket_shuffle_join%";
    +----------------------------+-------+
    | Variable_name | Value |
    +----------------------------+-------+
    | enable_bucket_shuffle_join | true |
    +----------------------------+-------+
    1 row in set (0.00 sec)

    Create the tables, making sure the bucket columns of the two tables (l_orderkey and o_orderkey) have the same type

    
    CREATE TABLE lineitem (
    l_shipdate date NOT NULL,
    l_orderkey bigint(20) NOT NULL,
    l_linenumber int(11) NOT NULL,
    l_partkey int(11) NOT NULL,
    l_suppkey int(11) NOT NULL,
    l_quantity decimalv3(15, 2) NOT NULL,
    l_extendedprice decimalv3(15, 2) NOT NULL,
    l_discount decimalv3(15, 2) NOT NULL,
    l_tax decimalv3(15, 2) NOT NULL,
    l_returnflag varchar(1) NOT NULL,
    l_linestatus varchar(1) NOT NULL,
    l_commitdate date NOT NULL,
    l_receiptdate date NOT NULL,
    l_shipinstruct varchar(25) NOT NULL,
    l_shipmode varchar(10) NOT NULL,
    l_comment varchar(44) NOT NULL
    ) ENGINE=OLAP
    DUPLICATE KEY(l_shipdate, l_orderkey) COMMENT 'OLAP'
    DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
    );
    
    CREATE TABLE orders (
    o_orderkey bigint(20) NOT NULL,
    o_orderdate date NOT NULL,
    o_custkey int(11) NOT NULL,
    o_orderstatus varchar(1) NOT NULL,
    o_totalprice decimalv3(15, 2) NOT NULL,
    o_orderpriority varchar(15) NOT NULL,
    o_clerk varchar(15) NOT NULL,
    o_shippriority int(11) NOT NULL,
    o_comment varchar(79) NOT NULL
    ) ENGINE=OLAP
    DUPLICATE KEY(o_orderkey, o_orderdate) COMMENT 'OLAP'
    DISTRIBUTED BY HASH(o_orderkey) BUCKETS 96
    PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
    );
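To see why l_orderkey and o_orderkey should share a type, the following sketch routes join keys to buckets with a stand-in hash. It assumes a bucket is the CRC32 of the key's little-endian binary encoding modulo 96, which is an illustration only and not necessarily the hash Doris uses internally. Because an INT key is encoded in 4 bytes and a BIGINT key in 8 bytes, the same value can land in different buckets, so right-table rows hashed with the wrong type would miss their matching left-table rows. bucket_of and misrouted_keys are hypothetical names.

```python
import struct
import zlib

BUCKETS = 96  # same bucket count as in the CREATE TABLE statements above

# Fixed-width little-endian encodings used by this stand-in hash.
ENCODINGS = {"INT": "<i", "BIGINT": "<q"}


def bucket_of(key: int, sql_type: str, buckets: int = BUCKETS) -> int:
    """Stand-in bucket function: CRC32 of the encoded key, modulo the bucket count."""
    return zlib.crc32(struct.pack(ENCODINGS[sql_type], key)) % buckets


def misrouted_keys(keys: list, left_type: str, right_type: str) -> list:
    """Keys whose right-table rows would land in a different bucket than the
    left-table rows they should join with, if each side hashed its own type."""
    return [k for k in keys if bucket_of(k, left_type) != bucket_of(k, right_type)]


if __name__ == "__main__":
    keys = list(range(1, 1001))
    print("BIGINT vs BIGINT:", len(misrouted_keys(keys, "BIGINT", "BIGINT")), "misrouted")
    print("BIGINT vs INT:   ", len(misrouted_keys(keys, "BIGINT", "INT")), "misrouted")
```

With matching types no key is misrouted; with BIGINT on one side and INT on the other, many keys are, which is why the planner refuses to plan Bucket Shuffle Join in that case.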

    Use explain to analyze the business SQL and confirm that Bucket Shuffle Join takes effect

    