深入理解Presto(2) :Presto查询执行过程

深入理解Presto(2) :Presto查询执行过程

Presto查询执行过程

这里写图片描述

Presto包含三类角色,coordinator,discovery,worker。coordinator负责query的解析和调度。discovery负责集群的心跳和角色管理。worker负责执行计算。

presto-cli提交的查询,实际上是一个http POST请求。查询请求发送到coordinator后,经过词法解析和语法解析,生成抽象语法树,描述查询的执行。

执行计划编译器,会根据抽象语法树,层层展开,把语法树所表示的结构,转化成由单个操作所组成的树状的执行结构,称为逻辑执行计划。

原始的逻辑执行计划,直接表示用户所期望的操作,未必是性能最优的,在经过一系列性能优化和转写,以及分布式处理后,形成最终的逻辑执行计划。这时的逻辑执行计划,已经包含了map-reduce操作,以及跨机器传输中间计算结果操作。

scheduler从数据的meta上获取数据的分布,构造split,配合逻辑执行计划,把对应的执行计划调度到对应的worker上。

在worker上,逻辑执行计划生成物理执行计划,根据逻辑执行计划,会生成执行的字节码,以及operator列表。operator交由执行驱动来完成计算。

抽象语法树

由语法解析器根据SQL,解析生成的树状结构,描述SQL的执行过程。 在下文中,以SQLselect avg(response_size) as a , client_address from localfile.logs.http_request_log group by client_address order by a desc limit 10为例来描述。

抽象语法树数以Query为单位来描述查询,分层次表示不同层的子查询。每一层查询查询包含了几个关键因素:select, from,where,group by,having,order by,limit。其中,from可以是一个子查询,也可以是一张表。

一个典型的抽象语法树:

这里写图片描述

生成逻辑执行计划

抽象语法树树,描述的最原始的用户需求。抽象语法树描述的信息,执行效率上不是最优,执行操作也过于复杂。需要把抽象语法树转化成执行计划。执行计划分成两类,一类是逻辑执行计划,一类是物理执行计划。逻辑执行计划,以树状结构来描述执行,每个节点是最简单的操作。物理执行计划,根据逻辑执行计划生成字节码,交由驱动执行。

转写成逻辑执行计划的过程,包括转写和优化。把抽象语法树转写成由简单操作组成的结点树,然后把树中所有聚合计算节点转写成map-reduce形式。并且在map-reduce节点中间插入Exchange节点。然后,进行一系列优化,把一些能提前加速计算的节点下推,能合并的节点合并。

最后逻辑执行计划按照Exchange节点做划分,分成不同的段(fragament),表示不同阶段的的执行计划。在调度时,按照fragment调度。

SQLselect avg(response_size) as a , client_address from localfile.logs.http_request_log group by client_address order by a desc limit 10的逻辑执行计划:
这里写图片描述

从执行计划中可以看到,agg节点都是分成partial和final两步。

调度执行计划到机器上

调度涉及到两个问题,第一,某个fragment分配由哪些机器执行;第二,某个fragment的计算结果如何输出到下游fragment。

在调度时,需要为每一个fragment指定分配到哪些机器上。从调度上划分,fragment分三种类型

  • 一类是source类型由原始数据的存储位置决定fragment调度机器,有多少个source节点呢?connector会根据数据的meta,决定需要读取多少个split(分片) ,对于每一个source节点,分配一个split到一台机器上,如果在配置中指定了network-topology=flat,则尽量选择split所在的机器。
  • 一类是FIXED类型,主要用于纯计算节点,从集群中选择一台或多台机器分配给某个fragment。一般只有最终输出节点分配一个机器,中间的计算结果都要分配多台机器。分配的机器数由配置hash_partition_count决定。选择机器的方式是随机选择。
  • 一类是SINGLE类型,只有一台机器,主要用于汇总结果,随机选择一台机器。

对于计算结果输出,根据下游节点的机器个数,也有多种方式,

  • 如果下游节点有多台机器,例如group by的中间结果,会按照group by的key计算hash,按照hash值选择一个下游机器输出。对于非group by的计算,会随机选择或者round robin。
  • 如果下游节点只有一台机器,会输出到这台机器上。

以下图为例,fragment 2是source类型fragment,有三个split,所以分配了三台机器。因为这一层计算是group by 聚合计算,所以输出时按照group by的key计算hash,选择下游的某台机器输出。

这里写图片描述

调度之前的任务,都在coordinator完成,调度完成后,之后任务发送到worker上执行。

生成物理执行计划

逻辑执行计划fragment发送到机器上后,由结点树形式转写成operator list,根据逻辑代码动态编译生成字节码。动态生成字节码,主要是利用编译原理:

  • 展开循环
  • 根据数据列的类型,直接调用对用的函数,以减少分支跳转语句。

这些手段会更好的利用CPU的流水线。

执行驱动

物理执行计划构造生成的Operator list,交给Driver执行。具体计算哪些数据,由加载的Split决定。

Operator list 以串联形式处理数据,前一个operator的结果作为下一个结果的输入,对于source类型的operator,每一次调用都会获取一份新的数据;对于Aggregate的operator,只有之前所有的operator都finish之后,才能获取输出结果。

这里写图片描述

聚合计算

生成的执行计划中,聚合计算都拆分成了两步,分别是Map、Reduce。

聚合计算的Operator有两类,分别是AggregationOperator和HashAggregationOperator。

AggregationOperator对所有行进行计算,并且把结果更新到一个位置。HashAggregationOperator使用某一列的hash值作为hash表的key,key相同的行才会把结果保存在一起,用于group by类的计算。

聚合计算都是要按照Map-Reduce的形式执行。

聚合计算所提供的函数,都要提供四个接口,分别有两个输入,两个输出:

  1. 接受原始数据的输入
  2. 接受中间结果的输入
  3. 输出中间结果
  4. 输出最终结果。

1+3 构成了Map操作 2+4构成了Reduce操作。

以Avg为例:

  1. Map阶段输入1,2,3,4
  2. Map截断输出10,4 分别代表Sum和Count
  3. Reduce输入10,4
  4. Reduce输出最终平均值2.5

我们改造了Presto系统,使得Presto能够提供缓存功能,就是在MapReduce中间加了一层计算,接受中间结果输入和中间结果输出。

函数

函数分为两类,分别是Scaler函数和Aggregate函数

Scaler函数提供数据的转换处理,不保存状态,一个输入产生一个输出。

Aggregate函数提供数据的聚合处理,利用已有状态+输入,产生新的状态。

上文已经提到,聚合函数提供了两种输入接口和两种输出接口。