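For example, a table with 8 buckets has the buckets `[0, 1, 2, 3, 4, 5, 6, 7]`. Such a sequence is called a BucketsSequence. Each Bucket contains one or more data shards (Tablets). For a single-partition table, a Bucket holds exactly one Tablet. For a multi-partition table, it holds several (one per partition).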
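To give the tables the same data distribution, all tables in the same CG must have the following properties in common.

Bucketing columns and bucket count: the columns specified in `DISTRIBUTED BY HASH(col1, col2, ...)`. The bucketing columns determine which column values are hashed to assign a table's rows to Tablets. Tables in the same CG must have bucketing columns of exactly the same types and number, and the same number of buckets. Only then can the Tablets of the different tables be matched one-to-one and their placement controlled together. The tables must also have the same number of replicas.

Continuing the example, suppose the BucketsSequence is `[0, 1, 2, 3, 4, 5, 6, 7]` and there are 4 BE nodes `[A, B, C, D]`. One possible data distribution is shown below. Each column is a bucket, and the letters under it are the BE nodes holding that bucket's replicas:

```
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| 0 | | 1 | | 2 | | 3 | | 4 | | 5 | | 6 | | 7 |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
| A | | B | | C | | D | | A | | B | | C | | D |
|   | |   | |   | |   | |   | |   | |   | |   |
| B | | C | | D | | A | | B | | C | | D | | A |
|   | |   | |   | |   | |   | |   | |   | |   |
| C | | D | | A | | B | | C | | D | | A | | B |
+---+ +---+ +---+ +---+ +---+ +---+ +---+ +---+
```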
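The layout above can be reproduced with a simple round-robin rule: replica `j` of bucket `i` goes to BE `(i + j) mod 4`. The sketch below shows that rule, plus a hash-to-bucket step that illustrates why equal bucketing values across colocated tables end up on the same BEs. Both are assumptions made for illustration. `bucket_of` uses CRC32 purely as a stand-in hash and is not claimed to be Doris's bucketing hash. `place_replicas` is one possible layout, not Doris's balancing algorithm.

```python
import zlib


def bucket_of(key: str, buckets: int) -> int:
    """Map a bucketing-column value to a bucket index.

    Stand-in hash (CRC32 of the UTF-8 bytes), not Doris's real hash. It only
    illustrates that equal values always land in the same bucket when the
    tables have the same number of buckets.
    """
    return zlib.crc32(key.encode("utf-8")) % buckets


def place_replicas(buckets: int, backends: list[str], replicas: int) -> list[list[str]]:
    """Round-robin layout matching the diagram: replica j of bucket i goes to
    backends[(i + j) % len(backends)]. One possible layout only."""
    if replicas > len(backends):
        raise ValueError("need at least as many BE nodes as replicas")
    n = len(backends)
    return [[backends[(i + j) % n] for j in range(replicas)] for i in range(buckets)]


if __name__ == "__main__":
    seq = place_replicas(8, ["A", "B", "C", "D"], 3)
    for i, bes in enumerate(seq):
        print(i, bes)  # 0 ['A', 'B', 'C'], 1 ['B', 'C', 'D'], ...
    # Rows of two colocated tables sharing a bucketing value map to the same
    # bucket index, hence to the same BE nodes, so the join can run locally.
    b = bucket_of("42", 8)
    print("value 42 -> bucket", b, "-> BEs", seq[b])
```

Because every table in the Group shares the same BucketsSequence, only the bucket index matters for placement. The table itself never influences which BEs a bucket lives on.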
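When creating a table, you can specify the property `"colocate_with" = "group_name"` in `PROPERTIES`. This marks the table as a Colocation Join table that belongs to the specified Colocation Group.

Example:

```sql
CREATE TABLE tbl (k1 int, v1 int sum)
DISTRIBUTED BY HASH(k1)
BUCKETS 8
PROPERTIES(
    "colocate_with" = "group1"
);
```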
Internally, a Group's full name is `dbId_groupName`, but users only see `groupName`.

When the last table in a Group is permanently deleted, the Group is deleted automatically as well. A table dropped with the `DROP TABLE` command stays in the recycle bin for one day by default before it is actually deleted.

To view the Colocation Groups in the cluster:

```
SHOW PROC '/colocation_group';

+-------------+--------------+--------------+------------+----------------+----------+----------+
| GroupId     | GroupName    | TableIds     | BucketsNum | ReplicationNum | DistCols | IsStable |
+-------------+--------------+--------------+------------+----------------+----------+----------+
| 10005.10008 | 10005_group1 | 10007, 10040 | 10         | 3              | int(11)  | true     |
+-------------+--------------+--------------+------------+----------------+----------+----------+
```
To view the data distribution of a specific Group:

```
SHOW PROC '/colocation_group/10005.10008';

+-------------+---------------------+
| BucketIndex | BackendIds          |
+-------------+---------------------+
| 0           | 10004, 10002, 10001 |
| 1           | 10003, 10002, 10004 |
| 2           | 10002, 10004, 10001 |
| 3           | 10003, 10002, 10004 |
| 4           | 10002, 10004, 10003 |
| 5           | 10003, 10002, 10001 |
| 6           | 10003, 10004, 10001 |
| 7           | 10003, 10004, 10002 |
+-------------+---------------------+
```
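A quick way to see how evenly a Group's replicas are spread is to count replicas per BE from the `BucketIndex`/`BackendIds` output. This is a minimal sketch, assuming the rows have been copied in by hand. Fetching them from Doris is not shown. The names `GROUP_10005_10008`, `replicas_per_backend` and `duplicated_buckets` are hypothetical.

```python
from collections import Counter

# BucketIndex -> BackendIds, typed in from the SHOW PROC output above.
GROUP_10005_10008 = {
    0: [10004, 10002, 10001],
    1: [10003, 10002, 10004],
    2: [10002, 10004, 10001],
    3: [10003, 10002, 10004],
    4: [10002, 10004, 10003],
    5: [10003, 10002, 10001],
    6: [10003, 10004, 10001],
    7: [10003, 10004, 10002],
}


def replicas_per_backend(dist: dict[int, list[int]]) -> Counter:
    """Count how many bucket replicas each BE holds."""
    return Counter(be for bes in dist.values() for be in bes)


def duplicated_buckets(dist: dict[int, list[int]]) -> list[int]:
    """Buckets that place two replicas on the same BE (expected to be empty)."""
    return [idx for idx, bes in dist.items() if len(set(bes)) != len(bes)]


if __name__ == "__main__":
    for be, n in sorted(replicas_per_backend(GROUP_10005_10008).items()):
        print(be, n)
    print("buckets with duplicate BEs:", duplicated_buckets(GROUP_10005_10008))
```

With the sample output above, BE 10001 holds 4 of the 24 replicas, 10003 holds 6, and 10002 and 10004 hold 7 each.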
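To change the Colocation Group of an existing table:

```sql
ALTER TABLE tbl SET ("colocate_with" = "group2");
```

To remove a table's Colocation property:

```sql
ALTER TABLE tbl SET ("colocate_with" = "");
```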
For implementation details, see the code comments in `ColocateTableBalancer.java`.
中的代码注释。disable_colocate_balance
来禁止自动均衡。然后在合适的时间打开即可。(具体参阅高级操作一节)。CREATE TABLE `tbl1` (`k1` date NOT NULL COMMENT "",`k2` int(11) NOT NULL COMMENT "",`v1` int(11) SUM NOT NULL COMMENT "") ENGINE=OLAPAGGREGATE KEY(`k1`, `k2`)PARTITION BY RANGE(`k1`)(PARTITION p1 VALUES LESS THAN ('2019-05-31'),PARTITION p2 VALUES LESS THAN ('2019-06-30'))DISTRIBUTED BY HASH(`k2`) BUCKETS 8PROPERTIES ("colocate_with" = "group1");
```sql
CREATE TABLE `tbl2` (
    `k1` datetime NOT NULL COMMENT "",
    `k2` int(11) NOT NULL COMMENT "",
    `v1` double SUM NOT NULL COMMENT ""
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`)
DISTRIBUTED BY HASH(`k2`) BUCKETS 8
PROPERTIES (
    "colocate_with" = "group1"
);
```
Viewing the query plan:

```
DESC SELECT * FROM tbl1 INNER JOIN tbl2 ON (tbl1.k2 = tbl2.k2);
+----------------------------------------------------+
| Explain String                                     |
+----------------------------------------------------+
| PLAN FRAGMENT 0                                    |
|  OUTPUT EXPRS:`tbl1`.`k1` |                        |
|   PARTITION: RANDOM                                |
|                                                    |
|   RESULT SINK                                      |
|                                                    |
|   2:HASH JOIN                                      |
|   |  join op: INNER JOIN                           |
|   |  hash predicates:                              |
|   |  colocate: true                                |
|   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
|   |  tuple ids: 0 1                                |
|   |                                                |
|   |----1:OlapScanNode                              |
|   |       TABLE: tbl2                              |
|   |       PREAGGREGATION: OFF. Reason: null        |
|   |       partitions=0/1                           |
|   |       rollup: null                             |
|   |       buckets=0/0                              |
|   |       cardinality=-1                           |
|   |       avgRowSize=0.0                           |
|   |       numNodes=0                               |
|   |       tuple ids: 1                             |
|   |                                                |
|   0:OlapScanNode                                   |
|      TABLE: tbl1                                   |
|      PREAGGREGATION: OFF. Reason: No AggregateInfo |
|      partitions=0/2                                |
|      rollup: null                                  |
|      buckets=0/0                                   |
|      cardinality=-1                                |
|      avgRowSize=0.0                                |
|      numNodes=0                                    |
|      tuple ids: 0                                  |
+----------------------------------------------------+
```
If Colocation Join takes effect, the Hash Join node shows `colocate: true`.

If it does not take effect, the query plan looks like this:

```
+----------------------------------------------------+
| Explain String                                     |
+----------------------------------------------------+
| PLAN FRAGMENT 0                                    |
|  OUTPUT EXPRS:`tbl1`.`k1` |                        |
|   PARTITION: RANDOM                                |
|                                                    |
|   RESULT SINK                                      |
|                                                    |
|   2:HASH JOIN                                      |
|   |  join op: INNER JOIN (BROADCAST)               |
|   |  hash predicates:                              |
|   |  colocate: false, reason: group is not stable  |
|   |    `tbl1`.`k2` = `tbl2`.`k2`                   |
|   |  tuple ids: 0 1                                |
|   |                                                |
|   |----3:EXCHANGE                                  |
|   |       tuple ids: 1                             |
|   |                                                |
|   0:OlapScanNode                                   |
|      TABLE: tbl1                                   |
|      PREAGGREGATION: OFF. Reason: No AggregateInfo |
|      partitions=0/2                                |
|      rollup: null                                  |
|      buckets=0/0                                   |
|      cardinality=-1                                |
|      avgRowSize=0.0                                |
|      numNodes=0                                    |
|      tuple ids: 0                                  |
|                                                    |
| PLAN FRAGMENT 1                                    |
|  OUTPUT EXPRS:                                     |
|   PARTITION: RANDOM                                |
|                                                    |
|   STREAM DATA SINK                                 |
|     EXCHANGE ID: 03                                |
|     UNPARTITIONED                                  |
|                                                    |
|   1:OlapScanNode                                   |
|      TABLE: tbl2                                   |
|      PREAGGREGATION: OFF. Reason: null             |
|      partitions=0/1                                |
|      rollup: null                                  |
|      buckets=0/0                                   |
|      cardinality=-1                                |
|      avgRowSize=0.0                                |
|      numNodes=0                                    |
|      tuple ids: 1                                  |
+----------------------------------------------------+
```
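In this case the HASH JOIN node shows the reason, here `colocate: false, reason: group is not stable`, and an EXCHANGE node is generated as well.

For how to view and modify FE configuration items, see `HELP ADMIN SHOW CONFIG;` and `HELP ADMIN SET CONFIG;`.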
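In earlier versions, if `disable_colocate_join = false`, you must set `use_new_tablet_scheduler = false`, which disables the new replica scheduler. In later versions, `use_new_tablet_scheduler` is always true.

The Colocation HTTP APIs are served by the FE and accessed via `fe_host:fe_http_port`. ADMIN privileges are required.

`GET /api/colocate` returns the internal Colocation information in JSON format:

```json
{
    "msg": "success",
    "code": 0,
    "data": {
        "infos": [
            ["10003.12002", "10003_group1", "10037, 10043", "1", "1", "int(11)", "true"]
        ],
        "unstableGroupIds": [],
        "allGroupIds": [{
            "dbId": 10003,
            "grpId": 12002
        }]
    },
    "count": 0
}
```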
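When scripting against this endpoint, the useful step is turning each `infos` row into named fields and splitting `GroupId` into the `db_id`/`group_id` pair used by the other endpoints. `GroupId` `10003.12002` matches `dbId` 10003 and `grpId` 12002 in `allGroupIds`. The following is a minimal sketch. It assumes each `infos` row lists the same fields, in the same order, as the `SHOW PROC '/colocation_group'` columns. `parse_colocate_info` and `FIELDS` are hypothetical names.

```python
import json

# Response body copied from the GET /api/colocate example above.
SAMPLE_RESPONSE = """
{"msg": "success", "code": 0,
 "data": {"infos": [["10003.12002", "10003_group1", "10037, 10043", "1", "1", "int(11)", "true"]],
          "unstableGroupIds": [],
          "allGroupIds": [{"dbId": 10003, "grpId": 12002}]},
 "count": 0}
"""

# Assumption: "infos" rows follow the column order of SHOW PROC '/colocation_group'.
FIELDS = ("GroupId", "GroupName", "TableIds", "BucketsNum",
          "ReplicationNum", "DistCols", "IsStable")


def parse_colocate_info(body: str) -> list[dict]:
    resp = json.loads(body)
    if resp.get("code") != 0:
        raise RuntimeError(f"API error: {resp.get('msg')}")
    groups = []
    for row in resp["data"]["infos"]:
        group = dict(zip(FIELDS, row))
        # GroupId is "<dbId>.<grpId>" (compare allGroupIds); these become the
        # db_id / group_id query parameters of the group_stable and bucketseq APIs.
        db_id, grp_id = group["GroupId"].split(".")
        group["db_id"], group["group_id"] = int(db_id), int(grp_id)
        group["IsStable"] = group["IsStable"] == "true"
        groups.append(group)
    return groups


if __name__ == "__main__":
    for g in parse_colocate_info(SAMPLE_RESPONSE):
        state = "stable" if g["IsStable"] else "unstable"
        print(g["GroupName"], g["db_id"], g["group_id"], state)
```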
Mark a Group as Stable: `POST /api/colocate/group_stable?db_id=10005&group_id=10008`, which returns `200`.
Mark a Group as Unstable: `DELETE /api/colocate/group_stable?db_id=10005&group_id=10008`, which returns `200`.
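The only difference between the two calls is the HTTP method, so one helper can build both. This sketch uses only the standard library and constructs the request without sending it. Several pieces are assumptions: the FE is assumed to accept HTTP Basic authentication for an ADMIN user, `fe_host:8030` and the `root` credentials are placeholders, and `group_stable_request` is a hypothetical name.

```python
import base64
import urllib.parse
import urllib.request


def group_stable_request(fe: str, db_id: int, group_id: int, stable: bool,
                         user: str, password: str) -> urllib.request.Request:
    """Build (but do not send) the request that marks a Group Stable or Unstable.

    POST marks the Group Stable, DELETE marks it Unstable.
    Assumption: the FE accepts HTTP Basic auth for a user with ADMIN privileges.
    """
    query = urllib.parse.urlencode({"db_id": db_id, "group_id": group_id})
    url = f"http://{fe}/api/colocate/group_stable?{query}"
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        method="POST" if stable else "DELETE",
        headers={"Authorization": f"Basic {token}"},
    )


if __name__ == "__main__":
    # "fe_host:8030" and the credentials are placeholders.
    req = group_stable_request("fe_host:8030", 10005, 10008, False, "root", "")
    print(req.get_method(), req.full_url)
    # To send it: urllib.request.urlopen(req), then check for HTTP 200.
```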
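Set the data distribution of a Group:

```
POST /api/colocate/bucketseq?db_id=10005&group_id=10008

Body:
[[10004,10002],[10003,10002],[10002,10004],[10003,10002],[10002,10004],[10003,10002],[10003,10004],[10003,10004],[10003,10004],[10002,10004]]
```

Returns `200`. The Body is the BucketsSequence as a nested array. The i-th inner array lists the IDs of the BEs that hold the Tablets of bucket i.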
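A malformed BucketsSequence is easy to produce by hand. Before POSTing a body like the one above, it can help to check its shape. The sketch below checks three things: one entry per bucket, the expected number of BEs per bucket, and no BE repeated within a bucket. These client-side checks are assumptions about what a sensible body looks like, not the FE's own validation. `bucketseq_body` is a hypothetical name.

```python
import json


def bucketseq_body(seq: list[list[int]], buckets_num: int, replication_num: int) -> bytes:
    """Sanity-check a BucketsSequence and serialize it as the request body."""
    if len(seq) != buckets_num:
        raise ValueError(f"expected {buckets_num} buckets, got {len(seq)}")
    for idx, bes in enumerate(seq):
        if len(bes) != replication_num:
            raise ValueError(f"bucket {idx}: expected {replication_num} BEs, got {len(bes)}")
        if len(set(bes)) != len(bes):
            raise ValueError(f"bucket {idx}: the same BE appears twice")
    return json.dumps(seq).encode("utf-8")


# Body copied from the example above: 10 buckets, 2 BEs per bucket.
EXAMPLE = [[10004, 10002], [10003, 10002], [10002, 10004], [10003, 10002],
           [10002, 10004], [10003, 10002], [10003, 10004], [10003, 10004],
           [10003, 10004], [10002, 10004]]

if __name__ == "__main__":
    print(bucketseq_body(EXAMPLE, buckets_num=10, replication_num=2).decode("utf-8"))
```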
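When using this API, you may need to set the FE configuration items `disable_colocate_relocate` and `disable_colocate_balance` to true. This turns off the system's automatic Colocation replica repair and balancing. Otherwise, the system may automatically reset the distribution after you modify it.

As an example, consider the following query, which joins the TPC-H tables `lineitem` and `orders`:

```sql
select * from lineitem inner join orders on l_orderkey = o_orderkey where l_orderkey > 500000000 limit 10;
```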
The two tables are defined as follows:

```sql
CREATE TABLE lineitem (
    l_shipdate date NOT NULL,
    l_orderkey bigint(20) NOT NULL,
    l_linenumber int(11) NOT NULL,
    l_partkey int(11) NOT NULL,
    l_suppkey int(11) NOT NULL,
    l_quantity decimalv3(15, 2) NOT NULL,
    l_extendedprice decimalv3(15, 2) NOT NULL,
    l_discount decimalv3(15, 2) NOT NULL,
    l_tax decimalv3(15, 2) NOT NULL,
    l_returnflag varchar(1) NOT NULL,
    l_linestatus varchar(1) NOT NULL,
    l_commitdate date NOT NULL,
    l_receiptdate date NOT NULL,
    l_shipinstruct varchar(25) NOT NULL,
    l_shipmode varchar(10) NOT NULL,
    l_comment varchar(44) NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(l_shipdate, l_orderkey)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(l_orderkey) BUCKETS 96
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
);
```
```sql
CREATE TABLE orders (
    o_orderkey bigint(20) NOT NULL,
    o_orderdate date NOT NULL,
    o_custkey int(11) NOT NULL,
    o_orderstatus varchar(1) NOT NULL,
    o_totalprice decimalv3(15, 2) NOT NULL,
    o_orderpriority varchar(15) NOT NULL,
    o_clerk varchar(15) NOT NULL,
    o_shippriority int(11) NOT NULL,
    o_comment varchar(79) NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(o_orderkey, o_orderdate)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(o_orderkey) BUCKETS 96
PROPERTIES (
    "replication_allocation" = "tag.location.default: 3",
    "in_memory" = "false",
    "storage_format" = "V2",
    "disable_auto_compaction" = "false"
);
```
Inspect the query plan:

```sql
explain select * from lineitem inner join orders on l_orderkey = o_orderkey where l_orderkey > 500000000 limit 10;
```
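In the EXPLAIN output, the HASH JOIN node carries either `colocate: true` or `colocate: false, reason: ...`, as in the `tbl1`/`tbl2` plans shown on this page. When checking this repeatedly, for example in a regression script, a small parser over the plan text is enough. This is a sketch that assumes the plan is printed in the format shown on this page. Other Doris versions may word it differently. `colocate_statuses` is a hypothetical name.

```python
import re
from typing import List, Optional, Tuple

# Matches "colocate: true" or "colocate: false, reason: <text>" on a HASH JOIN
# node, stopping at the table border "|" or the end of the line.
_COLOCATE = re.compile(r"colocate:\s*(true|false)(?:,\s*reason:\s*([^|\n]+))?")


def colocate_statuses(plan: str) -> List[Tuple[bool, Optional[str]]]:
    """Return (colocated, reason) for every join node found in EXPLAIN text."""
    result = []
    for m in _COLOCATE.finditer(plan):
        reason = m.group(2).strip() if m.group(2) else None
        result.append((m.group(1) == "true", reason))
    return result


if __name__ == "__main__":
    sample = (
        "|   2:HASH JOIN                                      |\n"
        "|   |  colocate: false, reason: group is not stable  |\n"
    )
    print(colocate_statuses(sample))
```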
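Comparing the two tables against the Colocation Join conditions:

| Condition | lineitem table | orders table | Meets the Colocation Join condition? |
| --- | --- | --- | --- |
| Bucketing column | l_orderkey (type bigint(20)) | o_orderkey (type bigint(20)) | Yes |
| Number of buckets | 96 | 96 | Yes |
| Number of replicas | 3 | 3 | Yes |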
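The table above is a manual check. The same comparison can be written as a function that lists every condition two tables fail. This is a minimal sketch. The layouts are typed in from the `CREATE TABLE` statements rather than read from Doris. `TableLayout` and `colocation_mismatches` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TableLayout:
    name: str
    dist_col_types: tuple[str, ...]  # types of the DISTRIBUTED BY HASH(...) columns, in order
    buckets: int                     # BUCKETS n
    replicas: int                    # replica count from replication_allocation


def colocation_mismatches(a: TableLayout, b: TableLayout) -> list[str]:
    """Return the conditions from the table above that a and b fail (empty = OK)."""
    problems = []
    if a.dist_col_types != b.dist_col_types:
        problems.append(f"bucketing column types differ: {a.dist_col_types} vs {b.dist_col_types}")
    if a.buckets != b.buckets:
        problems.append(f"bucket counts differ: {a.buckets} vs {b.buckets}")
    if a.replicas != b.replicas:
        problems.append(f"replica counts differ: {a.replicas} vs {b.replicas}")
    return problems


lineitem = TableLayout("lineitem", ("bigint(20)",), buckets=96, replicas=3)
orders = TableLayout("orders", ("bigint(20)",), buckets=96, replicas=3)

if __name__ == "__main__":
    print(colocation_mismatches(lineitem, orders) or "all Colocation Join conditions met")
```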
All conditions are met, so both tables can be added to the same Colocation Group:

```sql
ALTER TABLE lineitem SET ("colocate_with" = "tpch_group");
ALTER TABLE orders SET ("colocate_with" = "tpch_group");
```