网站首页 > 开源技术 正文
Grouped Execution
为了方便大家理解 Grouped Execution 的原理,我们先来介绍两个概念:分桶 和 Hash Join。
1.1 分桶
- 其实 Hive 表中桶的概念就是 MapReduce 的分区的概念,两者完全相同。物理上每个桶就是目录里的一个文件,一个作业产生的桶(输出文件)数量和reduce任务个数相同。
- 而分区表的概念,则是新的概念。分区代表了数据的仓库,也就是文件夹目录。每个文件夹下面可以放不同的数据文件,通过文件夹可以查询里面存放的文件,但文件夹本身和数据的内容毫无关系。
- 桶则是按照数据内容的某个值进行分桶,把一个大文件散列称为一个个小文件。
1.2 Hash Join
主要分为两个阶段:建立阶段(build phase)和探测阶段(probe phase)
- Bulid Phase:选择一个表(一般情况下是较小的那个表,以减少建立哈希表的时间和空间),对其中每个元组上的连接属性(join attribute)采用哈希函数得到哈希值,从而建立一个哈希表。
- Probe Phase:对另一个表,扫描它的每一行并计算连接属性的哈希值,与bulid phase建立的哈希表对比,若有落在同一个 bucket 的,如果满足连接谓词(predicate)则连接成新的表。
1.3 Grouped Execution 原理
所谓的 Grouped Execution 针对的是那种数据摆放上进行了 bucketed 存储的数据查询的一种优化手段。
- 比如两个表 JOIN,在没有 Grouped Execution 的时候做 hash join,一般会把小表作为 build 表发到每个 worker 上,然后针对大表的数据做 probe,这样内存占用由整个小表数据量的大小决定
- 而如果这两个表是 bucketed 表,而且 bucket 的字段就是这个 JOIN 的字段,那么不同 bucket 之间的数据天然就 JOIN 不上,那么两个表之间的 JOIN 可以转变为相对应的 bucket 之间的 JOIN,最后再做个汇总即可,这样每个 worker 上的内存量会大大降低。
Recoverable Grouped Execution
单个 bucket 的 JOIN 如果失败是可以单独重试的,这也就引出了 Facebook 做的第二个优化:Lifespan 重试。通过 Lifespan 级别的重试可以提高大查询的成功率。
Exchange Materialization
上面两种优化手段的应用场景是很受限的,必须要 bucketed table,而且要用 bucketed 字段 JOIN 才能有用。确实,如果故事只到这里就结束就很没意思了,因此 Facebook 提出了第三个优化手段: Exchange Materialization。
我们知道 Presto 的 Exchange 本来是流式的,上游把数据通过 HTTP 发给下游,下游如果处理不过来会反压上游,中间是没有数据落盘的,如下图所示:
Exchange Materialization 则是要把数据落到盘上,并且按照 JOIN 的 key 组织成 bucketed table,那么从这个 Exchange 节点开始往后就可以应用上面的优化了,如下图所示:
Exchange Materialization 启用参数:
SET SESSION exchange_materialization_strategy='ALL';
SET SESSION partitioning_provider_catalog='hive';
SET SESSION hash_partition_count = 4096;
Spill to Disk
默认情况下,如果查询执行所请求的内存超过会话属性 query_max_memory 或 query_max_memory_per_node,Presto 就会终止查询。这种机制确保了查询分配内存的公平性,防止了内存分配导致的死锁。当集群中有很多小查询时,它是有效的,但会导致杀死不符合限制的大型查询。
4.1 原理简介
为了克服这种低效率,Presto 引入了可撤销内存的概念。Query 可以请求不计入限制的内存,但内存管理器可以在任何时候撤销这些内存。当内存被撤销时,查询运行程序将中间数据从内存溢出到磁盘,然后继续处理它。
在实践中,当集群空闲且所有内存都可用时,内存密集型查询可能会使用集群中的所有内存。另一方面,当集群没有太多空闲内存时,同一个查询可能被迫使用磁盘作为中间数据的存储。与完全在内存中运行的查询相比,强制溢出到磁盘的查询的执行时间可能要长几个数量级。
请注意,启用 spill-to-disk 并不能保证能够执行所有内存密集型的查询。查询运行程序仍然有可能无法将中间数据划分为足够小的块,使每个块都能装入内存,从而导致从磁盘加载数据时出现内存不足错误。
4.2 支持的操作
1、Joins
- 当任务并发大于 1 时,将对 build 表进行分区,分区的数量等于 task.concurrency 配置值
- 在对 build 表进行分区时,spill-to-disk 机制可以减少连接操作所需的峰值内存使用。当查询接近内存限制时,build 表分区的一个子集将溢出到磁盘,join 另一侧表中的记录也会落到相同的分区。
- 然后,就可以 one-by-one 读取溢出的分区数据以完成连接操作。
- 需要注意的是 join_spill_enabled 默认值是 false,可通过 set session join_spill_enabled=true; 启用
2、Aggregations
- 如果正在聚合的 group 中数量很大,则可能需要大量的内存。
- 当启用 spill-to-disk 时,如果没有足够的内存,则将中间累积聚合结果写入磁盘。
- 当内存可用时,它们被加载回来进行合并。
- 开启 spill_enabled 参数后无需单独设置其他参数
Dynamic Filtering
早在2005年,Oracle 数据库就支持比较丰富的 dynamic filtering 功能,而 Spark 和 Presto 在最近版本才开始支持这个功能。为了让大家快速理解这个功能,我们先来看一个 Spark 的栗子:
5.1 Spark 动态分区裁减
SELECT * FROM fact_iteblog
JOIN dim_iteblog
ON (dim_iteblog.partcol = fact_iteblog.partcol)
WHERE dim_iteblog.othercol > 10
通过 Spark 的动态分区裁减,可以将执行计划修改成如下形式:
可见,在扫描 fact_iteblog 表时,如果能自动加上了类似于 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 的过滤条件,那么当 fact_iteblog 表有大量的分区,而 select partcol from dim_iteblog WHERE dim_iteblog.othercol > 10 查询语句只返回少量的分区,这样可以大大提升查询性能。
5.2 Presto 动态过滤
Presto 也支持上面的功能,这个功能称为动态过滤(Dynamic Filtering)。事实上,Presto 的动态过滤比 Spark 的动态分区裁减要丰富。因为 Spark 动态分区裁减只能用于分区表,而 Presto 的动态过滤支持分区表和非分区表,Presto 的动态分区包含 Partition Pruning(分区表) 以及 Row filtering(非分区表)。直到 Presto 0.241,这个功能正式加入到 master 分支。
注意事项:
- 目前 PrestoDB 的实现中只有 Hive 数据源支持动态过滤
- 而且非分区表动态过滤只支持 ORC 数据格式
- 另外,Presto 的 dynamic filtering 和 grouped_execution 不能同时使用
- 并且需要设置以下参数
set session enable_dynamic_filtering=true;
set session hive.pushdown_filter_enabled=true;
Alluxio Cache Service
提高 Presto 查询延迟的一个常见优化是缓存工作集,以避免来自远程数据源或通过慢速网络的不必要的 I/O。Presto 利用 Alluxio 作为缓存层,主要有两种使用方式:Alluxio File System 和 Alluxio Structured Data Service。
注意:目前我们公司 Presto 集群尚未加入 Alluxio 缓存服务,这个优化手段可以作为一个知识储备。
6.1 Alluxio File System
Alluxio File System 将 Presto Hive Connector 作为一个独立的分布式缓存文件系统服务于 HDFS 或 AWS S3、GCP、Azure blob store 等对象存储之上。用户可以通过文件系统接口明确地了解缓存的使用情况并控制缓存。例如,可以预加载 Alluxio 目录中的所有文件,为 Presto 查询预热缓存,并设置缓存数据的 TTL(生存时间),以回收缓存容量。举个栗子:
注意对比 Hive 表的 location,前缀换成了 “alluxio://”
6.2 Alluxio Structured Data Service
Alluxio Structured Data Service 通过基于 Alluxio File System 的目录和缓存文件系统与 Presto 进行交互。这种方式有额外的优势,在不用修改 Hive 表的 location 的前提下,就能无缝访问现有的 Hive 表,并通过合并小文件或转换输入文件的格式进一步性能优化。具体做法如下:
猜你喜欢
- 2024-11-21 大数据平台架构及主流技术栈
- 2024-11-21 Apache Pinot vs. Apache Druid
- 2024-11-21 「大数据」SparkSql连接查询中的谓词下推处理(一)
- 2024-11-21 完美避坑!记一次Elasticsearch集群迁移架构实战
- 2024-11-21 大数据Presto(二):Presto安装搭建
- 2024-11-21 这个用Python编写的大数据测试工具,我给100分
- 2024-11-21 Presto 常用性能优化技巧
- 2024-11-21 大数据Presto(一):Presto介绍
- 2024-07-26 Presto 与 Hive 简单对比(presto和hive语法的区别)
- 2024-07-26 大数据平台建设之SQL查询引擎Presto
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- jdk (81)
- putty (66)
- rufus (78)
- 内网穿透 (89)
- okhttp (70)
- powertoys (74)
- windowsterminal (81)
- netcat (65)
- ghostscript (65)
- veracrypt (65)
- asp.netcore (70)
- wrk (67)
- aspose.words (80)
- itk (80)
- ajaxfileupload.js (66)
- sqlhelper (67)
- express.js (67)
- phpmailer (67)
- xjar (70)
- redisclient (78)
- wakeonlan (66)
- tinygo (85)
- startbbs (72)
- webftp (82)
- vsvim (79)
本文暂时没有评论,来添加一个吧(●'◡'●)