代码->DAG调度器逻辑任务->Task调度器任务分配和管理监控->Worker干活
RDD的运行会完全按照开发者的代码执行, 如果开发者水平有限,RDD的执行效率也会受到影响。
而SparkSQL会对写完的代码,执行“自动优化”, 以提升代码运行效率,避免开发者水平影响到代码执行效率。
思考:为什么SparkSQL可以自动优化,而RDD不可以?
RDD:内含数据类型不限格式和结构
DataFrame:100% 是二维表结构,可以被针对SparkSQL的自动优化,依赖于:Catalyst优化器
SparkSQL执行流程详解
为了解决过多依赖Hive的问题,SparkSQL使用了一个新的SQL优化器替代Hive中的优化器,这个优化器就是Catalyst,整个SparkSQL的架构大致如下:
1. API层简单的说就是Spark会通过一些API接受SQL语句
2. 收到SQL语句以后,将其交给Catalyst,Catalyst负责解析SQL,生成执行计划等
3. Catalyst的输出应该是RDD的执行计划
4. 最终交由集群运行
具体流程:
Step1:解析SQL,并且生成AST(抽象语法树)
Step2:在AST中加入元数据信息,做这一步主要是为了一些优化,例如 col = col 这样的条件,下图是一个简略图,便于理解
· score.id → id#1#L 为 score.id 生成 id 为1,类型是Long
· score.math_score → math_score#2#L 为 score.math_score 生成 id 为2,类型为Long
· people.id → id#3#L 为 people.id 生成 id 为3,类型为Long
· people.age →age#4#L 为people.age 生成 id 为4,类型为Long
Step3:对已经加入元数据的AST,输入优化器,进行优化,从两种常见的优化开始,简单介绍:
· 断言下推 Predicate Pushdown,将Filter 这种可以减小数据集的操作下推,放在Scan的位置,这样可以减少操作时候的数据量。
如这个代码,正常流程是先JOIN然后做WHERE,断言下推后,会先过滤age,然后再JOIN,减少JOIN的数据量,提高性能。
· 列值裁剪 Column Pruning,在断言下推后执行裁剪,由于people表之上的操作只用到了id列,所以可以把其他列裁剪掉,这样可以减少处理的数据量,从而优化处理速度。
如下图,在scan前又加入了Filter,作为列裁剪用
· 还有其余很多优化点,大概一共有一二百种,随着SparkSQL的发展,还会越来越多,感兴趣的同学可以继续通过源码了解,源码在org.apache.sql.catalyst.optimizer.Optimizer
Step4:上面的过程生成的 AST 其实最终还没办法直接运行,这个 AST 叫做逻辑计划,结束后,需要生成物理计划,从而生成 RDD 来运行。
在生成‘物理计划’的时候,会经过‘成本模型’对整棵树再次进行优化,选择一个更好的计划。
在生成‘物理计划’以后,因为考虑到性能,所以会使用代码生成,在机器中运行。
可以使用 queryExecution 方法查看逻辑执行计划,使用explain方法查看物理执行计划。
总结:catalyst的各种优化细节非常多,大方面的优化点有2个:
· 谓词下推(Predicate Pushdown) 断言下推:将逻辑判断提前到前面,以减少shuffle阶段的数据量。即:行过滤,提前执行where。
· 列值裁剪(Column Pruning):将加载的列进行裁剪,尽量减少被处理数据的宽度。即:列过滤,提前规划select的字段数量。
列值裁剪,有一种非常合适的存储系统:parquet。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
如需转载请保留出处:https://bianchenghao.cn/bian-cheng-ri-ji/16877.html