Doris大表关联优化

记一次Doris大表关联优化

场景:

有两张大表,数据量分别为5kw+和接近7kw,表空间占用约等于6.26GB和7.77GB

![](https://raw.githubusercontent.com/YUTING0907/PicGo/main/img截屏2022-06-08 下午9.58.01.png)

历史数据上游提供的全部为同一天的数据,所以没办法进行分区,只能进行分桶,按照大概每个桶10w-15w数据量,将表分为300个bucket

表结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
CREATE TABLE `ods_esc_buyer_goods_ebay_compatibility_model_test` (
`id` varchar(128) NOT NULL COMMENT "id",
`create_time` datetime NOT NULL COMMENT "create_time",
`category_id` varchar(200) NOT NULL COMMENT "ebayid",
`site` varchar(1020) NOT NULL COMMENT "站点",
`model_id` bigint NULL COMMENT "车型id",
`esc_category_id` varchar(128) NULL COMMENT "esc品类id",
`version` varchar(128) NULL COMMENT "版本号",
`update_type` int(11) NULL COMMENT "更新类型",
`update_time` datetime NULL COMMENT "update_time",
`deleted` tinyint(4) NULL DEFAULT "0" COMMENT "删除标记",
`is_deleted_flg` int(11) NULL DEFAULT "0" COMMENT "是否被系統物理刪 0:未被物理删,1:系统已删除",
`ts_ms` bigint(20) NULL DEFAULT "0" COMMENT "数据拉取时间",
INDEX idx_site (`site`) USING BITMAP COMMENT '站点索引',
INDEX idx_category_id (`category_id`) USING BITMAP COMMENT '品类索引',
INDEX idx_model_id (`model_id`) USING BITMAP COMMENT '车型id索引'
) ENGINE=OLAP
UNIQUE KEY(`id`, `create_time`, `category_id`, `site`)
COMMENT "品类车型关联表"
DISTRIBUTED BY HASH(`id`) BUCKETS 300
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);


CREATE TABLE ods.`ods_esc_buyer_goods_ebay_compatibility_properties_values_test` (
`id` varchar(128) NOT NULL COMMENT "id",
`create_time` datetime NOT NULL COMMENT "create_time",
`site` varchar(1020) NOT NULL COMMENT "站点",
`model_id` bigint NOT NULL COMMENT "车型id",
`level1` varchar(1020) NULL COMMENT "对应goods_ebay_compatibility_properties表propertyName,FitmentComments不是属性,遇到FitmentComments则删除FitmentComments并且level前移一位",
`level2` varchar(1020) NULL COMMENT "level2",
`level3` varchar(1020) NULL COMMENT "level3",
`level4` varchar(1020) NULL COMMENT "level4",
`level5` varchar(1020) NULL COMMENT "level5",
`level6` varchar(1020) NULL COMMENT "level6",
`level7` varchar(1020) NULL COMMENT "level7",
`level8` varchar(1020) NULL COMMENT "level8",
`deleted` tinyint(4) NULL COMMENT "状态",
`update_time` datetime NULL COMMENT "update_time",
`is_deleted_flg` int(11) NULL DEFAULT "0" COMMENT "是否被系統物理刪 0:未被物理删,1:系统已删除",
`ts_ms` bigint(20) NULL DEFAULT "0" COMMENT "数据拉取时间",
INDEX idx_site (`site`) USING BITMAP COMMENT '站点索引',
INDEX idx_model_id (`model_id`) USING BITMAP COMMENT '车型id索引',
INDEX idx_level1 (`level1`) USING BITMAP COMMENT 'level1索引',
INDEX idx_level2 (`level2`) USING BITMAP COMMENT 'level2索引',
INDEX idx_level3 (`level3`) USING BITMAP COMMENT 'level3索引',
INDEX idx_level4 (`level4`) USING BITMAP COMMENT 'level4索引',
INDEX idx_level5 (`level5`) USING BITMAP COMMENT 'level5索引',
INDEX idx_level6 (`level6`) USING BITMAP COMMENT 'level6索引',
INDEX idx_level7 (`level7`) USING BITMAP COMMENT 'level7索引',
INDEX idx_level8 (`level8`) USING BITMAP COMMENT 'level8索引'
) ENGINE=OLAP
UNIQUE KEY(`id`, `create_time`, `site`)
COMMENT "品类属性值"
DISTRIBUTED BY HASH(`id`) BUCKETS 300
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);

查询语句如下:

发现SET query_timeout = 10000; 发现查询仍旧会time out

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
select 
t1.model_id
,split_part(t1.version , '-', 1)
,t1.esc_category_id
,t1.category_id
,t1.site
,t3.catagory_name
,""
,t1.update_type
,t1.deleted
,t1.create_time
,t1.update_time
,t2.level1
,t2.level2
,t2.level3
,t2.level4
,t2.level5
,t2.level6
,t2.level7
,t2.level8
from ods.ods_esc_buyer_goods_ebay_compatibility_model t1
left join ods.ods_esc_buyer_goods_ebay_compatibility_properties_values t2
on t1.model_id = t2.model_id
and t2.is_deleted_flg=0
and t2.deleted = 0
left join ods.ods_esc_goods_prd_catagory t3
on t1.esc_category_id = t3.catagory_uid
and t3.is_deleted_flg = 0
where t1.is_deleted_flg=0
;

观察select查询语句发现关联键model_id, 没有用上前缀索引。可以用bukect shuffle join进行优化。


优化

bukect shuffle join原理

Doris支持的常规分布式Join方式包括了shuffle join 和broadcast join。这两种join都会导致不小的网络开销:

举个例子,当前存在A表与B表的Join查询,它的Join方式为HashJoin,不同Join类型的开销如下:

  • Broadcast Join: 如果根据数据分布,查询规划出A表有3个执行的HashJoinNode,那么需要将B表全量的发送到3个HashJoinNode,那么它的网络开销是3B,它的内存开销也是3B
  • Shuffle Join: Shuffle Join会将A,B两张表的数据根据哈希计算分散到集群的节点之中,所以它的网络开销为 A + B,内存开销为B

在FE之中保存了Doris每个表的数据分布信息,如果join语句命中了表的数据分布列,我们应该使用数据分布信息来减少join语句的网络与内存开销,这就是Bucket Shuffle Join的思路来源。

bukect shuffle join使用方式

  • 将session变量enable_bucket_*shuffle*_join设置为true

    则FE在进行查询规划时就会默认将能够转换为Bucket Shuffle Join的查询自动规划为Bucket Shuffle Join。

set enable_bucket_*shuffle*_join = true;

在FE进行分布式查询规划时,优先选择的顺序为 Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。但是如果用户显式hint了Join的类型,如:

1
select * from test join [shuffle] baseall on test.k1 = baseall.k1;
  • 可以通过explain命令来查看Join是否为Bucket Shuffle Join:
1
2
3
4
5
 2:HASH JOIN                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
| | join op: INNER JOIN (BUCKET_SHUFFLE) |
| | hash predicates: |
| | colocate: false, reason: table not in the same group |
| | equal join conjunct: `test`.`k1` = `baseall`.`k1`

在Join类型之中会指明使用的Join方式为:BUCKET_*SHUFFLE*

Bucket Shuffle Join的规划规则

  • Bucket Shuffle Join只生效于Join条件为等值的场景,原因与Colocate Join类似,它们都依赖hash来计算确定的数据分布
  • 在等值Join条件之中包含两张表的分桶列,当左表的分桶列为等值的Join条件时,它有很大概率会被规划为Bucket Shuffle Join
  • 由于不同的数据类型的hash值计算结果不同,所以Bucket Shuffle Join要求左表的分桶列的类型与右表等值join列的类型需要保持一致,否则无法进行对应的规划
  • 对于分区表,由于每一个分区的数据分布规则可能不同,所以Bucket Shuffle Join只能保证左表为单分区时生效。所以在SQL执行之中,需要尽量使用where条件使分区裁剪的策略能够生效。

基于以上规则,将关联键设置为bukect分桶列,并加入到key列,利用前缀索引加速查询。

tips:Join 条件中存在左表的分布式列(bukect列),且左表执行时为单分区,才可以让 Bucket Shuffle Join 生效。

优化后的表结构如下,优化完3s出结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
CREATE TABLE `ods_esc_buyer_goods_ebay_compatibility_model_test` (
`model_id` bigint NULL COMMENT "车型id",
`id` varchar(128) NOT NULL COMMENT "id",
`create_time` datetime NOT NULL COMMENT "create_time",
`category_id` varchar(200) NOT NULL COMMENT "ebayid",
`site` varchar(1020) NOT NULL COMMENT "站点",
`esc_category_id` varchar(128) NULL COMMENT "esc品类id",
`version` varchar(128) NULL COMMENT "版本号",
`update_type` int(11) NULL COMMENT "更新类型",
`update_time` datetime NULL COMMENT "update_time",
`deleted` tinyint(4) NULL DEFAULT "0" COMMENT "删除标记",
`is_deleted_flg` int(11) NULL DEFAULT "0" COMMENT "是否被系統物理刪 0:未被物理删,1:系统已删除",
`ts_ms` bigint(20) NULL DEFAULT "0" COMMENT "数据拉取时间",
INDEX idx_site (`site`) USING BITMAP COMMENT '站点索引',
INDEX idx_category_id (`category_id`) USING BITMAP COMMENT '品类索引',
INDEX idx_model_id (`model_id`) USING BITMAP COMMENT '车型id索引'
) ENGINE=OLAP
UNIQUE KEY(`model_id`,`id`, `create_time`, `category_id`, `site`)
COMMENT "品类车型关联表"
DISTRIBUTED BY HASH(`model_id`) BUCKETS 300
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);


CREATE TABLE ods.`ods_esc_buyer_goods_ebay_compatibility_properties_values_test` (
`model_id` bigint NOT NULL COMMENT "车型id",
`id` varchar(128) NOT NULL COMMENT "id",
`create_time` datetime NOT NULL COMMENT "create_time",
`site` varchar(1020) NOT NULL COMMENT "站点",
`level1` varchar(1020) NULL COMMENT "对应goods_ebay_compatibility_properties表propertyName,FitmentComments不是属性,遇到FitmentComments则删除FitmentComments并且level前移一位",
`level2` varchar(1020) NULL COMMENT "level2",
`level3` varchar(1020) NULL COMMENT "level3",
`level4` varchar(1020) NULL COMMENT "level4",
`level5` varchar(1020) NULL COMMENT "level5",
`level6` varchar(1020) NULL COMMENT "level6",
`level7` varchar(1020) NULL COMMENT "level7",
`level8` varchar(1020) NULL COMMENT "level8",
`deleted` tinyint(4) NULL COMMENT "状态",
`update_time` datetime NULL COMMENT "update_time",
`is_deleted_flg` int(11) NULL DEFAULT "0" COMMENT "是否被系統物理刪 0:未被物理删,1:系统已删除",
`ts_ms` bigint(20) NULL DEFAULT "0" COMMENT "数据拉取时间",
INDEX idx_site (`site`) USING BITMAP COMMENT '站点索引',
INDEX idx_model_id (`model_id`) USING BITMAP COMMENT '车型id索引',
INDEX idx_level1 (`level1`) USING BITMAP COMMENT 'level1索引',
INDEX idx_level2 (`level2`) USING BITMAP COMMENT 'level2索引',
INDEX idx_level3 (`level3`) USING BITMAP COMMENT 'level3索引',
INDEX idx_level4 (`level4`) USING BITMAP COMMENT 'level4索引',
INDEX idx_level5 (`level5`) USING BITMAP COMMENT 'level5索引',
INDEX idx_level6 (`level6`) USING BITMAP COMMENT 'level6索引',
INDEX idx_level7 (`level7`) USING BITMAP COMMENT 'level7索引',
INDEX idx_level8 (`level8`) USING BITMAP COMMENT 'level8索引'
) ENGINE=OLAP
UNIQUE KEY(`model_id`,`id`, `create_time`, `site`)
COMMENT "品类属性值"
DISTRIBUTED BY HASH(`model_id`) BUCKETS 300
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"in_memory" = "false",
"storage_format" = "V2"
);

SMB JOIN

Bucket Shuffle Join 其实就是SMB JOIN,大表join大表的场景都是这样解决的。

SMB JOIN是sort merge bucket操作,需要进行分桶,首先会进行排序,然后根据key值合并,把相同key的数据放到同一个bucket中(按照key进行hash)。

分桶的目的其实就是把大表化成“小表”(多个桶)。

相同key的数据都在同一个桶中之后,再进行join操作,那么在联合的时候就会大幅度的减小无关项的扫描。

使用条件:

(1)两表进行分桶,桶的个数必须相等

(2)两边进行join时,join列==排序列==分桶列


Tips: Doris参数优化

例如,

设置并发数 set parallel_fragment_exec_instance_num = 8;

调高执行内存 set exec_mem_limit = 8G;


觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭



wechat pay



alipay

Doris大表关联优化
http://yuting0907.github.io/2022/06/08/Doris大表关联优化/
作者
Echo Yu
发布于
2022年6月8日
许可协议