深入理解Presto,Presto的内部架构

深入理解Presto

本文译自英文书籍<Presto: the definitive guide>Presto权威指南第四章,目前该书的中文翻译版尚未出版,本文摘出书中对Presto内部介绍比较深入的第四章,看过本文对全书感兴趣的同学,请购买英文原版,或等待中文翻译版出版。–2020/08/16

在简单了解过Presto和众多的使用场景、并且安装和开始使用她之后,你现在已经准备好了深入探索更多。

在本书的第二部分,你会了解到Presto内部的工作机制,并做好准备在生产环境去安装、使用、运行、调优等等。

我们讨论了关于连接数据源的更多细节,并且使用Presto的SQL语句、运算符、函数等来查询这些数据。

第四章 Presto架构

前边的章节,我们简单介绍了Presto,初步安装和使用了Presto。现在我们开始讨论Presto的架构。我们深入了解相关的概念,以使你能够了解更多Presto的查询执行模型、查询方案规划、基于代价的优化器。

在本章节中,我们首先讨论Presto高层次的架构组件。这很重要,因为会帮助我们大体的理解Presto的工作方式,尤其你准备自己安装和维护Presto的集群(这会在第五章介绍)。

在本章后边的部分,我们探讨查询执行模型时,会更加深入了解那些组件。这很重要,假如你需要诊断和调优慢查询(这会在第八章介绍),或者你准备向Presto开源项目贡献代码。

Coordinator和Worker两种角色

当你第一次安装Presto时,你只用了一台机器来运行所有的查询。但是,单机环境是远远达不到理想的规模和性能的。

Presto是一个分布式的SQL查询引擎,组装了多个并行计算的数据库和查询引擎(这就是MPP模型的定义)。Presto不是依赖单机环境的垂直扩展性。她有能力在水平方向,把所有的处理分布到集群内的各个机器上。这意味着你可以通过添加更多节点来获得更大的处理能力。

利用这种架构,Presto查询引擎能够并行的在集群的各个机器上,处理大规模数据的SQL查询。Presto在每个节点上都是单进程的服务。多个节点都运行Presto,相互之间通过配置相互协作,组成了一个完整的Presto集群。

图4-1 展示了从宏观层面概括了Presto的集群组件:1个coordinator,多个worker节点。用户通过客户端连接到coordinator,可以短可以是JDBC驱动或者Presto命令行cli。

image.png

Coordinator是Presto上一个专门的服务,专门用来处理用户的查询请求,管理worker节点以执行查询。

Worker 节点则负责执行任务和处理数据。

Discovery服务通常跑在coordinator节点上,允许worker注册到集群信息中。

客户端、coordinator,worker之间的所有通信,都是用基于REST的HTTP/HTTPS交互。

图4-2展示了集群内coordinator和worker之间,以及worker和worker之间的通信。coordinator向多个worker通信,用于分配任务,更新状态,获得最终的结果返回用户。worker之间相互通信,向任务的上游节点获取数据。所有的worker都会向数据源读取数据。

image.png

Coordinator

Coordinator的作用是:

  • 从用户获得SQL语句。
  • 解析SQL语句。
  • 规划查询的执行计划。
  • 管理worker节点状态。

Coordinator是Presto集群的大脑,并且是负责和客户端沟通。用户通过PrestoCLI、JDBC、ODBC驱动、其他语言工具库等工具和coordinator进行交互。Coordinator从客户端接受SQL语句,例如select语句,才能进行计算。

每个Presto集群必须有一个coordinator,可以有一个或多个worker。在开发和测试环境中,一个Presto进程可以同时配置成两种角色。

Coordinator追踪每个worker上的活动,并且协调查询的执行过程。Coordinator给查询创建了一个包含多阶段的逻辑模型,

图4-3展示了客户端、coordinator,worker之间的通信。

一旦接受了SQL语句,Coordinator就负责解析、分析、规划、调度查询在多个worker节点上的执行过程,语句被翻译成一系列的任务,跑在多个worker节点上。worker一边处理数据,结果会被coordinator拿走并且放到output缓存区上,暴露给客户端。一旦输出缓冲区被客户完全读取,coordinator会代表客户端向worker读取更多数据。worker节点,和数据源打交道,从数据源获取数据。因此,客户端源源不断的读取数据,数据源源源不断的提供数据,直到查询执行结束。

Coordinator通过基于HTTP的协议和worker、客户端之间进行通信。

image.png

图4-3 客户端,coordinator,worker通信,处理SQL语句

Discovery Service(发现服务)

Presto使用Discovery服务来发现集群内的所有节点。每个Presto实例在启动时就向Discovery服务注册,之后,周期性的发送心跳信号。这让coordinator获得一个实时的可用节点列表,并且使用这个列表来调度查询执行过程。

如果一个worker停止发送心跳信号,discovery服务触发宕机检测,这一个worker就不会调度新的任务了。

为了简化部署,避免运行额外的服务,Presto coordinator通常运行一个内嵌的discovery服务版本。
这个版本和Presto共享HTTP服务,因此使用同一个端口就足够。

Worker上关于discovery 服务的配置,通常指向coordinator的hostname和端口。

Workers

Presto的worker是Presto集群中的一个服务。它负责运行coordinator指派给它的任务,并处理数据。worker节点通过连接器(connector)向数据源获取数据,并且相互之间可以交换数据。最终结果会传递给coordinator。 coordinator负责从worker获取最终结果,并传递给客户端。

在安装过程中,要把discovery的主机名或IP地址告诉worker。worker启动后,会向discovery广播自己,之后才能在查询中调度任务。

Workers communicate with other workers and the coordinator by using an HTTP- based protocol.

Worker之间的通信、worker和coordinator之间的通信采用基于HTTP的协议。

图4-4展示了多个worker如何从数据源获取数据,并且合作处理数据的流程。直到某一个worker把数据提供给了coordinator。

image.png

图4-4 一个集群内的worker相互合作处理SQL和数据

基于Connector的架构

Presto计算和存储分离的架构核心,就是基于Connector(连接器)的架构。一个Connector提供了Presto访问任意一个数据源的接口。
每个Connector为底层的数据源提供了一层基于表的抽象接口。只要数据能够用表、列、行等形式,用P热宋体支持的数据类型,那么就可以创建出一个连接器,并通过这个连接器处理数据。
Presto提供了一套SPI接口,实现Connector就是要实现这些API。通过实现SPI接口,Presto就可以通过标准的操作连接任何数据源,并且在数据源上提供任何操作。这个连接器负责和数据源交互的细节。

每个Connector需要实现三种类型的API:

  • 读取表/view/schema的metadata的操作。
  • 提供逻辑上的数据分区,以方便Presto并行读和写。之所以说是逻辑上的,是因为Presto是计算存储分离的架构,Presto并不实际关心数据的实际存储位置。
  • 读取的数据源和持久化数据的sink,把元数据从源格式转化成引擎期望的内存格式,或者把引擎的内存格式转化成存储的格式。

Presto提供了很多系统的连接器,例如HDFS/Hive, MySQL, Post‐ greSQL, MS SQL Server, Kafka, Cassandra, Redis等等。在第6章和第七章你会了解到多个连接器,会有越来越多的连接器出现。请查看Presto的文档,了解最新的连接器。

另外,Presto 的SPI接口为你提供了创建自定义连接器的能力。如果你需要访问的数据源,没有可用的兼容连接器,那么就需要创建自定义的链接起了。在这种场景下,我们强烈建议你了解更多Presto开源社区,获得我们的帮助,并且向社区贡献你的连接器。如果你在你的组织内有特殊的数据源,那么也需要一个自定义的连接器。这种方式,让Presto用户使用SQL来读取任意数据,实现真正的SQL-on-Anything。

image.png

图4-5 展示了Presto SPI包含的coordinator使用的metadata数据源,数据统计,数据位置信息,以及worker使用的数据流API。

Presto连接器是插件,每个server启动时加载这些插件。通过在catalog属性文件中配置特定参数,会从plugin目录加载。我们会在第六章探索更多相关信息。

Presto通过基于插件的架构,支持多种功能,除了连接器,插件会提供监听器、访问控制、函数和类型。

Catalogs(目录), Schemas(模式)和table(表)

Catalog、schema,table是Presto提供的三级数据管理结构。catalog对应连接器,schema对应一个数据源中的数据库,table对应数据库中的表。

Presto 集群使用基于连接器的架构来处理所有的查询。每个catalog配置使用一个连接器访问一个特定的数据源。数据源再catalog中暴露一个或者多个schema。每个schema包含若干个table(表),表中的每一行就是多列数据,每一列是不同的类型。在第八章会又更多关于catalog,schema,table的介绍。

Query Execution Model
Now that you understand how any real-world deployment of Presto involves a cluster with a coordinator and many workers, we can look at how an actual SQL query state‐ ment is processed.

查询执行模型

现在开始了解一下Presto现实世界中的开发。我们可以看一下一个实际的SQL语句是如何处理的。
理解执行模型会帮助你了解能够调优Presto性能的必要的基础知识。回忆一下,coordinator从终端用户那里接收SQL语句,使用方式包括使用ODBC/JDBC驱动的客户端软件。coordinator触发所有的worker去从数据源中读取数据,生成结果集合,并提供给用户。

首先我们深入一步探索一下coordinator内部发生了什么事情。当一个SQL语句提交给coordinator的时候,是普通的文本模式。coordinator接收文本,解析并且分析这段文本。然后它创建出一个执行计划。这个工作有是图4-6所描述的。查询执行计划从通常意义上概括了处理数据和返回结果的步骤。

image.png

图4-6 处理SQL语句,创建出一个查询计划

如图4-7所示,生成查询计划的过程,使用了metadata的SPI接口、数据的统计SPI接口。coordinator使用SPI来收集关于表的信息、链接数据源的信息等等。

image.png

图4-7 服务提供商接口用于query的规划和调度。

coordinator使用metadata SPI接口来获取表、列、类型的信息。这些信息用于校验Query的语义是否正确,执行语句的类型检查、安全检查。

统计SPI接口用于获取关于行数的信息、表大小的信息。这些信息可以在生成执行计划的阶段用来做基于代价的查询优化。

数据位置的SPI接口在创建分布式执行计划的过程中被用到。这些信息用来生成表的逻辑上的split。split是并行执行时最小的调度单元。

分布式的执行计划扩展了简单查询计划。简单查询计划包含了一个或者多个stage(阶段)。 简单查询计划会分割成计划段。一个stage描述的就是一个计划段(fragment)在执行时刻,包含了stage的计划段中所有的任务。

coordinator分割开整个plan,从而有效的利用集群的并行度加速整体查询的速度。由于有多个stage,导致会出现一个依赖树。stage的个数取决于查询的复杂度。例如,查询的表、返回的列。join语句,where条件,group by运算,和其他的SQL语句,都会影响stage的个数。

图4-8描述了逻辑执行计划是如何转换成分布式查询计划的。

image.png

图4-8 查询计划转化成分布式查询计划

分布式查询计划定义了定义了stage,以及在Presto集群中执行query的方法。coordinator用它来做进一步的规划、调度task到worker节点上。 一个stage包含了一个或多个task。典型的,会引入很多task,每个task处理一小片数据。

如图4-9所示,coordinator指定task到worker节点上。

image.png

图4-9 coordinator所做的任务管理。

数据处理的单元被称为split。split描述从底层读取和处理的一段数据。split是并行计算的最小单元,是任务分配的单元。连接器能够做的操作依赖于底层的数据源。

例如,hive连接器用以下信息描述split:文件的路径、所取数据的offset、length。

数据源阶段的task以page(页)这种格式生成数据。page是以列式格式存储的多行数据集合。page数据通过stage的依赖关系,流向下游的stage。 不同的stage之间通过exchange运算交换数据,这种运算符从上游的stage中读取数据。

source task使用数据源的SPI来从底层的数据源中获取数据,数据以page的形式流向Presto,流过整个引擎。各个运算符根据自己的语义处理和产生数据。例如filter运算符丢掉一些数据,project运算发产生新的列数据。task内部的运算符之间的顺序成为流水线。流水线上最后一个运算符把他的输出放到ouptut缓存区。下游的exchange 运算符从这个缓冲区获取数据。不同worker之间的运算符是并行执行的。

image.png

图4-10 不同的split中的数据在task之间交换,在不同的worker上处理。

所以,task就是计划段(plan fragment)分配给worker运行时的称呼。当一个task创建出来后,它为每一个split初始化一个driver。每个driver初始化一个流水线的operator,然后处理一个split的数据。一个task可能会使用多个driver,取决于Presto的配置。一旦所有的driver都完成了,数据被传递到了下一层split,driver和任务结束了他们的工作,之后被销毁。

image.png

图4-11, 一个task中并行的driver,处理输入和输出的split

一个运算符处理输入的数据,并且向下游的运算符输出数据。常见的运算符包括表扫描运算符,过滤,join,聚合计算。一系列运算符构成operator的流水线。举个例子,一个流水线包含了一个扫描运算符读取数据,然后过滤数据,最后执行局部聚合。

为了处理一个查询,coordinator通过连接器的metadata创建了一组split。通过这一组split,coordinator开始调度task到机器上来采集数据。在查询执行阶段。coordinator追踪所有的可用split和他们的位置信息。一个task处理完成数据后,会产生更多的下游split。coordinator持续的调度任务,直到没有split需要处理。

一旦所有的split处理完成,所有的数据都可用,coordinator会把数据结果提供给客户端。

查询计划

在深入了解Presto逻辑执行计划生成器和基于代价的优化器如何工作之前,我们先限定一下我们考虑范围。这里提供了一个样例,来帮助我们探索逻辑执行计划的生成过程。

1
2
3
4
5
6
7
8
样例 4-1. Example query to explain query planning
SELECT
(SELECT name FROM region r WHERE regionkey = n.regionkey) AS region_name, n.name AS nation_name,
sum(totalprice) orders_sum
FROM nation n, orders o, customer c WHERE n.nationkey = c.nationkey
AND c.custkey = o.custkey
GROUP BY n.nationkey, regionkey, n.name ORDER BY orders_sum DESC
LIMIT 5;

先了解一下这个SQL的目的。

  • SELECT使用了三个表,隐式定义了一个CROSS join,跨越了三张表格。
  • WHERE条件获得满足条件的行。
  • group by聚合计算,group by的key是regionKey,nationKey。
  • 一个子查询,SELECT name FROM region WHERE regionkey = n.regionkey向region表中读取region的名称。请注意这个查询是相关联的。
  • 一个oder by语句,按照orders_sum 倒序排序。
  • limit限制5行,表示返回oders_sum最多的5行返回给用户。

解析和分析

在query执行之前,需要先解析和分析。关于SQL的语法相关的规则会在第八章和第九章介绍。presto检验语法规则,然后分析这个查询。

  • 识别表名称:表在catalog和schema内,所以多个表可以又相同的名称。
  • 识别列名:一个带限定的列名 orders.totalprice 唯一只想了orders表的totalprice列。但是SQL可以通过只引用列名totalprice,也可以确认是属于哪个表的。Presto通过分析就可以找到列名是属于哪个表。
  • 识别row列内的字段。字段c.bonus 可以表示c表的bonus列,也可以表示c这一列(row类型)内的bonus字段。Presto的分析器决定应该是哪种情况,一旦有模糊问题,那么优先认为是表中的列名,以避免歧义。分析的时候,需要了解SQL语法的作用于和可见规则。采集到的信息会在之后的planning阶段用到,因此planer无需再次了解查询语言的作用域规则。

如上文所述,Query分析器有一个非常复杂和交叉剪枝的责任。他的角色是属于技术性的,对于终端用户而言,只要查询语法是对的,就看不到这部分解析的过程。只有查询违反了SQL语言的规则、没有权限等错误情况下,用户才能意思到Query解析器的存在。

一旦query分析完成,所有的符号都解析出来,Presto会进入下一步,就是planning(生成逻辑执行计划)

初步的查询执行计划

查询执行计划可以看成是一个程序,这个程序可以产出查询结果。SQL是一种声明式语言,用户写一个SQL来指定要查询的数据。和命令式程序不同,用户不必指定怎么处理数据才能获得结果。Query的计划器和优化器可以决定处理数据的过程。

处理数据的步骤的顺序就是查询逻辑执行计划。理论上,有很多种逻辑执行计划可以产生相同的查询结果,但是不同的逻辑执行计划的性能是非常不同的。所以就需要逻辑计划生成器和优化器来决定一个最佳的逻辑执行计划。可以产生相同结果的执行计划可以称为等同执行计划。

我们来看一下样例4-1中的查询。最直接的逻辑执行计划就是直接翻译SQL的语法结构。解析出来的执行计划如样例4-2所示。逻辑执行计划是一个树结构,执行时从叶子节点开始,逐层向上处理树结构。

1
2
3
4
5
6
7
8
9
10
11
样例4-2 手工生成的,非常直接的逻辑执行计划
- Limit[5]
- Sort[orders_sum DESC]
- LateralJoin[2]
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey] - CrossJoin
- CrossJoin
- TableScan[nation] - TableScan[orders]
- TableScan[customer]
- EnforceSingleRow[region_name := r.name]
- Filter[r.regionkey = n.regionkey] - TableScan[region]

逻辑执行计划中的每个元素都可以直接的,用命令式的风格来实现。例如TableScan访问底层的表,返回表中的所有的行。过滤器接收行,应用一个过滤条件,只保留满足条件的行。Cross join操作两个数据集,生成两个数据集的组合。或许可以把某个数据集保存在内存中,这样不必多次访问底层的存储系统。

接下来我们看一下这个查询逻辑执行计划的计算复杂度。如果不知道所有实现的细节,就没办法评估复杂度。但是,我们可以假定,最小的复杂度就是数据集的大小。因此我们用O表示法。如果N,O,C,R代表nation,orders,customer,region的表的行数。这样我们可以观察到下边的内容:

  1. TableScan[orders]读取orders表,返回O行,所以复杂度是Ω(O)。类似的其他的两个tableScan的复杂度是 Ω(N),Ω(C)。
  2. TableScan[nation]和TableScan[orders]上的CrossJoin综合了nation和orders的数据,所以复杂度是 Ω(N × O)。
  3. 再上层的CrosssJoin,两个数据集的大小分别是N*O和C,所以复杂度是Ω(N × O × C)。
  4. 底层的TableScan[region]复杂度是Ω(R)。但是,由于后续的Join,这个tableScan被调用了N次,N是从聚合计算中返回的行数。所以这个运算复杂度是 Ω(R × N) 。
  5. Sort运算符需要对N行进行排序,所以他的时间复杂度最小是N*log(N)

上述运算符是代价最大的,所以在这里先忽略其他的运算符。这里的总的代价最小是Ω[N + O + C + (N × O) + (N × O × C) + (R × N) + (N × log(N))],无需直到表的相对大小,这个公式可以简化成Ω[(N × O × C) + (R × N) + (N × log(N))].假设region是最小的表,nation是第二小的表,我们可以忽略第二部分和第三部分,获得一个简化的结果Ω(N × O × C)。

关系代数的公式到此为止,接下来我们看看这在实际应用中的意义。举个例子,假如一个热门的购物网站有1000万客户,分布在200个国家,总共下单了10亿次订单。两个表的Cross Join需要物化20,000,000,000,000,000,000行数据。对于一个中等的100个节点的集群,每个节点处理速度是100万行/秒,则需要63个世纪才能处理完所有的数据。

当然,Presto不会去傻乎乎的处理这样一个图样图森破的执行计划。但是这样一个图样图森破的执行计划有他的用处。初始化的执行计划扮演了一个桥梁的作用,连接起SQL语法及其所代表的语义规则,和查询优化。查询优化的作用就是把这样一个原生的执行计划转化成一个效果对等的计划,但是执行上尽可能的快,至少能用有限的Presto资源在可接受的时间延时内完成。 接下来我们探讨一下查询优化器是如何获得这样一个优化目标的。

逻辑执行计划优化的规则

在这一章节,我们来看一下Presto用到的优化规则。

1.谓词下推

谓词下推可能是唯一最重要的优化规则和最容易理解的规则。他的目标就是把过滤条件下推到离数据源越近越好。因此,数据的规模会在更早的执行阶段完成。在我们的案例中,过滤会转换成一个简单过滤和内连接,之下是同样的CrossJoin条件。变化后的执行计划如下所示4-3。没有发生变化的部分在这里忽略调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
样例4-3,CrossJoin和Filter变成一个InnerJoin
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey] // original filter
- CrossJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] // 变成简单的filter
- InnerJoin[o.custkey = c.custkey] // 添加一个内连接InnerJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]

原来更大的Join变成了一个InnerJoin,保持相同的条件。我们暂时不深入细节,假设这样一个Join可以在分布式系统中高效的执行,计算复杂度等同于处理的行数。这意味着谓词下推至少把Ω(N × O × C) CrossJoin 实现变成了一个Θ(N × O)。

但是,谓词下推不能优化nation和orders表之间的CrossJoin,因为两个表之间没有直接的条件关联。这正是消除CrossJoin方法能发挥作用的地方。

2. 消除CrossJoin

在没有CBO(基于代价的优化器),Presto 根据表出现在select中的顺序来进行join。一个重要的例外是,要join的表没有任何关联条件,这就是Cross Joinn。在实际案例中,crossjoin并不符合需求,Join出来的大部分行在之后都会过滤掉。但是cross join有太多的工作以至于可能完不成。

消除Cross join把要join的表的顺序进行重排,目的是减少cross join的数量,最好是减少到0。在没有表的相对大小信息情况下,处理Cross join消除, table重排也可以做,所以用户可以掌控这一切。所以消除Cross Join的案例参考样例4-4. 现在两个Join都是inner join,促使Query的整体代价变成Θ(C + O) = Θ(O).其他部分没有发生变化,所以整体的查询计算代价是Ω[O + (R × N) + (N × log(N))]。当然O所代表orders表的行数是决定性因素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Example 4-4. 重排join消除cross join
...[i]
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] 按照nationKey过滤
- InnerJoin[o.custkey = c.custkey] 按照cutKey inner join
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
...
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey]
- InnerJoin[n.nationkey = c.nationkey]
- TableScan[nation]
- TableScan[customer]
- TableScan[orders]

3. TopN

通常, 如果一个query包含了limit语句,那么前边必然跟着一个order by语句。否则,如果没有order by排序,那么SQL返回的结果是随机的,你多次查询,结果会不同。带上order by ,则会保证查询的顺序和结果。

当执行一个查询是,Presto可以对所有的行进行排序,然后取最前边的几行。这种做法的复杂度是Θ(row_count ×log(row_count)),内存复杂度是Θ(row_count)。 这种排序整个结果,而只取部分结果的做法,明显非常浪费,不是最佳做法。因此,一个优化规则把order by 和limit联合语句优化成了TopN逻辑计划节点。在query执行阶段,TopN在一个堆结构中保存一定数目的行数,当流式读取数据时,更新这个堆结构。这让时间复杂度降低到了Θ(row_count × log(limit)) ,内存复杂度降低到了Θ(limit)。 整体的查询复杂度是Ω[O + (R × N) + N]。

4. 局部聚合

Presto没必要把orders表所有的行都传递给join,因为我们不是对每个单独的订单感兴趣。我们的样例查询,计算的聚合是堆每个nation计算totalprice的sum值,所以可以先预聚合,如样例4-5所示。我们通过聚合数据,减少了流入下游Join节点的行数。结果不是完整的,这也是为何被成为预聚合。但是数据的规模有可能被降低,显著的提升查询性能。

1
2
3
4
5
6
7
8
- Aggregate[by nationkey...; orders_sum := sum(totalprice)] 
- InnerJoin[c.custkey = o.custkey]
- InnerJoin[n.nationkey = c.nationkey]
- TableScan[nation]
- TableScan[customer]
- Aggregate[by custkey; totalprice := sum(totalprice)]
- TableScan[orders]

对于并行计算,预聚合可以以不同方式完成,也称为『局部聚合』。下文我们展示一个简化的执行计划,当然和实际的EPLAIN计划相比,有一些不同。

实现规则

上文提到的规则是优化规则,规则的目标是减少查询处理时间,内存用量,网络交换的数据量。但是,即使是我们的样例查询,初始的逻辑执行计划还包含一些尚未实现的操作: lateral join。 在下一章节,我们看一下Presto怎么实现这种类型的操作。

1. lateral join解耦

lateral join可以实现成一个for-each循环,遍历一个数据集的每一行数据,然后执行另一个查询。这样的实现是有可能实现的,但这不是Presto处理的方式。其实,Presto解耦这样的子查询,上拉所有的相关条件,组成一个left join。在SQL语义中,这对应下边的查询转换。

1
2
3
4
SELECT
(SELECT name FROM region r WHERE regionkey = n.regionkey)
AS region_name, n.name AS nation_name
FROM nation n

上述查询转化成:

1
2
3
SELECT
r.name AS region_name, n.name AS nation_name
FROM nation n LEFT OUTER JOIN region r ON r.regionkey = n.regionkey

尽管我们可以这样交换规则,但是谨慎的、堆SQL语义非常了解的读者知道,上边的两个语句不是完全对等。如果region表中有regionkey重复的条目,第一个查询会失败,第二个查询不会失败。第二个查询虽然不会失败,但是会产生更多的结果。基于这个理由,lateral join解耦使用join之外的两个元素。第一,给原表的行编码,以使他们可区分。第二,join之后,检查一下是否有任何行是重复的。如样例4-6所示。如果检测到重复,查询会失败,以保护原始的查询语义。

1
2
3
4
5
6
7
8
样例4-6 lateral join解耦需要额外的检测
- TopN[5; orders_sum DESC]
- MarkDistinct & Check
- LeftJoin[n.regionkey = r.regionkey]
- AssignUniqueId
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- ...
- TableScan[region]

2. Semi-Join(In)解耦

一个子查询不仅可以用来获取信息(如上边lateral join样例),也可以用IN谓词来过滤行。实际上,IN谓词可以用来在filter中(WHERE 语句), 或者在一个projection中(SELECT 语句),当你在projection中用一个in语句的时候,他不仅仅是一个简单的bool 判断的运算符,例如exists。实际上,in谓词可以判定成true,false,null。

让我们考虑一个查询场景,找到那些顾客和物品供应商来自同一个国家的订单,如样例4-7所示。这样的订单可能存在,例如,我们想节省物流费用,减少物流环境的影响,可以直接把物品从供应商处寄给顾客,省略掉分拣中心。

1
2
3
4
5
6
7
8
9
10
SELECT DISTINCT 
FROM lineitem l
JOIN orders o ON o.orderkey = l.orderkey
JOIN customer c ON o.custkey = c.custkey
WHERE c.nationkey IN (
-- subquery invoked multiple times
SELECT s.nationkey FROM part p
JOIN partsupp ps ON p.partkey = ps.partkey
JOIN supplier s ON ps.suppkey = s.suppkey WHERE p.partkey = l.partkey
);

当然,上述需求可以通过lateral join实现,可以实现成一个循环,遍历外层查询的每一行,然后子查询获得所有的供应商的所有的国家,多次调用即可。

但是我们有更好的做法,Presto解耦子查询,去除关联条件后,子查询计算一次,然后回来和外层的查询通过关联条件进行join。棘手的地方在于join要保证不能产生多行结果(所以要用到去重的聚合)。这种转换正确的保留了in谓词的三值逻辑。

在这个案例中,去重的聚合计算使用了和join相同的分区,所以可以以流式的方式执行。不需要在网络上交换数据,同时使用了最少的内存。

本文总结

以上介绍了Presto的一些基础处理操作,在下一篇文章中,我们将介绍一些高端话题:基于代价的优化器(CBO).

Presto 任务调度: 任务分配到哪里

在调度任务时,有几个决策点,第一:分配多少个任务,第二:每个任务分配到哪些机器上。

要回答这个问题,首先把计算任务做一下分类,在presto内部有4种计算类型的节点:

  1. source节点,读源数据的节点,负责读取数据、Map阶段的计算任务。分配的个数由SplitManager根据数据决定。
  2. fixed节点,shuffle节点,用于处理reduce任务。比如group by计算,source阶段的数据按照hash发送到fixed节点。分配的个数由hash_partition_count这个session参数决定,由于是固定个数,所以对于不同的数据规模采用同样的个数也不是很合适,这也是需要改进的点。
  3. single节点,单节点计算任务,某些计算需要在单一节点计算,比如MergeReduce,output等,需要分配一个节点。
  4. coordinator only,只在coordinator节点计算的任务,一般是meta类操作。

image.png

Presto是一个计算引擎,采用计算和存储分离的架构。所以,理论上来讲,上边回答清楚每个阶段分配多少个计算任务,然后按照随机的原则把task分配到每台机器上,并注意机器之间的负载均衡就足够了。

但是这样性能并不是最佳的。我们都知道,大规模的数据迁移是一件非常耗时的事情。在计算机硬件中,CPU、内存、磁盘、网卡,这些硬件中,网卡的性能提升是最慢的,在整个分布式系统中也是性能最差的。
因此在调度上presto提供了一种基于拓扑的调度算法(Topology Aware schedule),以使source节点尽量靠近数据。

拓扑调度算法Topology Aware Scheduling strategy

image.png

Presto把集群资源划分成两级结构(Machine,集群)(包含机架的三级结构是我们的实现方式,不是presto原生的算法)。

分配算法如下:

  1. 当分配一个split时候,会给定split的一个地址,这个地址代表期望的分配地址。
  2. 从Machine层开始查找,首先查看Machine上已经分配的split个数是否已满,若未满则分配该机器,
  3. 若Machine资源池已满,然后查看机架级别的资源池。多个机器组成一个机架。然后在同一个机架找出一台机器,这台机器的资源池小于75%,则分配这台机器。
  4. 如果在机架级别找不到可用机器,则到集群级别找一台机器,这台机器的资源池使用量小于50%,则分配这台机器。否则分配失败。

presto代码提供了Machine/集群的两级拓扑结构,但是架构上提供一个基础的多级分配策略,因此我们增加了机架级别的资源调度。我们知道,同一个机架内机器在一个交换机下,网络速度要快于跨机架传输。

Presto调度上的其他优化点

presto提供了基础的调度能力,这个调度策略是很简单的,如果要实现精细化的调度,还有很多工作可以做。例如:

  1. 上文提到的Fix Count Scheduler,对所有的计算采用固定的节点个数。这个可以通过CBO,衡量不同数据源的大小,以及计算的复杂度,分配不同的节点个数。
  2. 上文提到的机架级别调度。
  3. 在资源池的管理上,简单以split的个数来衡量,但是我们知道每个split对应的数据量和计算复杂度是不一样的。因此更加精细化的策略是根据统计数据预估split消耗的内存/cpu。

Procella@Youtube 把计算加速玩到极致的实时计算引擎

Procella@Youtube 把计算加速玩到极值的实时计算引擎

Procella的应用场景

在youtue内部,在数据分析领域有4个方面的应用场景:

  • 报表和大盘:1000亿数据/天,要求在10ms的延时内完成近实时的计算,主要的计算类型过滤/聚合/set/join
  • 内嵌的统计指标,例如视频的浏览人数等等,特点是数据不停的变化,每秒上百万次查询。
  • 时序数据监控:特点是query固定,数据量小,可以下采样,旧数据过期,以及一些特有的查询例如估算函数和时序函数。
  • Ad hoc查询:提供给数据科学家,BI分析师等使用的,特点是query比较复杂,且不可预测,要求延时在秒级别处理一万亿行数据。

老的解决方案和挑战

针对以上需求,在Youtube内部原来有这几种解决方案:

image.png

采用以上方案,有以下挑战:

  • 数据需要在多个系统之间流转,复杂度高。
  • ETL打开的维护开销。
  • 不同系统之间的数据一致性和数据质量差别。
  • 不同系统都有学习成本,有些系统不支持SQL全集。
  • 一些底层组件性能查,可用性和扩展性无法满足需求。

Procella的特点

为了解决上述挑战,Youtube引入了Procella, Procella有如下特点:

  1. 支持标准SQL,并且做了一些扩展,支持估算函数、嵌套类型、UDF等。
  2. 高扩展性,计算和存储解耦,两者都可以无限扩展。
  3. 高性能,引用最新的计算计算,支持百万qps,毫秒级别响应。
  4. 数据freshness,支持batch和stream两种写入模式,原生支持lambda架构。

Procella的性能

image.png

Procella 15亿查询/天,一条扫描8E16行数据/天,返回1000亿行/天

image.png

延时的中位数是25毫秒,p999为 2.8秒,也就是说99.9%的查询在2.8秒内查询完成。

在毫秒级别晚上大规模数据的计算,Procella是如何做到的呢?下文将来揭秘。

Procella的架构

image.png

Procella采用计算和存储解耦的架构:

  • 数据存储在分布式文件系统Colossus上,类似hdfs/盘古。

  • 计算任务跑在由borg维护的机器上。

数据写入

Procella数据写入有两条链路:

  • 右侧的realtime数据,写入Ingestion Server,Ingestion Server把数据存储在Collossus上,与此同时,会把数据写一份到DataServer上(为啥呢? DataServer是负责计算的,发送到DS是为了做缓存,更有效的利用内存数据,因为Youbute的查询大部分依赖最近的数据,下文在查询加速上会提到该点)
  • 批量导入节点,采用离线的方式,离线通过MapReduce任务生成满足格式要求的文件,把文件写到Colossus上,然后再把文件信息注册到RegistrationServer上,这样避免了突然导入大量数据对系统的负载。

数据模型

Procella的数据分成两部分,分别是Data和metadata。

Procella采用的是表模型,存储格式是Artus的列式格式,数据存储在分布式文件系统Colossus。计算由DataServer完成,由于计算和存储是解耦的,所以同一份数据可以由多个DataServer来做计算。

MetaData包含了用于辅助计算的一些结构,称为secondary structure。包括zonemap(min,max等信息), bitmap(倒排索引), bloom filter, 分区, 排序键等。 metadata从列存储的文件头中读取,或者ds在计算query时生成。 metadata和data存储位置不同,是存储在bigtable或spanner上的。

通过DDL的形式管理表,DDL发送到Root Server(query入口),管理表的机制有分区、排序、constraint, 写入方式等。对于实时表,还支持生命周期管理,下采样,compact。

在compaction阶段,允许通过使用user defined sql 来处理数据,例如过滤,聚合,生命周期管理等。相当于在compaction阶段来做etl。

查询

好了,到了最关键的查询阶段, 首先看一下查询的流程。

在上文的架构图中,可以看到Procella的查询流程。

  1. client发起query,到达RootServer。
  2. RootServer解析sql, 优化plan
  3. 分配计算任务到DataServer上。
  4. DataServer读取数据(大部分在内存中, 少量读远程分布式存储),计算。
  5. 最终结果通过。

在整个查询,例如语法解析,plan 优化,计算,都是常见的流程。 而Procella的亮点在于做了相当多的优化手段,以加速计算,实现毫秒级别的响应。下文介绍具体有哪些优化手段。

查询优化

DataServer端的优化

  1. 读取数据时,数据有三种来源,local mem,缓存在本地内存中的数据;remote file,存储在分布式存储系统上的文件,remote memory,通过RDMA技术读取远程DataServer内存中的内容。RDMA 技术可以比RPC/http技术更快的从其他机器内存中读取数据。
  2. 尽可能的把计算任务下推到数据节点,例如filter,agg,project,join,能下推到最底层就下推到最底层,这样可以最大限度的减少数据的读取量,以及计算量。
  3. 在计算时,以encoding-native的方式读取,什么是encoding-native呢?就是算法感知编码格式,例如dictionary编码,在filter时,先从字典中filter出来要的key id,再到数据编码中找对应的id。
  4. 数据交换用stubby(一种grpc协议), shuffle用RDMA协议。

Cache

上文的架构图中提到,Procella是一种计算和存储分离的架构,这种数据和计算分离的方式,虽然能带来规模上的扩展性,但是也给单次查询带来了overhead。Procella时怎么做的呢?Procella通过大量的使用缓存来达到加速的目的,避免从远程读取数据。

  1. file metadata cache:数据存储在分布式存储上,每次读取文件,首先要向master获取文件真实存储的机器,然后再取对应的机器上读取数据。每次调用必然带来开销,因此在内存中维护了block -> file的handle信息。这样就比不要每次都从远程读取了,节省了rpc的开销。
  2. header cache, 列存文件的header中记录了文件的meta,每次读取这些meta要打开文件。IO的开销很大,因此在内存中采用lru来缓存这些header。
  3. Data Cache, 部分数据缓存在内存中,以加速计算,前文提到,实时数据流会有一份发送到内存中。
    1. 在内存中的数据格式,和磁盘上是一样的。这意味着,数据以encoding的模式放在内存中,不会反序列化,在计算时也直接操作encoded数据。这样节省了反序列化的开销。
    2. 一些计算复杂的output,也会缓存下来,还有一些metadata,例如bloom filter也会缓存下来。
  4. metadata的缓存,上文提到的各种secondary structure,都会缓存在内存中。
  5. 亲缘调度,由于大量的使用了缓存,也就意味着,在调度时,跟这份数据相关的,都需要尽量调度到同一个机器上,避免调度上颠簸,提高缓存命中率。
  6. 由于缓存技术以及很高的缓存命中率,使得procella在使用上彻底变成了一个内存数据,2%的数据保存在内存中,换取了99%的file handle命中率和90%的data 命中率。当然这个命中率可能是跟youtube特有的使用模式相关的,在youtube有大量的计算最新数据的需求。
  7. 对于metadata的缓存,从存储load到内存时,会编码成encoding模式,减少内存的使用率。

特有的列式格式Artus,启发式的encoding

youtube在设计列式存储时,需求有两个,分别是

  1. lookup,随机seek某些行数据。
  2. range scan, 扫描大块的数据。

存储上采用各种encoding 模式,不用压缩数据。这意味着读取时,也不用解压缩。
同时在计算时,直接在encoding模式智商进行,避免了反序列化。
encoding之后的数据大小和压缩后的大小基本接近,也就是说在存储空间上不是问题。

procella采用启发式的编码方式,根据数据特征进行特殊编码

  1. 在编码时进行多次扫描数据,第一次扫描采集轻量级信息,例如distinct value,min,max, sort order, 根据这些信息决定最佳的encoding模式。
  2. encoding模式包括dictionary encoding, rle, delta等。
  3. 如何动态的选择编码格式呢? 根据上边采集的信息,同时也会评估每一种encoding的压缩率和速度,决定最终的编码方式。
  4. 在查询时,可以实现O(logn)级别的搜索,O(1)seek到具体行。当然对于变长类型(例如字符串), 可以给每一段数据记录skip block信息,方便查找时快速跳过某些block。
  5. file和column header中记录了多种meta,例如sorting, min,max,encoding,bloom filter等用于剪枝,减少不必要的IO。例如通过pk来分文件,减少文件的读取次数。
  6. 通过倒排索引来加速in类计算。通常in类操作,可以变成join计算,或者逐个比较每个值,这种处理方式的性能是很差的,join计算是SQL里边的老大难题了,而逐个比较,时间复杂度在O(m*n)级别。而通过倒排索引,可以直接在O(1)级别知道是否存在对应的数据。
  7. 在数据摄入时,存储过程会针对计算做一些优化,比如分配partition, 部分aggregation预计算,sort key, 热数据放到ssd上等。

计算引擎

  • 静态codegen
    • 对于大多数的分布式计算系统,都喜欢用llvm来动态生成执行代码,以减少虚函数调用和分支跳转的开销。 但是llvm是有代价的,动态生成代码会带来毫秒级别的开销,对于Procella这样的系统,要求在几十毫秒内完成计算,动态codegen的overhead是不可接受的。同时由于他们的计算系统有个特点是大部分的query是固定的,比如前端显示的视频uv,pv等等。因此他们采用了静态编译的方式,通过c++ template来静态codegen,相当于枚举了所有的可能的operator。也算是一个特色吧。
    • 这种方式缺点也很明显,静态codegen需要在编译时刻把所有代码编译出来,估计编出来binary会非常大,放到内存里边也会占用大量内存,还需要防止换出到swap。
  • 以block为单元进行计算,block的大小不超过L1 cache,充分利用CPU的高速缓存。
  • 代码上保证能够自动的向量化处理(simd)。
  • 计算以encoding-native operator来计算。保证充分利用每一种encoding的计算优势,也避免数据反序列化的开销,同时也节省了内存使用。但是估计每个operator要是适配每一个encoding,代码工程量会不少。
  • 谓词下推,减少scan的数据量,算是古老的加速手段了。
  • metadata server 利用partitioning减少读取的文件,相当于谓词下推在partition上应用一层,过滤出只需要扫描的partition。
  • 利用metadata的计算。
    • 利用bloom filter & max/min信息 & 倒排索引以及其他的mta减少读盘计算。比如知道一个block的min/max之后,就可以根据filter条件确定是否要读取这个block。

Join

Join算是SQL里边的老大难题了,有很多算法来优化join的性能,比如RBO,CBO
Processla支持的join类型有

  • broadcastjoin,广播式join,在大表join小表的场景,把小表复制到大表所在的每一个机器上,这样避免大表的复制。
  • Co-partitioned join,左右表具有相同的partition信息,那么只把partition相同的数据hash到相同的机器,避免全量数据计算hash和跨机器拷贝。
  • shuffle join, 左右表分别计算hash,hash值相同的数据shuffle到同一个机器上进行join,算是最慢的
  • Pipelined join,如果右表是一个复杂的子查询,但是结果集合很小,那么倾向于先算出右表的结果,然后再按照broad cast join的方式。
  • Remote Lookup ,如果dim table比较大,这个时候就不适合hash或者broadcast了,由于procella的存储格式支持高效的lookup,O(1)或O(logn)级别,因此可以通过rpc发起远程的lookup来完成join。

长尾query

在SQL/MapReduce任务中,最常见的一个难题是长尾query,由于局部计算过热导致影响整体的查询延时。

Procella提出了这些优化:

  • Backup 策略
    • 对于每一个data server上的计算都会统计其响应延时,并且汇总成分位,对于远大于均值的请求,认为是利群点,这个时候,启动备份的data server来重新计算这个任务。
  • 限制发送到同一个ds的请求速度,前文讲过,processla是计算存储解耦的架构,但是为了充分利用缓存,又倾向于把同一份数据的计算调度到同一个机器上,因此这台机器又会成为热点。因此需要对发送到同一个ds的请求做资源限制。
  • Smaller query具有高的优先级。 smaller query可以快速跑完,让出资源,因此procella倾向于让小query先跑。这和presto是相反的逻辑。presto倾向于让大query先跑,否则大query就饿死了。

Intermediate Merging

在merge节点,有些merge操作是单点的,因此会成为计算的瓶颈点。Procella采集一些统计信息,如果发现merge节点一段时间还未完成计算,那么就开启一些前置的线程进行分布式计算。预聚合的结果发送给merge算子。前置线程是在同一台机器上的,也避免了数据交换。

Query 优化器

静态优化器

静态优化器是指在plan阶段执行的优化逻辑,包括RBO,CBO。procella支持了RBO, filter pushdown, subquery decorrelation, constant folding等。

启发式优化器

Procella的一个特点就在于这个启发式的优化器,相对静态优化器而言,启发式优化器是在运行时执行的。启发式优化器通过采集计算的状态统计,例如统计数据,基数,distinct count,分位信息等。启发式优化器甚至可以在执行时修改物理执行计划。我们知道通常而言,SQL在plan阶段生成逻辑执行计划,经过优化后,再生成物理执行计划,物理执行计划下发到机器上后,就不再修改了。Procella做到了动态的修改plan。

主要又这几个优化点:

  1. 决定reduce的个数,例如,计算group by时,下游分配多少个节点是跟group by的key的基数相关的,因此可以统计基数信息,动态决定分配多少个reduce节点,甚至可以运行时取消部分reduce任务。
  2. 启发式agg:正式运行之前,在一份小的数据聚合上运行一下,获得结果和统计数据,评估输出的行数,决定分配多少个reduce节点。
  3. 启发是join:也是根据采样的数据评估哪种join方式比较合适。
  4. 启发式sort:sort避免不了全局排序,在全局做排序是非常消耗内存和CPU的事情,procella采集数据的分布信息,把全量数据分配到n个桶里边,Map阶段按照分位shuffle到下游节点,实现了桶排序。

当然启发式优化器是有代价的,需要先在采样的数据上跑一次,才能决定正式的执行计划。因此会带来一些overhead。procella提供了开关,可以关闭启发式优化器。不过对于用户而言,是非常乐意通过少量的overhead来换取很大的性能提升的。

Youtube内嵌statistics

内嵌指标就是我们通常在视频网站上看到的一些指标,比如视频多少人观看,多少人点赞等等。

针对这部分计算的需求有:

  1. 数据更新快。
  2. 请求的qps高。
  3. 查询要求毫秒级别。

为了应对上述需求,procella做了以下优化:

  1. 新数据写入存储的同时,也会写入ds内存中,以保证查询时命中内存。
  2. MDS作为ds的模块,以减少rpc调用。因为获取一些meta信息会向mds请求。而rpc的开销至少也是毫秒级别。
  3. metadata 预加载 & 异步增量同步,保证meta数据都在内存中。
  4. plan缓存在内存中。通常词法/语法解析,以及各种plan优化也会带来开销。针对内嵌场景,大量的query都是重复的,因此缓存下来可以节省不少CPU和延时。
  5. RPC合并发送。不同的server之间交互的时候,把rpc合并起来发送,以减少网络开销。
  6. 监控RS,DS的错误率,一旦达到阈值,把机器下线。
  7. 禁用启发式优化器,前边提到启发式优化器会带来一些开销,针对内嵌指标场景其实是没必要的。

总结

总结下来,procella做了很多方面的优化,以保证计算在毫秒级别完成,总体而言,这些优化手段可以归结为:

  1. 通过metadata减少读取磁盘的次数和大小。
  2. 通过encoding-native的方式让计算靠近数据,减少反序列化开销,也让encoding加速计算。
  3. 大量内存缓存的使用,使得procella完全变成了一个内存计算引擎。
  4. 启发式的优化器,动态优化执行计划。

参考资料

procella 论文

Presto Coordinator分布式改造

背景

自从上线SQL功能以来,经过两年的时间,随着sls业务的不断增长,每天处理1亿次query,扫描1000,000,000,000,000行日志(没错1000万亿行日志)。业务的增长也给系统带来无形的压力,如何保障在低延时的前提下提供这么大的负载是一个巨大的挑战。

挑战之一就是单master架构。在之前的文章中,我介绍过了presto的架构。coordinator是presto中负责query解析,任务调度,结果汇总的,集群监控的节点。其他的worker节点只负责接收单个task进行计算即可。coordinator的任务可谓是多而广,在整个集群中起到了不可替代的作用。一旦该节点出问题,整个集群不可用。

上图为集群平均cpu,下图第一个为coordinator的cpu,可见coordinator的负载一直处于高位,且明显高于普通的worker。
image.png

随着业务的上升,coordinator的几点瓶颈愈发凸显:

  1. 负责query的解析和任务分发,负载比较重,且随着业务增加,负载线性增加。而单机的性能有上限,无法横向扩展。
  2. 单coordinator节点无容灾,一旦宕机,恢复时间比较长。

为了解决以上问题,我下决心把coordinator改造成分布式。

coordinator主要工作内容

coordinator主要有几个功能:

  1. query的解析,任务调度,任务监控。
  2. 集群内存监控。
  3. 机器状态监控。

query任务流

image.png

query从提交到coordinator,到query执行完成,由coordinator负责管理,一个qeuery的具体流程包括:

  1. 队列排队,获取运行许可。
  2. 词法解析和语法解析,把SQL转换成抽象语法树。
  3. 语法优化,语法检查。利用预定义规则对语法数进行优化,例如剪枝优化。还有把单机plan改造成并行式plan。
  4. 对plan划分出不同的stage,并且按照并行度的需求,为每个stage生成多个task,每个task指定一台机器。形成最终的逻辑执行计划。
  5. stage代表的任务的上下游,task代表一个stage内的并行执行。一个stage内的task执行相同的任务,只是接收的数据不一样。
  6. 每个task调度到一个worker上,并且coordinator为每个task启动一个任务追踪,即TaskInfoFetcher,周期性轮训task的状态。
  7. 当轮训到task的状态发生改变时,回调改变stage-> query的状态,最终直到query的所有task都结束。

整个任务流中,像query解析,调度,监控等都可以在任意节点执行,不需要非得在全局唯一节点执行。 只有queue涉及到全局队列,需要在单点执行。

集群控制流

Presto集群控制只要有两方面内容:

  1. 集群状态控制
    1. 检查每台机器的状态,判断是ALIVE、SHUTDOWN中的哪个状态。
    2. 检查每台机器的各个内存池分配状态,终止超限query,以及提升大query的优先级。
  2. Query状态控制
    1. 轮询每个task的状态,通过状态机管理query状态。

image.png

集群内存管理

Presto采用逻辑的内存池,来管理不同类型的内存需求。

Presto把整个内存划分成三个内存池,分别是System Pool ,Reserved Pool, General Pool。

  • System Pool 是用来保留给系统使用的,默认为40%的内存空间留给系统使用。
  • Reserved Pool和General Pool 是用来分配query运行时内存的。
  • 其中大部分的query使用general Pool。 而最大的一个query,使用Reserved Pool, 所以Reserved Pool的空间等同于一个query在一个机器上运行使用的最大空间大小,默认是10%的空间。
  • General则享有除了System Pool和General Pool之外的其他内存空间。

Presto内存管理,分两部分:

  • query内存管理
    • query划分成很多task, 每个task会有一个线程循环获取task的状态,包括task所用内存。汇总成query所用内存。
    • 如果query的汇总内存超过一定大小,则强制终止该query。
  • 机器内存管理
    • coordinator有一个线程,定时的轮训每台机器,查看当前的机器内存状态。
    • 当query内存和机器内存汇总之后,coordinator会挑选出一个内存使用最大的query,分配给Reserved Pool。

内存管理是由coordinator来管理的, coordinator每秒钟做一次判断,指定某个query在所有的机器上都能使用reserved 内存。那么问题来了,如果某台机器上,,没有运行该query,那岂不是该机器预留的内存浪费了?为什么不在单台机器上挑出来一个最大的task执行。原因还是死锁,假如query,在其他机器上享有reserved内存,很快执行结束。但是在某一台机器上不是最大的task,一直得不到运行,导致该query无法结束。

Coordinator分布式改造

从上边的介绍中,我们可以看出,只能在集群唯一节点进行的操作有:

  1. 全局队列。
  2. 机器状态监控。
  3. 集群内存状态管理。

二coordinator上负载最重的操作是query的task状态管理。一个query会分配若干个task,这个膨胀是很大的。每个task都要轮询状态信息。这部分逻辑是可以分布式搞得,不必放在全局唯一节点进行。

image.png

coordinator改造方案,就是把一些不是全局性的操作,都拆分到不同节点去做。而把全局性的操作放到一个节点。所以原来的coordinator会变成两个角色,watch tower和新coordinator:

image.png

Watch tower:

只负责集群的监控,不负责跟query相关的工作。

  • 集群内存管理,周期性轮询每台机器的内存分配状态,把最大的query提升导reserved memory pool。
  • 机器状态管理,周期性轮询每台机器状态,是否ALIVE、SHUTDOWN。

由于coordinator需要知道每台机器的状态、内存,因此在coordinate向每台机器同步信息时,会同时把相关状态同步给coordinator。

image.png

Coordinator:

coordinator只专心做query相关的工作,接受请求,解析、调度、监控所有的task。

队列处理

presto的队列模型,可以设置query运行的最大并发度和最大排队数。这些参数可以时用户级别,也可以是整个集群的级别。

在官方的presto中,由于只有一个coordinator节点,可以很容易在单点控制整个集群的执行并发度。分拆成多个coordinator并发执行之后,控制整个集群的并发度变得比较困难。因而我们选择在单个coordinator上控制运行在本机的并发度。

客户端

image.png

改造成并发coordinator后,client端从Discovery获取到所有的coordinator(coordiantor向discovery回报心跳时,增加了标识coordinator)。并且把coordinator按照地址排序,保证所有的client获取到的都是同样的顺序。

当接收到SQL请求后,client按照user的hash值,选择一个coordinator发送。这样就可以保证同一个用户的SQL只发送导一个coordinator,从而控制并发度。当然如果coordinator发生扩容缩容,在短时间内,coordinate的顺序会发生改变,影响队列。

总结

在完成分布式改造后, coordinator可以做到水平扩展,解决了coordinator的单点问题。改造后的效果:

image.png

CPU消耗比较高的Coordinator经过分布式改造后,有3个coordinator,可以看出每个coordinator的CPU仍然很高。这是由于coordinator上负责整个query的运行监控,如果一个query分配的task非常多,那么是需要启动很多后台任务轮询task状态,这对coordinator的负载压力是很大的。虽然我们可以做到水平扩展coordinator,但是由于单个coordinator的最大并发有限,也影响了整个集群的性能。接下来我们也会继续对这部分逻辑进行改造,期望能够优化coordinator的性能。

Presto内存泄露问题调查

Presto内存泄露问题调查

问题背景:

sls的线上流量越来越大,S1几乎增长了100%。在杭州region,每隔一段时间,一部分机器Presto就会开始频繁的Full GC,重启后稳定一段时间,然后过一段时间又开始频繁Full GC。Full GC达到一定次数的时候,就发生OOM,进程直接crash。由于Full GC时间长,影响线上的可用性,因此开始投入精力进行调查。

查看GC 文件

当频繁发生GC时,会在gc文件中打出下边的内容,表示GC发生的类型(Full GC(Allocation Failure)), 在发生full gc之前,堆用了19.9G;GC后,还是19.8G,也就是说GC发生后,只释放了0.1G的空间。

1
2
3
4
2019-08-28T15:58:46.140+0800: 2414974.134: [Full GC (Allocation Failure)  19G->19G(20G), 25.4147570 secs]   
[Eden: 0.0B(1024.0M)->0.0B(1024.0M) Survivors: 0.0B->0.0B Heap: 19.9G(20.0G)->19.8G(20.0G)], [Metaspace: 13
9463K->139463K(1267712K)]
[Times: user=40.66 sys=0.00, real=25.42 secs]

由于没有足够的可用内存,于是Presto很快再次Full GC,知道最终一点内存都没有了,发生了OOM 异常,程序crash。

注:GC文件怎么打:

1
2
3
4
5
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=5M
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps

监控内存的变化趋势

从外部看进程的内存使用率,如下图所示,是看不出任何信息的。因为java进程内部维护了内存的分配。
image.png

于是我通过jmx监控去查看Presto进程内部的内存分布。 java进程把内存分成几个部分:年轻区,幸存区,老年区。不同的内存依据生命周期的长短放在不同的区域内。通常,内存是在年轻代分配,当年龄达到一定大小时,会移动老年区(这里简化了模型,实际还有幸存区的作用)。

在jmx中,存在如下的监控项目,分别表示老年代(Old Gen)和年轻代(Eden Space)已经使用的内存。通过jmx接口周期性的读取这两个属性,接入到神农监控。

1
2
java.lang:type=MemoryPool,name=G1 Old Gen/Usage/used (Long) = 1182129304
java.lang:type=MemoryPool,name=G1 Eden Space/Usage/used (Long) = 251658240

我们把时间周期拉长了看,可以看到老年代(红色线)的内存趋势一致处于缓慢上涨状态。

image.png

通过智能算法查找原因

由于内存趋势的规律性,首先怀疑的是由于某些用户的访问上涨,因此通过相似性算法,查找哪个用户的访问趋势和这个内存曲线相似。很遗憾的是,没有找到类似的用户趋势。

接下来日志聚类查看有哪些日志pattern,日志聚类能够把相似日志聚合成一个pattern,帮助我们快速浏览大量的日志。 我们注意到了下图中的这个pattern:这个pattern虽然不能说明和内存问题直接相关,但是这个pattern给我们留下了印象,接下来的一系列调查也证明内存问题和这个pattern相关。

image.png

通过heapdump查看内存泄露的原因

智能算法没有找到特定的用户,没有办法,只能老老实实的去分析内存。

java 自带的工具jmap,可以打印出当前内存的object分布,也可以把内存写到磁盘上。

1
2
打印内存直方图,:live表示在打印前执行一次full gc
jmap -histo:live `pid of java` > /tmp/jmap00
1
2
dump堆文件
jmap -dump:format=b,file=heap.dump `pid of java`

直方图能够看到内存的分配,但是信息太粗,只能到object级别,看不出具体的对象,也看不到reference。

dump heap会让程序停顿,为了避免对线上访问造成影响,先把这台机器的心跳摘掉,移除流量。再执行上述的dump命令。 获取到dump文件后,可以用jhat打开,也可以用jvisualVM 打开。不过用jhat打开时,遇到OOM的问题。最终用jvisualVM打开后,看到如下的大量对象PendingRead。再查看reference,可以确定是SqlTask 下的内存泄露了。

从reference中可以看到一个taskId,一个偶然的机会,在presto的http-request.log文件中,找到了线索,

image.png

通过sls查看presto的http请求。最近15分钟,一些task持续运行了大约15分钟。从task命名上可以看到,22号的task仍然在执行。

通过task的api,获取task当前的状态:可以发现task仍然处于running状态。

1
2
curl 11.194.214.145:10008/v1/task/20190816_183242_49436_pwzkn.2.5
{"taskStatus":{"taskId":"20190816_183242_49436_pwzkn.2.5","taskInstanceId":"1ae02d83-f024-4d45-997b-96dea741b04e","version":1768082,"state":"RUNNING","self":"http://h51c07359.cloud.et91:10008/v1/task/20190816_183242_49436_pwzkn.2.5","failures":[],"queuedPartitionedDrivers":0,"runningPartitionedDrivers":0,"memoryReservation":"0B"},"lastHeartbeat":"2019-08-27T01:34:56.905Z","outputBuffers":{"type":"UNINITIALIZED","state":"OPEN","canAddBuffers":true,"canAddPages":true,"totalBufferedBytes":0,"totalBufferedPages":0,"totalRowsSent":0,"totalPagesSent":0,"buffers":[]},"noMoreSplits":[],"stats":{"createTime":"2019-08-24T12:42:07.748Z","elapsedTime":"0.00ms","queuedTime":"0.00ms","totalDrivers":0,"queuedDrivers":0,"queuedPartitionedDrivers":0,"runningDrivers":0,"runningPartitionedDrivers":0,"completedDrivers":0,"cumulativeMemory":0.0,"memoryReservation":"0B","systemMemoryReservation":"0B","totalScheduledTime":"0.00ms","totalCpuTime":"0.00ms","totalUserTime":"0.00ms","totalBlockedTime":"0.00ms","fullyBlocked":false,"blockedReasons":[],"rawInputDataSize":"0B","rawInputPositions":0,"processedInputDataSize":"0B","processedInputPositions":0,"outputDataSize":"0B","outputPositions":0,"pipelines":[]},"needsPlan":true,"complete":false}

从taskid获取queryid : 20190816_183242_49436_pwzkn,从运行日志中获取该query的执行结果:发现query在16号的时候发生了运行时错误。

image.png

既然整个query已经确认fail了,但是task处于running状态,那么我可以强制通过task的delete api,把任务给清理掉。在清理过后,task的状态变成了ABORT状态,表示任务失败了。但是调用过后, task的请求并未终止,仍然在继续执行,过了大约15分钟,再次去查看task的状态,又变成了RUNNING状态。

内存泄露的原因

已经从上述调查中,知道了内存泄露的位置。通过阅读代码和推演,大致理清楚了内存泄露的原因。

query执行的流程如下:在正常情况下,coordinator调度split到每个机器上,生成对应的task, 然后下游的task生成轮训任务,向上游task读取计算结果。如果任何一个task发生了错误,那么coordinator会把所有的task终止掉。

image.png

但是在某些情况下,某一台机器发生了调度延迟,Task 2首先调度,并且开始了计算,但是由于遇到了计算错误,于是终止了task。 接下来这个时候task1才开始调度,然后生成了向task2轮训的任务。由于task2是异常终止的,内存中的标志位都是没有清空,导致认为task2还在读数据,因此轮训任务一直终止不了。每次轮训,都生成一个PendingRead放到内存中。日积月累,就造成了内存泄露。

验证内存泄露的原因

计算task一直不能终止,那么如果我强行通过API DELETE掉task,内存理论上可以被删除掉。

通过curl删除task的http请求,

1
curl 11.223.196.92:10008/v1/task/20190822_163910_94499_pwzkn.2.4 -X DELETE

删除后的状态

1
2
3
4
5
6
7
8
9
{"taskStatus":{"taskId":"20190822_163910_94499_pwzkn.2.4","taskInstanceId":"71f71c85-5073-4cf9-852b-16bf9c49496f","version":3239316,"state":"ABORTED","self":"
http://g24h09288.cloud.et91:10008/v1/task/20190822_163910_94499_pwzkn.2.4","failures":[],"queuedPartitionedDrivers":0,"runningPartitionedDrivers":0,"memoryRes
ervation":"0B"},"lastHeartbeat":"2019-08-27T01:32:46.223Z","outputBuffers":{"type":"UNINITIALIZED","state":"OPEN","canAddBuffers":true,"canAddPages":true,"tot
alBufferedBytes":0,"totalBufferedPages":0,"totalRowsSent":0,"totalPagesSent":0,"buffers":[]},"noMoreSplits":[],"stats":{"createTime":"2019-08-23T02:17:55.766Z
","endTime":"2019-08-27T01:32:47.296Z","elapsedTime":"0.00ms","queuedTime":"0.00ms","totalDrivers":0,"queuedDrivers":0,"queuedPartitionedDrivers":0,"runningDr
ivers":0,"runningPartitionedDrivers":0,"completedDrivers":0,"cumulativeMemory":0.0,"memoryReservation":"0B","systemMemoryReservation":"0B","totalScheduledTime
":"0.00ms","totalCpuTime":"0.00ms","totalUserTime":"0.00ms","totalBlockedTime":"0.00ms","fullyBlocked":false,"blockedReasons":[],"rawInputDataSize":"0B","rawI
nputPositions":0,"processedInputDataSize":"0B","processedInputPositions":0,"outputDataSize":"0B","outputPositions":0,"pipelines":[]},"needsPlan":true,"complet
e":true}

删除后等待15分钟,task会从内存中删除:可以看到内存发生了大幅下跌。

image.png

过几天后再去看内存,又在持续的增长,这是因为我们只是清理了内存,但是轮训任务并没有被清理掉.

image.png

构造case复现

从采样到的几个内存泄漏点,我们可以看到明显的特征,就是query遇到错误的数据,执行失败,然后才有概率遇到调度异常。因此我们可以构造一些非法数据,让source节点快速的fail掉。通过检查http-request.log中task的运行时长,可以确认是否发生了内存泄露。

修复内存泄露

从上述验证过程也可以看出,要想修复内存泄露,必须让泄露的轮训任务终止掉。有几种修复方案:

  1. 上游判断,如果stat是terminal(结束或失败)状态,返回空结果和结束标志。
  2. 下游判断,如果query是结束状态,那么不生成轮训task。
  3. 下游判断,如果task处于close状态,那么生成轮训task,但是进入清理阶段,清理掉上游的内存后退出。

综合考虑每种方案,以及测试结果,最终采用了第三种方案,可以把内存清理干净,避免一些遗留问题。

参考资料

java垃圾回收完全手册

Presto coordinator的CPU持续上涨,原因竟然是这样

问题背景

之前介绍过presto的架构, coordinator是Presto架构中负责调度的master节点。在实际部署中,为了减少该节点的负载,指定node-scheduler.include-coordinator=false,避免把计算任务调度到coordinator节点上。

由于Presto进程是常驻进程,而且需要实时的提供在线服务。通常只有在需要升级时,才会通过热升级手段重启进程。于是我们在一个大集群上发现了这个现象,coordinator的CPU随着时间不短增长。最高达到了3000%+ 。见下图是连续一个月的持续上涨。
image.png

如果任由CPU这样上涨下去,整个集群将不可控,也会影响计算能力。于是我开始了一系列的调查。

是json的原因吗?

首先,通过jstack命令打印Presto进程的栈。在栈中最常见的调用栈,是json的反序列化操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.parseMediumName2(UTF8StreamJsonParser.java:1836)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.parseMediumName(UTF8StreamJsonParser.java:1793)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1728)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:776)
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:389)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1194)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:314)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2950)
at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:198)
at io.airlift.http.client.FullJsonResponseHandler$JsonResponse.<init>(FullJsonResponseHandler.java:118)
at io.airlift.http.client.FullJsonResponseHandler.handle(FullJsonResponseHandler.java:68)
at io.airlift.http.client.FullJsonResponseHandler.handle(FullJsonResponseHandler.java:37)
at io.airlift.http.client.jetty.JettyHttpClient$JettyResponseFuture.processResponse(JettyHttpClient.java:857)
at io.airlift.http.client.jetty.JettyHttpClient$JettyResponseFuture.completed(JettyHttpClient.java:834)
at io.airlift.http.client.jetty.JettyHttpClient$BufferingResponseListener.onComplete(JettyHttpClient.java:1119)

这个栈是一个http请求的callback,在callback中把response把json反序列化成对象。http的callback线程时包括了200个线程,而我经常能在stack中看到超过180个线程在做json反序列化。从栈总看不出是属于什么http请求。但是我们能从代码中找到coordinator发出的这几类请求。

image.png

  1. task任务状态的fetcher,循环的获取task的状态,直到task结束。这其中其中有两类fetch操作:
    1. 一个获取taskStatus,只包含task状态,循环获取,前一次获取结束后,立马启动下一次
    2. 一个获取taskInfo。包含task的详细信息,每3秒获取一次。
  2. Memory管理,coordinator定期向每台机器获取机器的内存使用状况。
  3. fail detect 请求有两类:
    1. 一类是head请求,没有response
    2. 另一类是/v1/info/state请求,是worker的状态,返回类似于”ACTIVE”或者”SHUTDOWN”之类的状态。

由于内存同步和心跳检查是跟机器数据相关,而task同步则跟query数目相关。于是我怀疑是同时运行的query太多,有1000多个query在同时运行,每个query生成上百个task。 coordinator每分钟要发起几十万次读取task状态的请求。jackson库的反序列化效率不高,导致coordinator的CPU很高。 这可以解释CPU高的原因,但不能解释CPU持续上涨的原因。不管怎么样,我还是决定把jackson改成性能更佳的fastjson。

改成fastjson上线后,过段时间发现,CPU仍然在缓慢的持续上涨,不得不再重新寻找新的证据。

是不停的分配新的线程吗?

我们在jstack栈中,能清楚地看到很多线程同时在做json反序列化操作。证明是和json反序列化相关的操作。于是再去阅读源码。发现task status fetcher使用的线程池是一个可变的线程池newCachedThreadPool

具体操作是:任何请求,都是放入队列中,然后由线程池处理。如果队列满,则线程池会新分配一个线程。

于是我怀疑是负载太高,导致不停的分配新的线程,于是CPU越来越高。怎么解决这个问题?我改造了coordinator,由原来的单节点,变成了多节点,以均分coordinator的压力。

把coordinator改造成分布式能解决问题吗?

Presto原生的coordinator由于依赖单节点进行内存监控,如果强行部署成多coordinator的话,会造成内存管理的混乱,有可能让某些大query死锁。因此改造成分布式还颇费一番功夫。我会另起一个话题,讲述如何把coordinator改造成分布式。

简单的说,改造完成后,有3个coordinator。原来的coordinator,负责内存管理和worker failover管理,同时处理1/3的query。而另外两个coordinator,只分别负责管理另外1/3的query。同时把内存管理的心跳间隔调整到了5s。

过了一段时间,发现了新的情况。中心coordinator的CPU仍然在缓慢持续上涨,但是CPU并不是一直很高,而是每隔5s飙升一次。新增的两个coordinator则没有变化。于是我从所有的配置中找跟5s相关的参数,从这里开始意思到似乎和内存管理的心跳有关。

从jstack能看到跟json反序列化有关, 内存管理的http response也需要反序列化json。虽然之前把jackson改造成了fastjson,但只是改造了task心跳部分,没有改造内存管理部分。

coordinator每5s向每个worker发送一次请求,获取worker的内存使用,这个请求量是恒定的。理论上,除非我们增加机器,负载才会增加。于是我去检查了presto的http-request.log。 我把日志采集到阿里云日志服务,通过日志分析,检查http请求的变化趋势。

1
v1/memory | select date_trunc('day',__time__) as t, count(1) as pv, sum(response_size) as res from log group by t order by t

image.png
左Y轴是PV,右Y轴是response size的一天累加值。我们可以看到PV基本上没有大的变化。而response size则不断增长。20天内从最低7.6G/day增长到了237G/day。平均每个response达到149k。 这个截图是我写文章时截取的。在当时调查问题时,看到了600k的response。

response增加 -> json反序列化负载增加 -> CPU不断增加。看起来符合逻辑,接下来只有去找为什么response不断增加了。

tcp抓包

为了查看response这么大的原因,我用tcpdump抓取presto的流量。

1
sudo tcpdump  port 10000 -s 0 -w /tmp/netstat 

在/v1/memory的响应结果中,会包含general , reserved, 和system 三个pool分别总的大小以及使用大小,和目前占用对应内存池的queryId。结构是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"totalNodeMemory": "12884901888B",
"pools": {
"reserved": {
"maxBytes": 3221225472,
"freeBytes": 3221225472,
"queryMemoryReservations": {

}
},
"general": {
"maxBytes": 9663676416,
"freeBytes": 9663676416,
"queryMemoryReservations": {

}
},
"system": {
"maxBytes": 8589934592,
"freeBytes": 8589934592,
"queryMemoryReservations": {
"20190509_113603_03920_25e2h",
"20190509_113608_02758_rdy5b",
...
...
}
}
}
}

其中system -> queryMemoryReservations 这个节点下的内容非常的多,甚至包含几天前的queryId。这意味着某些query占用了system内存池,结束后没有正常释放。接下来就需要查找什么地方存在内存泄露。

查找内存泄漏点

query使用的内存,都会记录在context中,因此我专门写了个程序,向coordinator轮训已经完成的query,获取这些query的描述信息。查看到底是哪个环节的内存使用有问题。

1
2
3
4
5
6
7
"tasks" : [ {          
"stats" : {
"systemMemoryReservation" : "712B",
"pipelines" : [ {
"systemMemoryReservation" : "712B",
"operatorSummaries" : [ {
"systemMemoryReservation" : "0B",

抓取到的多个query的stat信息,都具备相同的特征:

  1. 某一个task的systemMemoryReservation 不为0。
  2. task的第一层pipeline不为0。
  3. pipeline的Operator都是ExchangeOperator和LocalExchangeSinkOperator,但是operator内部的systemMeoryReservation为0。

system内存是在计算过程中,使用的系统内存,例如两个worker之间传递数据,使用的就是system mem pool

Operator信息表明,这个节点是从前一层节点接收数据,放在内存中,供下层pipeline计算。

image.png

task -> pipeline -> driver -> operator 构成了一层层结构。由于数据显示pipeline这一层发生了泄露,我只好去看pipeline分配内存的逻辑。

image.png

Driver和ExchangeClient在分配内存时,都会调用Pipeline的内存分配逻辑。由于数据显示跟Exchange有关,于是我重点检查了这ExchangeOperator分配内存的逻辑,初看之下,每一层都能做到自平衡,也就是在某一层结束的时候,调用close,自己把尚未释放的内存释放掉。

但是,我也发现了在多线程场景下,由于执行时序不同,会导致出现内存泄露情况。

多线程时序问题

在ExchangeClient中,就存在多种场景。

第一种场景,ExchangeClient是一个生产者和消费者模型,后台线程从远端拿到数据后,由callback线程把数据放入队列,然后才分配内存。 消费者线程从队列中poll到数据后,再释放内存。

image.png

假如事件发生的顺序是这样的 P1 C1 C2 P2 , 那么会先释放内存,在C2释放内存时,由于出错而分配失败。接下来在P2再去分配内存就会出错。确保不会出错的顺序是把P1和P2对调顺序。

如果上边的假设成立,那么会在日志中看到tried to free more memory than is reserved的错误。没有从日志中发现上边的错误,于是这种假设被排除了。

第二种场景, C2在释放内存是,会先判断closed变量。如果生产者在C2之前就设置了closed变量,那么就不会进入释放的逻辑。我加了一些日志,证明发生内存泄露时,closed变量确实被设置了。

设置closed变量的方式如下代码,在代码中只设置了closed变量,而没有做内存清理:

1
2
3
if (!closed.get() && pageBuffer.peek() == NO_MORE_PAGES) {
closed.set(true);
}

由于我使用的presto代码比较老,是176版本,和master head版本进行对比,发现最新代码已经改成了close(), 调用close()函数,会提前清理内存,能够避免内存泄露的问题。
git blame的结果:

1
e3535bbfeca (Dain Sundstrom     2017-02-24 10:04:37 -0800 289)                 close();

根据commit id找到对应的信息:https://github.com/prestodb/presto/commit/e3535bbfeca

image.png

内存泄露的原因已经解决了,我用的版本已经太老了。但是由于我对presto内核做了很多改动,想要merge到HEAD版本恐怕要花费不少时间。

总结

在ExchangeClient中,向上游节点拉取数据时,有一定概率导致内存泄露,上游节点越多,概率越大。

当内存发生泄露时,内存池忠实的记录了每一个queryId。这导致随着时间推移,queryId的list越来越多。

coordinator会定时向每台机器获取内存池的使用情况。因此response越来越大,而json反序列化的CPU使用率越来越高,几乎占满了http callback的线程池。最终反映到监控图上,就是coordinator随着时间推移,CPU Usage直线上涨。

One More Thing

把ExchangeClient内存泄露的问题解决后,再去验证。由ExchangeClient造成的内存泄露已经解决了,但是发现ScanFilterAndProjectOperator在很小的场景下,会出现内存泄露。这是一个SourceOperator的派生类,会调用plugin实现的ConnectorSource,方便ConnectorSource使用系统内存。内存泄露的原因还待分析。

深入理解Presto(3):Presto内存管理

深入理解Presto(3):Presto内存管理

上一篇文章,我们讲了Presto的架构。Presto是一款内存计算型的引擎,所以对于内存管理必须做到精细,才能保证query有序、顺利的执行,部分发生饿死、死锁等情况。

内存池

Presto采用逻辑的内存池,来管理不同类型的内存需求。

Presto把整个内存划分成三个内存池,分别是System Pool ,Reserved Pool, General Pool。

image.png

  1. System Pool 是用来保留给系统使用的,默认为40%的内存空间留给系统使用。
  2. Reserved Pool和General Pool 是用来分配query运行时内存的。
  3. 其中大部分的query使用general Pool。 而最大的一个query,使用Reserved Pool, 所以Reserved Pool的空间等同于一个query在一个机器上运行使用的最大空间大小,默认是10%的空间。
  4. General则享有除了System Pool和General Pool之外的其他内存空间。

为什么要使用内存池

System Pool用于系统使用的内存,例如机器之间传递数据,在内存中会维护buffer,这部分内存挂载system名下。

那么,为什么需要保留区内存呢?并且保留区内存正好等于query在机器上使用的最大内存?

如果没有Reserved Pool, 那么当query非常多,并且把内存空间几乎快要占完的时候,某一个内存消耗比较大的query开始运行。但是这时候已经没有内存空间可供这个query运行了,这个query一直处于挂起状态,等待可用的内存。 但是其他的小内存query跑完后,又有新的小内存query加进来。由于小内存query占用内存小,很容易找到可用内存。 这种情况下,大内存query就一直挂起直到饿死。

所以为了防止出现这种饿死的情况,必须预留出来一块空间,共大内存query运行。 预留的空间大小等于query允许使用的最大内存。Presto每秒钟,挑出来一个内存占用最大的query,允许它使用reserved pool,避免一直没有可用内存供该query运行。

内存管理

image.png

Presto内存管理,分两部分:

  • query内存管理
    • query划分成很多task, 每个task会有一个线程循环获取task的状态,包括task所用内存。汇总成query所用内存。
    • 如果query的汇总内存超过一定大小,则强制终止该query。
  • 机器内存管理
    • coordinator有一个线程,定时的轮训每台机器,查看当前的机器内存状态。

当query内存和机器内存汇总之后,coordinator会挑选出一个内存使用最大的query,分配给Reserved Pool。

内存管理是由coordinator来管理的, coordinator每秒钟做一次判断,指定某个query在所有的机器上都能使用reserved 内存。那么问题来了,如果某台机器上,,没有运行该query,那岂不是该机器预留的内存浪费了?为什么不在单台机器上挑出来一个最大的task执行。原因还是死锁,假如query,在其他机器上享有reserved内存,很快执行结束。但是在某一台机器上不是最大的task,一直得不到运行,导致该query无法结束。

Presto入门: 配置第一个http connector

Presto入门: 配置第一个http connector

1. connector

在presto中,可以对接多种类型的数据源,今天以http 服务器数据为例,简单介绍如何接入presto。

2. 搭建http数据数据源

2.1 http数据源的schema

在http服务器上,提供一个文件,文件内容是数据源的格式。 一个文件是json格式,顶层是schema的名称,schema类似数据的database。schema之下是一个表的list。每张表要提供列的名称和类型,以及数据的地址,即http地址,见一个样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"schema":[{
"name":"table1",
"columns":[
{
"name":"key1",
"type":"bigint"
},
{
"name":"key2",
"type":"varchar"
}
],
"sources":[
"http://localhost:9080/data.csv"
]
}
]
}

2.2 提供数据:

http数据是一个csv格式,例如上文提到的data.csv的内容是:

1
2
10,b
1,d

2.3 配置presto

接下来配置presto,使得presto知道http 数据源的存在,创建文件etc/catalog/http.properties ,在文件中指定schema的地址:

1
2
connector.name=example-http
metadata-uri=http://localhost:9080/schema.json

2.4 查看查询效果:

2.4.1 展示http catalog中的schema

1
2
3
4
5
6
7
8
9
10
11
presto> show schemas from http;
Schema
--------------------
information_schema
schema
(2 rows)

Query 20180510_030439_00002_58j4x, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [2 rows, 34B] [15 rows/s, 263B/s]

2.4.2 展示http catalog的schema库中的表内容

1
2
3
4
5
6
7
8
9
presto> show tables  from http.schema;
Table
--------
table1
(1 row)

Query 20180510_030453_00003_58j4x, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [1 rows, 22B] [4 rows/s, 108B/s]

2.4.3 展示表的格式

1
2
3
4
5
6
7
8
9
10
11
presto> describe  http.schema.table1;
Column | Type | Extra | Comment
--------+---------+-------+---------
key1 | bigint | |
key2 | varchar | |
(2 rows)

Query 20180510_030507_00004_58j4x, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [2 rows, 123B] [9 rows/s, 603B/s]

2.4.4 获取表的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
presto> select * from http.schema.table1;

Query 20180510_031258_00005_58j4x, FAILED, 1 node
Splits: 17 total, 0 done (0.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20180510_031258_00005_58j4x failed: For input string: "a"

presto> select * from http.schema.table1;
key1 | key2
------+------
10 | b
1 | d
(2 rows)

Query 20180510_031315_00006_58j4x, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [2 rows, 0B] [41 rows/s, 0B/s]


深入理解Presto(2) :Presto查询执行过程

深入理解Presto(2) :Presto查询执行过程

Presto查询执行过程

这里写图片描述

Presto包含三类角色,coordinator,discovery,worker。coordinator负责query的解析和调度。discovery负责集群的心跳和角色管理。worker负责执行计算。

presto-cli提交的查询,实际上是一个http POST请求。查询请求发送到coordinator后,经过词法解析和语法解析,生成抽象语法树,描述查询的执行。

执行计划编译器,会根据抽象语法树,层层展开,把语法树所表示的结构,转化成由单个操作所组成的树状的执行结构,称为逻辑执行计划。

原始的逻辑执行计划,直接表示用户所期望的操作,未必是性能最优的,在经过一系列性能优化和转写,以及分布式处理后,形成最终的逻辑执行计划。这时的逻辑执行计划,已经包含了map-reduce操作,以及跨机器传输中间计算结果操作。

scheduler从数据的meta上获取数据的分布,构造split,配合逻辑执行计划,把对应的执行计划调度到对应的worker上。

在worker上,逻辑执行计划生成物理执行计划,根据逻辑执行计划,会生成执行的字节码,以及operator列表。operator交由执行驱动来完成计算。

抽象语法树

由语法解析器根据SQL,解析生成的树状结构,描述SQL的执行过程。 在下文中,以SQL select avg(response_size) as a , client_address from localfile.logs.http_request_log group by client_address order by a desc limit 10为例来描述。

抽象语法树数以Query为单位来描述查询,分层次表示不同层的子查询。每一层查询查询包含了几个关键因素:select, from,where,group by,having,order by,limit。其中,from可以是一个子查询,也可以是一张表。

一个典型的抽象语法树:

这里写图片描述

生成逻辑执行计划

抽象语法树树,描述的最原始的用户需求。抽象语法树描述的信息,执行效率上不是最优,执行操作也过于复杂。需要把抽象语法树转化成执行计划。执行计划分成两类,一类是逻辑执行计划,一类是物理执行计划。逻辑执行计划,以树状结构来描述执行,每个节点是最简单的操作。物理执行计划,根据逻辑执行计划生成字节码,交由驱动执行。

转写成逻辑执行计划的过程,包括转写和优化。把抽象语法树转写成由简单操作组成的结点树,然后把树中所有聚合计算节点转写成map-reduce形式。并且在map-reduce节点中间插入Exchange节点。然后,进行一系列优化,把一些能提前加速计算的节点下推,能合并的节点合并。

最后逻辑执行计划按照Exchange节点做划分,分成不同的段(fragament),表示不同阶段的的执行计划。在调度时,按照fragment调度。

SQL select avg(response_size) as a , client_address from localfile.logs.http_request_log group by client_address order by a desc limit 10的逻辑执行计划:
这里写图片描述

从执行计划中可以看到,agg节点都是分成partial和final两步。

调度执行计划到机器上

调度涉及到两个问题,第一,某个fragment分配由哪些机器执行;第二,某个fragment的计算结果如何输出到下游fragment。

在调度时,需要为每一个fragment指定分配到哪些机器上。从调度上划分,fragment分三种类型

  • 一类是source类型由原始数据的存储位置决定fragment调度机器,有多少个source节点呢?connector会根据数据的meta,决定需要读取多少个split(分片) ,对于每一个source节点,分配一个split到一台机器上,如果在配置中指定了network-topology=flat,则尽量选择split所在的机器。
  • 一类是FIXED类型,主要用于纯计算节点,从集群中选择一台或多台机器分配给某个fragment。一般只有最终输出节点分配一个机器,中间的计算结果都要分配多台机器。分配的机器数由配置hash_partition_count决定。选择机器的方式是随机选择。
  • 一类是SINGLE类型,只有一台机器,主要用于汇总结果,随机选择一台机器。

对于计算结果输出,根据下游节点的机器个数,也有多种方式,

  • 如果下游节点有多台机器,例如group by的中间结果,会按照group by的key计算hash,按照hash值选择一个下游机器输出。对于非group by的计算,会随机选择或者round robin。
  • 如果下游节点只有一台机器,会输出到这台机器上。

以下图为例,fragment 2是source类型fragment,有三个split,所以分配了三台机器。因为这一层计算是group by 聚合计算,所以输出时按照group by的key计算hash,选择下游的某台机器输出。

这里写图片描述

调度之前的任务,都在coordinator完成,调度完成后,之后任务发送到worker上执行。

生成物理执行计划

逻辑执行计划fragment发送到机器上后,由结点树形式转写成operator list,根据逻辑代码动态编译生成字节码。动态生成字节码,主要是利用编译原理:

  • 展开循环
  • 根据数据列的类型,直接调用对用的函数,以减少分支跳转语句。

这些手段会更好的利用CPU的流水线。

执行驱动

物理执行计划构造生成的Operator list,交给Driver执行。具体计算哪些数据,由加载的Split决定。

Operator list 以串联形式处理数据,前一个operator的结果作为下一个结果的输入,对于source类型的operator,每一次调用都会获取一份新的数据;对于Aggregate的operator,只有之前所有的operator都finish之后,才能获取输出结果。

这里写图片描述

聚合计算

生成的执行计划中,聚合计算都拆分成了两步,分别是Map、Reduce。

聚合计算的Operator有两类,分别是AggregationOperator和HashAggregationOperator。

AggregationOperator对所有行进行计算,并且把结果更新到一个位置。HashAggregationOperator使用某一列的hash值作为hash表的key,key相同的行才会把结果保存在一起,用于group by类的计算。

聚合计算都是要按照Map-Reduce的形式执行。

聚合计算所提供的函数,都要提供四个接口,分别有两个输入,两个输出:

  1. 接受原始数据的输入
  2. 接受中间结果的输入
  3. 输出中间结果
  4. 输出最终结果。

1+3 构成了Map操作 2+4构成了Reduce操作。

以Avg为例:

  1. Map阶段输入1,2,3,4
  2. Map截断输出10,4 分别代表Sum和Count
  3. Reduce输入10,4
  4. Reduce输出最终平均值2.5

我们改造了Presto系统,使得Presto能够提供缓存功能,就是在MapReduce中间加了一层计算,接受中间结果输入和中间结果输出。

函数

函数分为两类,分别是Scaler函数和Aggregate函数

Scaler函数提供数据的转换处理,不保存状态,一个输入产生一个输出。

Aggregate函数提供数据的聚合处理,利用已有状态+输入,产生新的状态。

上文已经提到,聚合函数提供了两种输入接口和两种输出接口。

深入理解Presto(1) : Presto的架构

简介

Presto是一个facebook开源的分布式SQL查询引擎,适用于交互式分析查询,数据量支持GB到PB字节。presto的架构由关系型数据库的架构演化而来。presto之所以能在各个内存计算型数据库中脱颖而出,在于以下几点:

  1. 清晰的架构,是一个能够独立运行的系统,不依赖于任何其他外部系统。例如调度,presto自身提供了对集群的监控,可以根据监控信息完成调度。
  2. 简单的数据结构,列式存储,逻辑行,大部分数据都可以轻易的转化成presto所需要的这种数据结构。
  3. 丰富的插件接口,完美对接外部存储系统,或者添加自定义的函数。

本文从外到内,依次来介绍presto。

架构

image.png

Presto采用典型的master-slave模型:

  1. coordinator(master)负责meta管理,worker管理,query的解析和调度
  2. worker则负责计算和读写。
  3. discovery server, 通常内嵌于coordinator节点中,也可以单独部署,用于节点心跳。在下文中,默认discovery和coordinator共享一台机器。

在worker的配置中,可以选择配置:

  1. discovery的ip:port。

  2. 一个http地址,内容是service inventory,包含discovery地址。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    	{
    "environment": "production",
    "services": [
    {
    "id": "ffffffff-ffff-ffff-ffff-ffffffffffff",
    "type": "discovery",
    "location": "/ffffffff-ffff-ffff-ffff-ffffffffffff",
    "pool": "general",
    "state": "RUNNING",
    "properties": {
    "http": "http://192.168.1.1:8080"
    }
    }
    ]
    }
  3. 一个本地文件地址,内容同2。

2和3的原理是基于service inventory, worker 会动态监听这个文件,如果有变化,load出最新的配置,指向最新的discovery节点。

在设计上,discovery和coordinator都是单节点。如果有多个coordinator同时存活,worker 会随机的向其中一个汇报进程和task状态,导致脑裂。调度query时有可能会发生死锁。

discovery和coordinator可用性设计。由于service inventory的使用,监控程序可以在发现discovery挂掉后,修改service inventory中的内容,指向备机的discovery。无缝的完成切换。coordiantor的配置必须要在进程启动时指定,同一个集群中无法存活多个coordinator。因此最好的办法是和discovery配置到一台机器。 secondary机器部署备用的discovery和coordinator。在平时,secondary机器是一个只包含一台机器的集群,在primary宕机时,worker的心跳瞬间切换到secondary。

数据模型

presto采取三层表结构:

  1. catalog 对应某一类数据源,例如hive的数据,或mysql的数据
  2. schema 对应mysql中的数据库
  3. table 对应mysql中的表

image.png

presto的存储单元包括:

  1. Page: 多行数据的集合,包含多个列的数据,内部仅提供逻辑行,实际以列式存储。
  2. Block:一列数据,根据不同类型的数据,通常采取不同的编码方式,了解这些编码方式,有助于自己的存储系统对接presto。

不同类型的block:

  1. array类型block,应用于固定宽度的类型,例如int,long,double。block由两部分组成

    • boolean valueIsNull[]表示每一行是否有值。
    • T values[] 每一行的具体值。
  2. 可变宽度的block,应用于string类数据,由三部分信息组成

    • Slice : 所有行的数据拼接起来的字符串。
    • int offsets[] :每一行数据的起始便宜位置。每一行的长度等于下一行的起始便宜减去当前行的起始便宜。
    • boolean valueIsNull[] 表示某一行是否有值。如果有某一行无值,那么这一行的便宜量等于上一行的偏移量。
  3. 固定宽度的string类型的block,所有行的数据拼接成一长串Slice,每一行的长度固定。

  4. 字典block:对于某些列,distinct值较少,适合使用字典保存。主要有两部分组成:

    • 字典,可以是任意一种类型的block(甚至可以嵌套一个字典block),block中的每一行按照顺序排序编号。
    • int ids[] 表示每一行数据对应的value在字典中的编号。在查找时,首先找到某一行的id,然后到字典中获取真实的值。

插件

了解了presto的数据模型,就可以给presto编写插件,来对接自己的存储系统。presto提供了一套connector接口,从自定义存储中读取元数据,以及列存储数据。先看connector的基本概念:

  1. ConnectorMetadata: 管理表的元数据,表的元数据,partition等信息。在处理请求时,需要获取元信息,以便确认读取的数据的位置。Presto会传入filter条件,以便减少读取的数据的范围。元信息可以从磁盘上读取,也可以缓存在内存中。
  2. ConnectorSplit: 一个IO Task处理的数据的集合,是调度的单元。一个split可以对应一个partition,或多个partition。
  3. SplitManager : 根据表的meta,构造split。
  4. SlsPageSource : 根据split的信息以及要读取的列信息,从磁盘上读取0个或多个page,供计算引擎计算。

插件能够帮助开发者添加这些功能:

  1. 对接自己的存储系统。
  2. 添加自定义数据类型。
  3. 添加自定义处理函数。
  4. 自定义权限控制。
  5. 自定义资源控制。
  6. 添加query事件处理逻辑。

Presto提供了一个简单的connector : local file connector ,可用于参考如何实现自己的connector。不过local file connector中使用的遍历数据的单元是cursor,即一行数据,而不是一个page。 hive 的connector中实现了三种类型,parquet connector, orc connector, rc file connector。
image.png

上文从宏观上介绍了presto的一些原理,接下来几篇文章让我们深入presto 内部,了解一些内部的设计,这对性能调优会有比较大的用处,也有助于添加自定义的operator。