Presto 任务调度: 任务分配到哪里

在调度任务时,有几个决策点,第一:分配多少个任务,第二:每个任务分配到哪些机器上。

要回答这个问题,首先把计算任务做一下分类,在presto内部有4种计算类型的节点:

  1. source节点,读源数据的节点,负责读取数据、Map阶段的计算任务。分配的个数由SplitManager根据数据决定。
  2. fixed节点,shuffle节点,用于处理reduce任务。比如group by计算,source阶段的数据按照hash发送到fixed节点。分配的个数由hash_partition_count这个session参数决定,由于是固定个数,所以对于不同的数据规模采用同样的个数也不是很合适,这也是需要改进的点。
  3. single节点,单节点计算任务,某些计算需要在单一节点计算,比如MergeReduce,output等,需要分配一个节点。
  4. coordinator only,只在coordinator节点计算的任务,一般是meta类操作。

image.png

Presto是一个计算引擎,采用计算和存储分离的架构。所以,理论上来讲,上边回答清楚每个阶段分配多少个计算任务,然后按照随机的原则把task分配到每台机器上,并注意机器之间的负载均衡就足够了。

但是这样性能并不是最佳的。我们都知道,大规模的数据迁移是一件非常耗时的事情。在计算机硬件中,CPU、内存、磁盘、网卡,这些硬件中,网卡的性能提升是最慢的,在整个分布式系统中也是性能最差的。
因此在调度上presto提供了一种基于拓扑的调度算法(Topology Aware schedule),以使source节点尽量靠近数据。

拓扑调度算法Topology Aware Scheduling strategy

image.png

Presto把集群资源划分成两级结构(Machine,集群)(包含机架的三级结构是我们的实现方式,不是presto原生的算法)。

分配算法如下:

  1. 当分配一个split时候,会给定split的一个地址,这个地址代表期望的分配地址。
  2. 从Machine层开始查找,首先查看Machine上已经分配的split个数是否已满,若未满则分配该机器,
  3. 若Machine资源池已满,然后查看机架级别的资源池。多个机器组成一个机架。然后在同一个机架找出一台机器,这台机器的资源池小于75%,则分配这台机器。
  4. 如果在机架级别找不到可用机器,则到集群级别找一台机器,这台机器的资源池使用量小于50%,则分配这台机器。否则分配失败。

presto代码提供了Machine/集群的两级拓扑结构,但是架构上提供一个基础的多级分配策略,因此我们增加了机架级别的资源调度。我们知道,同一个机架内机器在一个交换机下,网络速度要快于跨机架传输。

Presto调度上的其他优化点

presto提供了基础的调度能力,这个调度策略是很简单的,如果要实现精细化的调度,还有很多工作可以做。例如:

  1. 上文提到的Fix Count Scheduler,对所有的计算采用固定的节点个数。这个可以通过CBO,衡量不同数据源的大小,以及计算的复杂度,分配不同的节点个数。
  2. 上文提到的机架级别调度。
  3. 在资源池的管理上,简单以split的个数来衡量,但是我们知道每个split对应的数据量和计算复杂度是不一样的。因此更加精细化的策略是根据统计数据预估split消耗的内存/cpu。

Procella@Youtube 把计算加速玩到极致的实时计算引擎

Procella@Youtube 把计算加速玩到极值的实时计算引擎

Procella的应用场景

在youtue内部,在数据分析领域有4个方面的应用场景:

  • 报表和大盘:1000亿数据/天,要求在10ms的延时内完成近实时的计算,主要的计算类型过滤/聚合/set/join
  • 内嵌的统计指标,例如视频的浏览人数等等,特点是数据不停的变化,每秒上百万次查询。
  • 时序数据监控:特点是query固定,数据量小,可以下采样,旧数据过期,以及一些特有的查询例如估算函数和时序函数。
  • Ad hoc查询:提供给数据科学家,BI分析师等使用的,特点是query比较复杂,且不可预测,要求延时在秒级别处理一万亿行数据。

老的解决方案和挑战

针对以上需求,在Youtube内部原来有这几种解决方案:

image.png

采用以上方案,有以下挑战:

  • 数据需要在多个系统之间流转,复杂度高。
  • ETL打开的维护开销。
  • 不同系统之间的数据一致性和数据质量差别。
  • 不同系统都有学习成本,有些系统不支持SQL全集。
  • 一些底层组件性能查,可用性和扩展性无法满足需求。

Procella的特点

为了解决上述挑战,Youtube引入了Procella, Procella有如下特点:

  1. 支持标准SQL,并且做了一些扩展,支持估算函数、嵌套类型、UDF等。
  2. 高扩展性,计算和存储解耦,两者都可以无限扩展。
  3. 高性能,引用最新的计算计算,支持百万qps,毫秒级别响应。
  4. 数据freshness,支持batch和stream两种写入模式,原生支持lambda架构。

Procella的性能

image.png

Procella 15亿查询/天,一条扫描8E16行数据/天,返回1000亿行/天

image.png

延时的中位数是25毫秒,p999为 2.8秒,也就是说99.9%的查询在2.8秒内查询完成。

在毫秒级别晚上大规模数据的计算,Procella是如何做到的呢?下文将来揭秘。

Procella的架构

image.png

Procella采用计算和存储解耦的架构:

  • 数据存储在分布式文件系统Colossus上,类似hdfs/盘古。

  • 计算任务跑在由borg维护的机器上。

数据写入

Procella数据写入有两条链路:

  • 右侧的realtime数据,写入Ingestion Server,Ingestion Server把数据存储在Collossus上,与此同时,会把数据写一份到DataServer上(为啥呢? DataServer是负责计算的,发送到DS是为了做缓存,更有效的利用内存数据,因为Youbute的查询大部分依赖最近的数据,下文在查询加速上会提到该点)
  • 批量导入节点,采用离线的方式,离线通过MapReduce任务生成满足格式要求的文件,把文件写到Colossus上,然后再把文件信息注册到RegistrationServer上,这样避免了突然导入大量数据对系统的负载。

数据模型

Procella的数据分成两部分,分别是Data和metadata。

Procella采用的是表模型,存储格式是Artus的列式格式,数据存储在分布式文件系统Colossus。计算由DataServer完成,由于计算和存储是解耦的,所以同一份数据可以由多个DataServer来做计算。

MetaData包含了用于辅助计算的一些结构,称为secondary structure。包括zonemap(min,max等信息), bitmap(倒排索引), bloom filter, 分区, 排序键等。 metadata从列存储的文件头中读取,或者ds在计算query时生成。 metadata和data存储位置不同,是存储在bigtable或spanner上的。

通过DDL的形式管理表,DDL发送到Root Server(query入口),管理表的机制有分区、排序、constraint, 写入方式等。对于实时表,还支持生命周期管理,下采样,compact。

在compaction阶段,允许通过使用user defined sql 来处理数据,例如过滤,聚合,生命周期管理等。相当于在compaction阶段来做etl。

查询

好了,到了最关键的查询阶段, 首先看一下查询的流程。

在上文的架构图中,可以看到Procella的查询流程。

  1. client发起query,到达RootServer。
  2. RootServer解析sql, 优化plan
  3. 分配计算任务到DataServer上。
  4. DataServer读取数据(大部分在内存中, 少量读远程分布式存储),计算。
  5. 最终结果通过。

在整个查询,例如语法解析,plan 优化,计算,都是常见的流程。 而Procella的亮点在于做了相当多的优化手段,以加速计算,实现毫秒级别的响应。下文介绍具体有哪些优化手段。

查询优化

DataServer端的优化

  1. 读取数据时,数据有三种来源,local mem,缓存在本地内存中的数据;remote file,存储在分布式存储系统上的文件,remote memory,通过RDMA技术读取远程DataServer内存中的内容。RDMA 技术可以比RPC/http技术更快的从其他机器内存中读取数据。
  2. 尽可能的把计算任务下推到数据节点,例如filter,agg,project,join,能下推到最底层就下推到最底层,这样可以最大限度的减少数据的读取量,以及计算量。
  3. 在计算时,以encoding-native的方式读取,什么是encoding-native呢?就是算法感知编码格式,例如dictionary编码,在filter时,先从字典中filter出来要的key id,再到数据编码中找对应的id。
  4. 数据交换用stubby(一种grpc协议), shuffle用RDMA协议。

Cache

上文的架构图中提到,Procella是一种计算和存储分离的架构,这种数据和计算分离的方式,虽然能带来规模上的扩展性,但是也给单次查询带来了overhead。Procella时怎么做的呢?Procella通过大量的使用缓存来达到加速的目的,避免从远程读取数据。

  1. file metadata cache:数据存储在分布式存储上,每次读取文件,首先要向master获取文件真实存储的机器,然后再取对应的机器上读取数据。每次调用必然带来开销,因此在内存中维护了block -> file的handle信息。这样就比不要每次都从远程读取了,节省了rpc的开销。
  2. header cache, 列存文件的header中记录了文件的meta,每次读取这些meta要打开文件。IO的开销很大,因此在内存中采用lru来缓存这些header。
  3. Data Cache, 部分数据缓存在内存中,以加速计算,前文提到,实时数据流会有一份发送到内存中。
    1. 在内存中的数据格式,和磁盘上是一样的。这意味着,数据以encoding的模式放在内存中,不会反序列化,在计算时也直接操作encoded数据。这样节省了反序列化的开销。
    2. 一些计算复杂的output,也会缓存下来,还有一些metadata,例如bloom filter也会缓存下来。
  4. metadata的缓存,上文提到的各种secondary structure,都会缓存在内存中。
  5. 亲缘调度,由于大量的使用了缓存,也就意味着,在调度时,跟这份数据相关的,都需要尽量调度到同一个机器上,避免调度上颠簸,提高缓存命中率。
  6. 由于缓存技术以及很高的缓存命中率,使得procella在使用上彻底变成了一个内存数据,2%的数据保存在内存中,换取了99%的file handle命中率和90%的data 命中率。当然这个命中率可能是跟youtube特有的使用模式相关的,在youtube有大量的计算最新数据的需求。
  7. 对于metadata的缓存,从存储load到内存时,会编码成encoding模式,减少内存的使用率。

特有的列式格式Artus,启发式的encoding

youtube在设计列式存储时,需求有两个,分别是

  1. lookup,随机seek某些行数据。
  2. range scan, 扫描大块的数据。

存储上采用各种encoding 模式,不用压缩数据。这意味着读取时,也不用解压缩。
同时在计算时,直接在encoding模式智商进行,避免了反序列化。
encoding之后的数据大小和压缩后的大小基本接近,也就是说在存储空间上不是问题。

procella采用启发式的编码方式,根据数据特征进行特殊编码

  1. 在编码时进行多次扫描数据,第一次扫描采集轻量级信息,例如distinct value,min,max, sort order, 根据这些信息决定最佳的encoding模式。
  2. encoding模式包括dictionary encoding, rle, delta等。
  3. 如何动态的选择编码格式呢? 根据上边采集的信息,同时也会评估每一种encoding的压缩率和速度,决定最终的编码方式。
  4. 在查询时,可以实现O(logn)级别的搜索,O(1)seek到具体行。当然对于变长类型(例如字符串), 可以给每一段数据记录skip block信息,方便查找时快速跳过某些block。
  5. file和column header中记录了多种meta,例如sorting, min,max,encoding,bloom filter等用于剪枝,减少不必要的IO。例如通过pk来分文件,减少文件的读取次数。
  6. 通过倒排索引来加速in类计算。通常in类操作,可以变成join计算,或者逐个比较每个值,这种处理方式的性能是很差的,join计算是SQL里边的老大难题了,而逐个比较,时间复杂度在O(m*n)级别。而通过倒排索引,可以直接在O(1)级别知道是否存在对应的数据。
  7. 在数据摄入时,存储过程会针对计算做一些优化,比如分配partition, 部分aggregation预计算,sort key, 热数据放到ssd上等。

计算引擎

  • 静态codegen
    • 对于大多数的分布式计算系统,都喜欢用llvm来动态生成执行代码,以减少虚函数调用和分支跳转的开销。 但是llvm是有代价的,动态生成代码会带来毫秒级别的开销,对于Procella这样的系统,要求在几十毫秒内完成计算,动态codegen的overhead是不可接受的。同时由于他们的计算系统有个特点是大部分的query是固定的,比如前端显示的视频uv,pv等等。因此他们采用了静态编译的方式,通过c++ template来静态codegen,相当于枚举了所有的可能的operator。也算是一个特色吧。
    • 这种方式缺点也很明显,静态codegen需要在编译时刻把所有代码编译出来,估计编出来binary会非常大,放到内存里边也会占用大量内存,还需要防止换出到swap。
  • 以block为单元进行计算,block的大小不超过L1 cache,充分利用CPU的高速缓存。
  • 代码上保证能够自动的向量化处理(simd)。
  • 计算以encoding-native operator来计算。保证充分利用每一种encoding的计算优势,也避免数据反序列化的开销,同时也节省了内存使用。但是估计每个operator要是适配每一个encoding,代码工程量会不少。
  • 谓词下推,减少scan的数据量,算是古老的加速手段了。
  • metadata server 利用partitioning减少读取的文件,相当于谓词下推在partition上应用一层,过滤出只需要扫描的partition。
  • 利用metadata的计算。
    • 利用bloom filter & max/min信息 & 倒排索引以及其他的mta减少读盘计算。比如知道一个block的min/max之后,就可以根据filter条件确定是否要读取这个block。

Join

Join算是SQL里边的老大难题了,有很多算法来优化join的性能,比如RBO,CBO
Processla支持的join类型有

  • broadcastjoin,广播式join,在大表join小表的场景,把小表复制到大表所在的每一个机器上,这样避免大表的复制。
  • Co-partitioned join,左右表具有相同的partition信息,那么只把partition相同的数据hash到相同的机器,避免全量数据计算hash和跨机器拷贝。
  • shuffle join, 左右表分别计算hash,hash值相同的数据shuffle到同一个机器上进行join,算是最慢的
  • Pipelined join,如果右表是一个复杂的子查询,但是结果集合很小,那么倾向于先算出右表的结果,然后再按照broad cast join的方式。
  • Remote Lookup ,如果dim table比较大,这个时候就不适合hash或者broadcast了,由于procella的存储格式支持高效的lookup,O(1)或O(logn)级别,因此可以通过rpc发起远程的lookup来完成join。

长尾query

在SQL/MapReduce任务中,最常见的一个难题是长尾query,由于局部计算过热导致影响整体的查询延时。

Procella提出了这些优化:

  • Backup 策略
    • 对于每一个data server上的计算都会统计其响应延时,并且汇总成分位,对于远大于均值的请求,认为是利群点,这个时候,启动备份的data server来重新计算这个任务。
  • 限制发送到同一个ds的请求速度,前文讲过,processla是计算存储解耦的架构,但是为了充分利用缓存,又倾向于把同一份数据的计算调度到同一个机器上,因此这台机器又会成为热点。因此需要对发送到同一个ds的请求做资源限制。
  • Smaller query具有高的优先级。 smaller query可以快速跑完,让出资源,因此procella倾向于让小query先跑。这和presto是相反的逻辑。presto倾向于让大query先跑,否则大query就饿死了。

Intermediate Merging

在merge节点,有些merge操作是单点的,因此会成为计算的瓶颈点。Procella采集一些统计信息,如果发现merge节点一段时间还未完成计算,那么就开启一些前置的线程进行分布式计算。预聚合的结果发送给merge算子。前置线程是在同一台机器上的,也避免了数据交换。

Query 优化器

静态优化器

静态优化器是指在plan阶段执行的优化逻辑,包括RBO,CBO。procella支持了RBO, filter pushdown, subquery decorrelation, constant folding等。

启发式优化器

Procella的一个特点就在于这个启发式的优化器,相对静态优化器而言,启发式优化器是在运行时执行的。启发式优化器通过采集计算的状态统计,例如统计数据,基数,distinct count,分位信息等。启发式优化器甚至可以在执行时修改物理执行计划。我们知道通常而言,SQL在plan阶段生成逻辑执行计划,经过优化后,再生成物理执行计划,物理执行计划下发到机器上后,就不再修改了。Procella做到了动态的修改plan。

主要又这几个优化点:

  1. 决定reduce的个数,例如,计算group by时,下游分配多少个节点是跟group by的key的基数相关的,因此可以统计基数信息,动态决定分配多少个reduce节点,甚至可以运行时取消部分reduce任务。
  2. 启发式agg:正式运行之前,在一份小的数据聚合上运行一下,获得结果和统计数据,评估输出的行数,决定分配多少个reduce节点。
  3. 启发是join:也是根据采样的数据评估哪种join方式比较合适。
  4. 启发式sort:sort避免不了全局排序,在全局做排序是非常消耗内存和CPU的事情,procella采集数据的分布信息,把全量数据分配到n个桶里边,Map阶段按照分位shuffle到下游节点,实现了桶排序。

当然启发式优化器是有代价的,需要先在采样的数据上跑一次,才能决定正式的执行计划。因此会带来一些overhead。procella提供了开关,可以关闭启发式优化器。不过对于用户而言,是非常乐意通过少量的overhead来换取很大的性能提升的。

Youtube内嵌statistics

内嵌指标就是我们通常在视频网站上看到的一些指标,比如视频多少人观看,多少人点赞等等。

针对这部分计算的需求有:

  1. 数据更新快。
  2. 请求的qps高。
  3. 查询要求毫秒级别。

为了应对上述需求,procella做了以下优化:

  1. 新数据写入存储的同时,也会写入ds内存中,以保证查询时命中内存。
  2. MDS作为ds的模块,以减少rpc调用。因为获取一些meta信息会向mds请求。而rpc的开销至少也是毫秒级别。
  3. metadata 预加载 & 异步增量同步,保证meta数据都在内存中。
  4. plan缓存在内存中。通常词法/语法解析,以及各种plan优化也会带来开销。针对内嵌场景,大量的query都是重复的,因此缓存下来可以节省不少CPU和延时。
  5. RPC合并发送。不同的server之间交互的时候,把rpc合并起来发送,以减少网络开销。
  6. 监控RS,DS的错误率,一旦达到阈值,把机器下线。
  7. 禁用启发式优化器,前边提到启发式优化器会带来一些开销,针对内嵌指标场景其实是没必要的。

总结

总结下来,procella做了很多方面的优化,以保证计算在毫秒级别完成,总体而言,这些优化手段可以归结为:

  1. 通过metadata减少读取磁盘的次数和大小。
  2. 通过encoding-native的方式让计算靠近数据,减少反序列化开销,也让encoding加速计算。
  3. 大量内存缓存的使用,使得procella完全变成了一个内存计算引擎。
  4. 启发式的优化器,动态优化执行计划。

参考资料

procella 论文

Presto Coordinator分布式改造

背景

自从上线SQL功能以来,经过两年的时间,随着sls业务的不断增长,每天处理1亿次query,扫描1000,000,000,000,000行日志(没错1000万亿行日志)。业务的增长也给系统带来无形的压力,如何保障在低延时的前提下提供这么大的负载是一个巨大的挑战。

挑战之一就是单master架构。在之前的文章中,我介绍过了presto的架构。coordinator是presto中负责query解析,任务调度,结果汇总的,集群监控的节点。其他的worker节点只负责接收单个task进行计算即可。coordinator的任务可谓是多而广,在整个集群中起到了不可替代的作用。一旦该节点出问题,整个集群不可用。

上图为集群平均cpu,下图第一个为coordinator的cpu,可见coordinator的负载一直处于高位,且明显高于普通的worker。
image.png

随着业务的上升,coordinator的几点瓶颈愈发凸显:

  1. 负责query的解析和任务分发,负载比较重,且随着业务增加,负载线性增加。而单机的性能有上限,无法横向扩展。
  2. 单coordinator节点无容灾,一旦宕机,恢复时间比较长。

为了解决以上问题,我下决心把coordinator改造成分布式。

coordinator主要工作内容

coordinator主要有几个功能:

  1. query的解析,任务调度,任务监控。
  2. 集群内存监控。
  3. 机器状态监控。

query任务流

image.png

query从提交到coordinator,到query执行完成,由coordinator负责管理,一个qeuery的具体流程包括:

  1. 队列排队,获取运行许可。
  2. 词法解析和语法解析,把SQL转换成抽象语法树。
  3. 语法优化,语法检查。利用预定义规则对语法数进行优化,例如剪枝优化。还有把单机plan改造成并行式plan。
  4. 对plan划分出不同的stage,并且按照并行度的需求,为每个stage生成多个task,每个task指定一台机器。形成最终的逻辑执行计划。
  5. stage代表的任务的上下游,task代表一个stage内的并行执行。一个stage内的task执行相同的任务,只是接收的数据不一样。
  6. 每个task调度到一个worker上,并且coordinator为每个task启动一个任务追踪,即TaskInfoFetcher,周期性轮训task的状态。
  7. 当轮训到task的状态发生改变时,回调改变stage-> query的状态,最终直到query的所有task都结束。

整个任务流中,像query解析,调度,监控等都可以在任意节点执行,不需要非得在全局唯一节点执行。 只有queue涉及到全局队列,需要在单点执行。

集群控制流

Presto集群控制只要有两方面内容:

  1. 集群状态控制
    1. 检查每台机器的状态,判断是ALIVE、SHUTDOWN中的哪个状态。
    2. 检查每台机器的各个内存池分配状态,终止超限query,以及提升大query的优先级。
  2. Query状态控制
    1. 轮询每个task的状态,通过状态机管理query状态。

image.png

集群内存管理

Presto采用逻辑的内存池,来管理不同类型的内存需求。

Presto把整个内存划分成三个内存池,分别是System Pool ,Reserved Pool, General Pool。

  • System Pool 是用来保留给系统使用的,默认为40%的内存空间留给系统使用。
  • Reserved Pool和General Pool 是用来分配query运行时内存的。
  • 其中大部分的query使用general Pool。 而最大的一个query,使用Reserved Pool, 所以Reserved Pool的空间等同于一个query在一个机器上运行使用的最大空间大小,默认是10%的空间。
  • General则享有除了System Pool和General Pool之外的其他内存空间。

Presto内存管理,分两部分:

  • query内存管理
    • query划分成很多task, 每个task会有一个线程循环获取task的状态,包括task所用内存。汇总成query所用内存。
    • 如果query的汇总内存超过一定大小,则强制终止该query。
  • 机器内存管理
    • coordinator有一个线程,定时的轮训每台机器,查看当前的机器内存状态。
    • 当query内存和机器内存汇总之后,coordinator会挑选出一个内存使用最大的query,分配给Reserved Pool。

内存管理是由coordinator来管理的, coordinator每秒钟做一次判断,指定某个query在所有的机器上都能使用reserved 内存。那么问题来了,如果某台机器上,,没有运行该query,那岂不是该机器预留的内存浪费了?为什么不在单台机器上挑出来一个最大的task执行。原因还是死锁,假如query,在其他机器上享有reserved内存,很快执行结束。但是在某一台机器上不是最大的task,一直得不到运行,导致该query无法结束。

Coordinator分布式改造

从上边的介绍中,我们可以看出,只能在集群唯一节点进行的操作有:

  1. 全局队列。
  2. 机器状态监控。
  3. 集群内存状态管理。

二coordinator上负载最重的操作是query的task状态管理。一个query会分配若干个task,这个膨胀是很大的。每个task都要轮询状态信息。这部分逻辑是可以分布式搞得,不必放在全局唯一节点进行。

image.png

coordinator改造方案,就是把一些不是全局性的操作,都拆分到不同节点去做。而把全局性的操作放到一个节点。所以原来的coordinator会变成两个角色,watch tower和新coordinator:

image.png

Watch tower:

只负责集群的监控,不负责跟query相关的工作。

  • 集群内存管理,周期性轮询每台机器的内存分配状态,把最大的query提升导reserved memory pool。
  • 机器状态管理,周期性轮询每台机器状态,是否ALIVE、SHUTDOWN。

由于coordinator需要知道每台机器的状态、内存,因此在coordinate向每台机器同步信息时,会同时把相关状态同步给coordinator。

image.png

Coordinator:

coordinator只专心做query相关的工作,接受请求,解析、调度、监控所有的task。

队列处理

presto的队列模型,可以设置query运行的最大并发度和最大排队数。这些参数可以时用户级别,也可以是整个集群的级别。

在官方的presto中,由于只有一个coordinator节点,可以很容易在单点控制整个集群的执行并发度。分拆成多个coordinator并发执行之后,控制整个集群的并发度变得比较困难。因而我们选择在单个coordinator上控制运行在本机的并发度。

客户端

image.png

改造成并发coordinator后,client端从Discovery获取到所有的coordinator(coordiantor向discovery回报心跳时,增加了标识coordinator)。并且把coordinator按照地址排序,保证所有的client获取到的都是同样的顺序。

当接收到SQL请求后,client按照user的hash值,选择一个coordinator发送。这样就可以保证同一个用户的SQL只发送导一个coordinator,从而控制并发度。当然如果coordinator发生扩容缩容,在短时间内,coordinate的顺序会发生改变,影响队列。

总结

在完成分布式改造后, coordinator可以做到水平扩展,解决了coordinator的单点问题。改造后的效果:

image.png

CPU消耗比较高的Coordinator经过分布式改造后,有3个coordinator,可以看出每个coordinator的CPU仍然很高。这是由于coordinator上负责整个query的运行监控,如果一个query分配的task非常多,那么是需要启动很多后台任务轮询task状态,这对coordinator的负载压力是很大的。虽然我们可以做到水平扩展coordinator,但是由于单个coordinator的最大并发有限,也影响了整个集群的性能。接下来我们也会继续对这部分逻辑进行改造,期望能够优化coordinator的性能。

Presto内存泄露问题调查

Presto内存泄露问题调查

问题背景:

sls的线上流量越来越大,S1几乎增长了100%。在杭州region,每隔一段时间,一部分机器Presto就会开始频繁的Full GC,重启后稳定一段时间,然后过一段时间又开始频繁Full GC。Full GC达到一定次数的时候,就发生OOM,进程直接crash。由于Full GC时间长,影响线上的可用性,因此开始投入精力进行调查。

查看GC 文件

当频繁发生GC时,会在gc文件中打出下边的内容,表示GC发生的类型(Full GC(Allocation Failure)), 在发生full gc之前,堆用了19.9G;GC后,还是19.8G,也就是说GC发生后,只释放了0.1G的空间。

1
2
3
4
2019-08-28T15:58:46.140+0800: 2414974.134: [Full GC (Allocation Failure)  19G->19G(20G), 25.4147570 secs]   
[Eden: 0.0B(1024.0M)->0.0B(1024.0M) Survivors: 0.0B->0.0B Heap: 19.9G(20.0G)->19.8G(20.0G)], [Metaspace: 13
9463K->139463K(1267712K)]
[Times: user=40.66 sys=0.00, real=25.42 secs]

由于没有足够的可用内存,于是Presto很快再次Full GC,知道最终一点内存都没有了,发生了OOM 异常,程序crash。

注:GC文件怎么打:

1
2
3
4
5
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=5M
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps

监控内存的变化趋势

从外部看进程的内存使用率,如下图所示,是看不出任何信息的。因为java进程内部维护了内存的分配。
image.png

于是我通过jmx监控去查看Presto进程内部的内存分布。 java进程把内存分成几个部分:年轻区,幸存区,老年区。不同的内存依据生命周期的长短放在不同的区域内。通常,内存是在年轻代分配,当年龄达到一定大小时,会移动老年区(这里简化了模型,实际还有幸存区的作用)。

在jmx中,存在如下的监控项目,分别表示老年代(Old Gen)和年轻代(Eden Space)已经使用的内存。通过jmx接口周期性的读取这两个属性,接入到神农监控。

1
2
java.lang:type=MemoryPool,name=G1 Old Gen/Usage/used (Long) = 1182129304
java.lang:type=MemoryPool,name=G1 Eden Space/Usage/used (Long) = 251658240

我们把时间周期拉长了看,可以看到老年代(红色线)的内存趋势一致处于缓慢上涨状态。

image.png

通过智能算法查找原因

由于内存趋势的规律性,首先怀疑的是由于某些用户的访问上涨,因此通过相似性算法,查找哪个用户的访问趋势和这个内存曲线相似。很遗憾的是,没有找到类似的用户趋势。

接下来日志聚类查看有哪些日志pattern,日志聚类能够把相似日志聚合成一个pattern,帮助我们快速浏览大量的日志。 我们注意到了下图中的这个pattern:这个pattern虽然不能说明和内存问题直接相关,但是这个pattern给我们留下了印象,接下来的一系列调查也证明内存问题和这个pattern相关。

image.png

通过heapdump查看内存泄露的原因

智能算法没有找到特定的用户,没有办法,只能老老实实的去分析内存。

java 自带的工具jmap,可以打印出当前内存的object分布,也可以把内存写到磁盘上。

1
2
打印内存直方图,:live表示在打印前执行一次full gc
jmap -histo:live `pid of java` > /tmp/jmap00
1
2
dump堆文件
jmap -dump:format=b,file=heap.dump `pid of java`

直方图能够看到内存的分配,但是信息太粗,只能到object级别,看不出具体的对象,也看不到reference。

dump heap会让程序停顿,为了避免对线上访问造成影响,先把这台机器的心跳摘掉,移除流量。再执行上述的dump命令。 获取到dump文件后,可以用jhat打开,也可以用jvisualVM 打开。不过用jhat打开时,遇到OOM的问题。最终用jvisualVM打开后,看到如下的大量对象PendingRead。再查看reference,可以确定是SqlTask 下的内存泄露了。

从reference中可以看到一个taskId,一个偶然的机会,在presto的http-request.log文件中,找到了线索,

image.png

通过sls查看presto的http请求。最近15分钟,一些task持续运行了大约15分钟。从task命名上可以看到,22号的task仍然在执行。

通过task的api,获取task当前的状态:可以发现task仍然处于running状态。

1
2
curl 11.194.214.145:10008/v1/task/20190816_183242_49436_pwzkn.2.5
{"taskStatus":{"taskId":"20190816_183242_49436_pwzkn.2.5","taskInstanceId":"1ae02d83-f024-4d45-997b-96dea741b04e","version":1768082,"state":"RUNNING","self":"http://h51c07359.cloud.et91:10008/v1/task/20190816_183242_49436_pwzkn.2.5","failures":[],"queuedPartitionedDrivers":0,"runningPartitionedDrivers":0,"memoryReservation":"0B"},"lastHeartbeat":"2019-08-27T01:34:56.905Z","outputBuffers":{"type":"UNINITIALIZED","state":"OPEN","canAddBuffers":true,"canAddPages":true,"totalBufferedBytes":0,"totalBufferedPages":0,"totalRowsSent":0,"totalPagesSent":0,"buffers":[]},"noMoreSplits":[],"stats":{"createTime":"2019-08-24T12:42:07.748Z","elapsedTime":"0.00ms","queuedTime":"0.00ms","totalDrivers":0,"queuedDrivers":0,"queuedPartitionedDrivers":0,"runningDrivers":0,"runningPartitionedDrivers":0,"completedDrivers":0,"cumulativeMemory":0.0,"memoryReservation":"0B","systemMemoryReservation":"0B","totalScheduledTime":"0.00ms","totalCpuTime":"0.00ms","totalUserTime":"0.00ms","totalBlockedTime":"0.00ms","fullyBlocked":false,"blockedReasons":[],"rawInputDataSize":"0B","rawInputPositions":0,"processedInputDataSize":"0B","processedInputPositions":0,"outputDataSize":"0B","outputPositions":0,"pipelines":[]},"needsPlan":true,"complete":false}

从taskid获取queryid : 20190816_183242_49436_pwzkn,从运行日志中获取该query的执行结果:发现query在16号的时候发生了运行时错误。

image.png

既然整个query已经确认fail了,但是task处于running状态,那么我可以强制通过task的delete api,把任务给清理掉。在清理过后,task的状态变成了ABORT状态,表示任务失败了。但是调用过后, task的请求并未终止,仍然在继续执行,过了大约15分钟,再次去查看task的状态,又变成了RUNNING状态。

内存泄露的原因

已经从上述调查中,知道了内存泄露的位置。通过阅读代码和推演,大致理清楚了内存泄露的原因。

query执行的流程如下:在正常情况下,coordinator调度split到每个机器上,生成对应的task, 然后下游的task生成轮训任务,向上游task读取计算结果。如果任何一个task发生了错误,那么coordinator会把所有的task终止掉。

image.png

但是在某些情况下,某一台机器发生了调度延迟,Task 2首先调度,并且开始了计算,但是由于遇到了计算错误,于是终止了task。 接下来这个时候task1才开始调度,然后生成了向task2轮训的任务。由于task2是异常终止的,内存中的标志位都是没有清空,导致认为task2还在读数据,因此轮训任务一直终止不了。每次轮训,都生成一个PendingRead放到内存中。日积月累,就造成了内存泄露。

验证内存泄露的原因

计算task一直不能终止,那么如果我强行通过API DELETE掉task,内存理论上可以被删除掉。

通过curl删除task的http请求,

1
curl 11.223.196.92:10008/v1/task/20190822_163910_94499_pwzkn.2.4 -X DELETE

删除后的状态

1
2
3
4
5
6
7
8
9
{"taskStatus":{"taskId":"20190822_163910_94499_pwzkn.2.4","taskInstanceId":"71f71c85-5073-4cf9-852b-16bf9c49496f","version":3239316,"state":"ABORTED","self":"
http://g24h09288.cloud.et91:10008/v1/task/20190822_163910_94499_pwzkn.2.4","failures":[],"queuedPartitionedDrivers":0,"runningPartitionedDrivers":0,"memoryRes
ervation":"0B"},"lastHeartbeat":"2019-08-27T01:32:46.223Z","outputBuffers":{"type":"UNINITIALIZED","state":"OPEN","canAddBuffers":true,"canAddPages":true,"tot
alBufferedBytes":0,"totalBufferedPages":0,"totalRowsSent":0,"totalPagesSent":0,"buffers":[]},"noMoreSplits":[],"stats":{"createTime":"2019-08-23T02:17:55.766Z
","endTime":"2019-08-27T01:32:47.296Z","elapsedTime":"0.00ms","queuedTime":"0.00ms","totalDrivers":0,"queuedDrivers":0,"queuedPartitionedDrivers":0,"runningDr
ivers":0,"runningPartitionedDrivers":0,"completedDrivers":0,"cumulativeMemory":0.0,"memoryReservation":"0B","systemMemoryReservation":"0B","totalScheduledTime
":"0.00ms","totalCpuTime":"0.00ms","totalUserTime":"0.00ms","totalBlockedTime":"0.00ms","fullyBlocked":false,"blockedReasons":[],"rawInputDataSize":"0B","rawI
nputPositions":0,"processedInputDataSize":"0B","processedInputPositions":0,"outputDataSize":"0B","outputPositions":0,"pipelines":[]},"needsPlan":true,"complet
e":true}

删除后等待15分钟,task会从内存中删除:可以看到内存发生了大幅下跌。

image.png

过几天后再去看内存,又在持续的增长,这是因为我们只是清理了内存,但是轮训任务并没有被清理掉.

image.png

构造case复现

从采样到的几个内存泄漏点,我们可以看到明显的特征,就是query遇到错误的数据,执行失败,然后才有概率遇到调度异常。因此我们可以构造一些非法数据,让source节点快速的fail掉。通过检查http-request.log中task的运行时长,可以确认是否发生了内存泄露。

修复内存泄露

从上述验证过程也可以看出,要想修复内存泄露,必须让泄露的轮训任务终止掉。有几种修复方案:

  1. 上游判断,如果stat是terminal(结束或失败)状态,返回空结果和结束标志。
  2. 下游判断,如果query是结束状态,那么不生成轮训task。
  3. 下游判断,如果task处于close状态,那么生成轮训task,但是进入清理阶段,清理掉上游的内存后退出。

综合考虑每种方案,以及测试结果,最终采用了第三种方案,可以把内存清理干净,避免一些遗留问题。

参考资料

java垃圾回收完全手册

Presto coordinator的CPU持续上涨,原因竟然是这样

问题背景

之前介绍过presto的架构, coordinator是Presto架构中负责调度的master节点。在实际部署中,为了减少该节点的负载,指定node-scheduler.include-coordinator=false,避免把计算任务调度到coordinator节点上。

由于Presto进程是常驻进程,而且需要实时的提供在线服务。通常只有在需要升级时,才会通过热升级手段重启进程。于是我们在一个大集群上发现了这个现象,coordinator的CPU随着时间不短增长。最高达到了3000%+ 。见下图是连续一个月的持续上涨。
image.png

如果任由CPU这样上涨下去,整个集群将不可控,也会影响计算能力。于是我开始了一系列的调查。

是json的原因吗?

首先,通过jstack命令打印Presto进程的栈。在栈中最常见的调用栈,是json的反序列化操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.parseMediumName2(UTF8StreamJsonParser.java:1836)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.parseMediumName(UTF8StreamJsonParser.java:1793)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1728)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:776)
at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:389)
at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1194)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:314)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:148)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3789)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2950)
at io.airlift.json.JsonCodec.fromJson(JsonCodec.java:198)
at io.airlift.http.client.FullJsonResponseHandler$JsonResponse.<init>(FullJsonResponseHandler.java:118)
at io.airlift.http.client.FullJsonResponseHandler.handle(FullJsonResponseHandler.java:68)
at io.airlift.http.client.FullJsonResponseHandler.handle(FullJsonResponseHandler.java:37)
at io.airlift.http.client.jetty.JettyHttpClient$JettyResponseFuture.processResponse(JettyHttpClient.java:857)
at io.airlift.http.client.jetty.JettyHttpClient$JettyResponseFuture.completed(JettyHttpClient.java:834)
at io.airlift.http.client.jetty.JettyHttpClient$BufferingResponseListener.onComplete(JettyHttpClient.java:1119)

这个栈是一个http请求的callback,在callback中把response把json反序列化成对象。http的callback线程时包括了200个线程,而我经常能在stack中看到超过180个线程在做json反序列化。从栈总看不出是属于什么http请求。但是我们能从代码中找到coordinator发出的这几类请求。

image.png

  1. task任务状态的fetcher,循环的获取task的状态,直到task结束。这其中其中有两类fetch操作:
    1. 一个获取taskStatus,只包含task状态,循环获取,前一次获取结束后,立马启动下一次
    2. 一个获取taskInfo。包含task的详细信息,每3秒获取一次。
  2. Memory管理,coordinator定期向每台机器获取机器的内存使用状况。
  3. fail detect 请求有两类:
    1. 一类是head请求,没有response
    2. 另一类是/v1/info/state请求,是worker的状态,返回类似于”ACTIVE”或者”SHUTDOWN”之类的状态。

由于内存同步和心跳检查是跟机器数据相关,而task同步则跟query数目相关。于是我怀疑是同时运行的query太多,有1000多个query在同时运行,每个query生成上百个task。 coordinator每分钟要发起几十万次读取task状态的请求。jackson库的反序列化效率不高,导致coordinator的CPU很高。 这可以解释CPU高的原因,但不能解释CPU持续上涨的原因。不管怎么样,我还是决定把jackson改成性能更佳的fastjson。

改成fastjson上线后,过段时间发现,CPU仍然在缓慢的持续上涨,不得不再重新寻找新的证据。

是不停的分配新的线程吗?

我们在jstack栈中,能清楚地看到很多线程同时在做json反序列化操作。证明是和json反序列化相关的操作。于是再去阅读源码。发现task status fetcher使用的线程池是一个可变的线程池newCachedThreadPool

具体操作是:任何请求,都是放入队列中,然后由线程池处理。如果队列满,则线程池会新分配一个线程。

于是我怀疑是负载太高,导致不停的分配新的线程,于是CPU越来越高。怎么解决这个问题?我改造了coordinator,由原来的单节点,变成了多节点,以均分coordinator的压力。

把coordinator改造成分布式能解决问题吗?

Presto原生的coordinator由于依赖单节点进行内存监控,如果强行部署成多coordinator的话,会造成内存管理的混乱,有可能让某些大query死锁。因此改造成分布式还颇费一番功夫。我会另起一个话题,讲述如何把coordinator改造成分布式。

简单的说,改造完成后,有3个coordinator。原来的coordinator,负责内存管理和worker failover管理,同时处理1/3的query。而另外两个coordinator,只分别负责管理另外1/3的query。同时把内存管理的心跳间隔调整到了5s。

过了一段时间,发现了新的情况。中心coordinator的CPU仍然在缓慢持续上涨,但是CPU并不是一直很高,而是每隔5s飙升一次。新增的两个coordinator则没有变化。于是我从所有的配置中找跟5s相关的参数,从这里开始意思到似乎和内存管理的心跳有关。

从jstack能看到跟json反序列化有关, 内存管理的http response也需要反序列化json。虽然之前把jackson改造成了fastjson,但只是改造了task心跳部分,没有改造内存管理部分。

coordinator每5s向每个worker发送一次请求,获取worker的内存使用,这个请求量是恒定的。理论上,除非我们增加机器,负载才会增加。于是我去检查了presto的http-request.log。 我把日志采集到阿里云日志服务,通过日志分析,检查http请求的变化趋势。

1
v1/memory | select date_trunc('day',__time__) as t, count(1) as pv, sum(response_size) as res from log group by t order by t

image.png
左Y轴是PV,右Y轴是response size的一天累加值。我们可以看到PV基本上没有大的变化。而response size则不断增长。20天内从最低7.6G/day增长到了237G/day。平均每个response达到149k。 这个截图是我写文章时截取的。在当时调查问题时,看到了600k的response。

response增加 -> json反序列化负载增加 -> CPU不断增加。看起来符合逻辑,接下来只有去找为什么response不断增加了。

tcp抓包

为了查看response这么大的原因,我用tcpdump抓取presto的流量。

1
sudo tcpdump  port 10000 -s 0 -w /tmp/netstat 

在/v1/memory的响应结果中,会包含general , reserved, 和system 三个pool分别总的大小以及使用大小,和目前占用对应内存池的queryId。结构是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"totalNodeMemory": "12884901888B",
"pools": {
"reserved": {
"maxBytes": 3221225472,
"freeBytes": 3221225472,
"queryMemoryReservations": {

}
},
"general": {
"maxBytes": 9663676416,
"freeBytes": 9663676416,
"queryMemoryReservations": {

}
},
"system": {
"maxBytes": 8589934592,
"freeBytes": 8589934592,
"queryMemoryReservations": {
"20190509_113603_03920_25e2h",
"20190509_113608_02758_rdy5b",
...
...
}
}
}
}

其中system -> queryMemoryReservations 这个节点下的内容非常的多,甚至包含几天前的queryId。这意味着某些query占用了system内存池,结束后没有正常释放。接下来就需要查找什么地方存在内存泄露。

查找内存泄漏点

query使用的内存,都会记录在context中,因此我专门写了个程序,向coordinator轮训已经完成的query,获取这些query的描述信息。查看到底是哪个环节的内存使用有问题。

1
2
3
4
5
6
7
"tasks" : [ {          
"stats" : {
"systemMemoryReservation" : "712B",
"pipelines" : [ {
"systemMemoryReservation" : "712B",
"operatorSummaries" : [ {
"systemMemoryReservation" : "0B",

抓取到的多个query的stat信息,都具备相同的特征:

  1. 某一个task的systemMemoryReservation 不为0。
  2. task的第一层pipeline不为0。
  3. pipeline的Operator都是ExchangeOperator和LocalExchangeSinkOperator,但是operator内部的systemMeoryReservation为0。

system内存是在计算过程中,使用的系统内存,例如两个worker之间传递数据,使用的就是system mem pool

Operator信息表明,这个节点是从前一层节点接收数据,放在内存中,供下层pipeline计算。

image.png

task -> pipeline -> driver -> operator 构成了一层层结构。由于数据显示pipeline这一层发生了泄露,我只好去看pipeline分配内存的逻辑。

image.png

Driver和ExchangeClient在分配内存时,都会调用Pipeline的内存分配逻辑。由于数据显示跟Exchange有关,于是我重点检查了这ExchangeOperator分配内存的逻辑,初看之下,每一层都能做到自平衡,也就是在某一层结束的时候,调用close,自己把尚未释放的内存释放掉。

但是,我也发现了在多线程场景下,由于执行时序不同,会导致出现内存泄露情况。

多线程时序问题

在ExchangeClient中,就存在多种场景。

第一种场景,ExchangeClient是一个生产者和消费者模型,后台线程从远端拿到数据后,由callback线程把数据放入队列,然后才分配内存。 消费者线程从队列中poll到数据后,再释放内存。

image.png

假如事件发生的顺序是这样的 P1 C1 C2 P2 , 那么会先释放内存,在C2释放内存时,由于出错而分配失败。接下来在P2再去分配内存就会出错。确保不会出错的顺序是把P1和P2对调顺序。

如果上边的假设成立,那么会在日志中看到tried to free more memory than is reserved的错误。没有从日志中发现上边的错误,于是这种假设被排除了。

第二种场景, C2在释放内存是,会先判断closed变量。如果生产者在C2之前就设置了closed变量,那么就不会进入释放的逻辑。我加了一些日志,证明发生内存泄露时,closed变量确实被设置了。

设置closed变量的方式如下代码,在代码中只设置了closed变量,而没有做内存清理:

1
2
3
if (!closed.get() && pageBuffer.peek() == NO_MORE_PAGES) {
closed.set(true);
}

由于我使用的presto代码比较老,是176版本,和master head版本进行对比,发现最新代码已经改成了close(), 调用close()函数,会提前清理内存,能够避免内存泄露的问题。
git blame的结果:

1
e3535bbfeca (Dain Sundstrom     2017-02-24 10:04:37 -0800 289)                 close();

根据commit id找到对应的信息:https://github.com/prestodb/presto/commit/e3535bbfeca

image.png

内存泄露的原因已经解决了,我用的版本已经太老了。但是由于我对presto内核做了很多改动,想要merge到HEAD版本恐怕要花费不少时间。

总结

在ExchangeClient中,向上游节点拉取数据时,有一定概率导致内存泄露,上游节点越多,概率越大。

当内存发生泄露时,内存池忠实的记录了每一个queryId。这导致随着时间推移,queryId的list越来越多。

coordinator会定时向每台机器获取内存池的使用情况。因此response越来越大,而json反序列化的CPU使用率越来越高,几乎占满了http callback的线程池。最终反映到监控图上,就是coordinator随着时间推移,CPU Usage直线上涨。

One More Thing

把ExchangeClient内存泄露的问题解决后,再去验证。由ExchangeClient造成的内存泄露已经解决了,但是发现ScanFilterAndProjectOperator在很小的场景下,会出现内存泄露。这是一个SourceOperator的派生类,会调用plugin实现的ConnectorSource,方便ConnectorSource使用系统内存。内存泄露的原因还待分析。

深入理解Presto(3):Presto内存管理

深入理解Presto(3):Presto内存管理

上一篇文章,我们讲了Presto的架构。Presto是一款内存计算型的引擎,所以对于内存管理必须做到精细,才能保证query有序、顺利的执行,部分发生饿死、死锁等情况。

内存池

Presto采用逻辑的内存池,来管理不同类型的内存需求。

Presto把整个内存划分成三个内存池,分别是System Pool ,Reserved Pool, General Pool。

image.png

  1. System Pool 是用来保留给系统使用的,默认为40%的内存空间留给系统使用。
  2. Reserved Pool和General Pool 是用来分配query运行时内存的。
  3. 其中大部分的query使用general Pool。 而最大的一个query,使用Reserved Pool, 所以Reserved Pool的空间等同于一个query在一个机器上运行使用的最大空间大小,默认是10%的空间。
  4. General则享有除了System Pool和General Pool之外的其他内存空间。

为什么要使用内存池

System Pool用于系统使用的内存,例如机器之间传递数据,在内存中会维护buffer,这部分内存挂载system名下。

那么,为什么需要保留区内存呢?并且保留区内存正好等于query在机器上使用的最大内存?

如果没有Reserved Pool, 那么当query非常多,并且把内存空间几乎快要占完的时候,某一个内存消耗比较大的query开始运行。但是这时候已经没有内存空间可供这个query运行了,这个query一直处于挂起状态,等待可用的内存。 但是其他的小内存query跑完后,又有新的小内存query加进来。由于小内存query占用内存小,很容易找到可用内存。 这种情况下,大内存query就一直挂起直到饿死。

所以为了防止出现这种饿死的情况,必须预留出来一块空间,共大内存query运行。 预留的空间大小等于query允许使用的最大内存。Presto每秒钟,挑出来一个内存占用最大的query,允许它使用reserved pool,避免一直没有可用内存供该query运行。

内存管理

image.png

Presto内存管理,分两部分:

  • query内存管理
    • query划分成很多task, 每个task会有一个线程循环获取task的状态,包括task所用内存。汇总成query所用内存。
    • 如果query的汇总内存超过一定大小,则强制终止该query。
  • 机器内存管理
    • coordinator有一个线程,定时的轮训每台机器,查看当前的机器内存状态。

当query内存和机器内存汇总之后,coordinator会挑选出一个内存使用最大的query,分配给Reserved Pool。

内存管理是由coordinator来管理的, coordinator每秒钟做一次判断,指定某个query在所有的机器上都能使用reserved 内存。那么问题来了,如果某台机器上,,没有运行该query,那岂不是该机器预留的内存浪费了?为什么不在单台机器上挑出来一个最大的task执行。原因还是死锁,假如query,在其他机器上享有reserved内存,很快执行结束。但是在某一台机器上不是最大的task,一直得不到运行,导致该query无法结束。

Presto入门: 配置第一个http connector

Presto入门: 配置第一个http connector

1. connector

在presto中,可以对接多种类型的数据源,今天以http 服务器数据为例,简单介绍如何接入presto。

2. 搭建http数据数据源

2.1 http数据源的schema

在http服务器上,提供一个文件,文件内容是数据源的格式。 一个文件是json格式,顶层是schema的名称,schema类似数据的database。schema之下是一个表的list。每张表要提供列的名称和类型,以及数据的地址,即http地址,见一个样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
{
"schema":[{
"name":"table1",
"columns":[
{
"name":"key1",
"type":"bigint"
},
{
"name":"key2",
"type":"varchar"
}
],
"sources":[
"http://localhost:9080/data.csv"
]
}
]
}

2.2 提供数据:

http数据是一个csv格式,例如上文提到的data.csv的内容是:

1
2
10,b
1,d

2.3 配置presto

接下来配置presto,使得presto知道http 数据源的存在,创建文件etc/catalog/http.properties ,在文件中指定schema的地址:

1
2
connector.name=example-http
metadata-uri=http://localhost:9080/schema.json

2.4 查看查询效果:

2.4.1 展示http catalog中的schema

1
2
3
4
5
6
7
8
9
10
11
presto> show schemas from http;
Schema
--------------------
information_schema
schema
(2 rows)

Query 20180510_030439_00002_58j4x, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [2 rows, 34B] [15 rows/s, 263B/s]

2.4.2 展示http catalog的schema库中的表内容

1
2
3
4
5
6
7
8
9
presto> show tables  from http.schema;
Table
--------
table1
(1 row)

Query 20180510_030453_00003_58j4x, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [1 rows, 22B] [4 rows/s, 108B/s]

2.4.3 展示表的格式

1
2
3
4
5
6
7
8
9
10
11
presto> describe  http.schema.table1;
Column | Type | Extra | Comment
--------+---------+-------+---------
key1 | bigint | |
key2 | varchar | |
(2 rows)

Query 20180510_030507_00004_58j4x, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [2 rows, 123B] [9 rows/s, 603B/s]

2.4.4 获取表的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
presto> select * from http.schema.table1;

Query 20180510_031258_00005_58j4x, FAILED, 1 node
Splits: 17 total, 0 done (0.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20180510_031258_00005_58j4x failed: For input string: "a"

presto> select * from http.schema.table1;
key1 | key2
------+------
10 | b
1 | d
(2 rows)

Query 20180510_031315_00006_58j4x, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [2 rows, 0B] [41 rows/s, 0B/s]


深入理解Presto(2) :Presto查询执行过程

深入理解Presto(2) :Presto查询执行过程

Presto查询执行过程

这里写图片描述

Presto包含三类角色,coordinator,discovery,worker。coordinator负责query的解析和调度。discovery负责集群的心跳和角色管理。worker负责执行计算。

presto-cli提交的查询,实际上是一个http POST请求。查询请求发送到coordinator后,经过词法解析和语法解析,生成抽象语法树,描述查询的执行。

执行计划编译器,会根据抽象语法树,层层展开,把语法树所表示的结构,转化成由单个操作所组成的树状的执行结构,称为逻辑执行计划。

原始的逻辑执行计划,直接表示用户所期望的操作,未必是性能最优的,在经过一系列性能优化和转写,以及分布式处理后,形成最终的逻辑执行计划。这时的逻辑执行计划,已经包含了map-reduce操作,以及跨机器传输中间计算结果操作。

scheduler从数据的meta上获取数据的分布,构造split,配合逻辑执行计划,把对应的执行计划调度到对应的worker上。

在worker上,逻辑执行计划生成物理执行计划,根据逻辑执行计划,会生成执行的字节码,以及operator列表。operator交由执行驱动来完成计算。

抽象语法树

由语法解析器根据SQL,解析生成的树状结构,描述SQL的执行过程。 在下文中,以SQLselect avg(response_size) as a , client_address from localfile.logs.http_request_log group by client_address order by a desc limit 10为例来描述。

抽象语法树数以Query为单位来描述查询,分层次表示不同层的子查询。每一层查询查询包含了几个关键因素:select, from,where,group by,having,order by,limit。其中,from可以是一个子查询,也可以是一张表。

一个典型的抽象语法树:

这里写图片描述

生成逻辑执行计划

抽象语法树树,描述的最原始的用户需求。抽象语法树描述的信息,执行效率上不是最优,执行操作也过于复杂。需要把抽象语法树转化成执行计划。执行计划分成两类,一类是逻辑执行计划,一类是物理执行计划。逻辑执行计划,以树状结构来描述执行,每个节点是最简单的操作。物理执行计划,根据逻辑执行计划生成字节码,交由驱动执行。

转写成逻辑执行计划的过程,包括转写和优化。把抽象语法树转写成由简单操作组成的结点树,然后把树中所有聚合计算节点转写成map-reduce形式。并且在map-reduce节点中间插入Exchange节点。然后,进行一系列优化,把一些能提前加速计算的节点下推,能合并的节点合并。

最后逻辑执行计划按照Exchange节点做划分,分成不同的段(fragament),表示不同阶段的的执行计划。在调度时,按照fragment调度。

SQLselect avg(response_size) as a , client_address from localfile.logs.http_request_log group by client_address order by a desc limit 10的逻辑执行计划:
这里写图片描述

从执行计划中可以看到,agg节点都是分成partial和final两步。

调度执行计划到机器上

调度涉及到两个问题,第一,某个fragment分配由哪些机器执行;第二,某个fragment的计算结果如何输出到下游fragment。

在调度时,需要为每一个fragment指定分配到哪些机器上。从调度上划分,fragment分三种类型

  • 一类是source类型由原始数据的存储位置决定fragment调度机器,有多少个source节点呢?connector会根据数据的meta,决定需要读取多少个split(分片) ,对于每一个source节点,分配一个split到一台机器上,如果在配置中指定了network-topology=flat,则尽量选择split所在的机器。
  • 一类是FIXED类型,主要用于纯计算节点,从集群中选择一台或多台机器分配给某个fragment。一般只有最终输出节点分配一个机器,中间的计算结果都要分配多台机器。分配的机器数由配置hash_partition_count决定。选择机器的方式是随机选择。
  • 一类是SINGLE类型,只有一台机器,主要用于汇总结果,随机选择一台机器。

对于计算结果输出,根据下游节点的机器个数,也有多种方式,

  • 如果下游节点有多台机器,例如group by的中间结果,会按照group by的key计算hash,按照hash值选择一个下游机器输出。对于非group by的计算,会随机选择或者round robin。
  • 如果下游节点只有一台机器,会输出到这台机器上。

以下图为例,fragment 2是source类型fragment,有三个split,所以分配了三台机器。因为这一层计算是group by 聚合计算,所以输出时按照group by的key计算hash,选择下游的某台机器输出。

这里写图片描述

调度之前的任务,都在coordinator完成,调度完成后,之后任务发送到worker上执行。

生成物理执行计划

逻辑执行计划fragment发送到机器上后,由结点树形式转写成operator list,根据逻辑代码动态编译生成字节码。动态生成字节码,主要是利用编译原理:

  • 展开循环
  • 根据数据列的类型,直接调用对用的函数,以减少分支跳转语句。

这些手段会更好的利用CPU的流水线。

执行驱动

物理执行计划构造生成的Operator list,交给Driver执行。具体计算哪些数据,由加载的Split决定。

Operator list 以串联形式处理数据,前一个operator的结果作为下一个结果的输入,对于source类型的operator,每一次调用都会获取一份新的数据;对于Aggregate的operator,只有之前所有的operator都finish之后,才能获取输出结果。

这里写图片描述

聚合计算

生成的执行计划中,聚合计算都拆分成了两步,分别是Map、Reduce。

聚合计算的Operator有两类,分别是AggregationOperator和HashAggregationOperator。

AggregationOperator对所有行进行计算,并且把结果更新到一个位置。HashAggregationOperator使用某一列的hash值作为hash表的key,key相同的行才会把结果保存在一起,用于group by类的计算。

聚合计算都是要按照Map-Reduce的形式执行。

聚合计算所提供的函数,都要提供四个接口,分别有两个输入,两个输出:

  1. 接受原始数据的输入
  2. 接受中间结果的输入
  3. 输出中间结果
  4. 输出最终结果。

1+3 构成了Map操作 2+4构成了Reduce操作。

以Avg为例:

  1. Map阶段输入1,2,3,4
  2. Map截断输出10,4 分别代表Sum和Count
  3. Reduce输入10,4
  4. Reduce输出最终平均值2.5

我们改造了Presto系统,使得Presto能够提供缓存功能,就是在MapReduce中间加了一层计算,接受中间结果输入和中间结果输出。

函数

函数分为两类,分别是Scaler函数和Aggregate函数

Scaler函数提供数据的转换处理,不保存状态,一个输入产生一个输出。

Aggregate函数提供数据的聚合处理,利用已有状态+输入,产生新的状态。

上文已经提到,聚合函数提供了两种输入接口和两种输出接口。

深入理解Presto(1) : Presto的架构

简介

Presto是一个facebook开源的分布式SQL查询引擎,适用于交互式分析查询,数据量支持GB到PB字节。presto的架构由关系型数据库的架构演化而来。presto之所以能在各个内存计算型数据库中脱颖而出,在于以下几点:

  1. 清晰的架构,是一个能够独立运行的系统,不依赖于任何其他外部系统。例如调度,presto自身提供了对集群的监控,可以根据监控信息完成调度。
  2. 简单的数据结构,列式存储,逻辑行,大部分数据都可以轻易的转化成presto所需要的这种数据结构。
  3. 丰富的插件接口,完美对接外部存储系统,或者添加自定义的函数。

本文从外到内,依次来介绍presto。

架构

image.png

Presto采用典型的master-slave模型:

  1. coordinator(master)负责meta管理,worker管理,query的解析和调度
  2. worker则负责计算和读写。
  3. discovery server, 通常内嵌于coordinator节点中,也可以单独部署,用于节点心跳。在下文中,默认discovery和coordinator共享一台机器。

在worker的配置中,可以选择配置:

  1. discovery的ip:port。
  2. 一个http地址,内容是service inventory,包含discovery地址。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    	{
    "environment": "production",
    "services": [
    {
    "id": "ffffffff-ffff-ffff-ffff-ffffffffffff",
    "type": "discovery",
    "location": "/ffffffff-ffff-ffff-ffff-ffffffffffff",
    "pool": "general",
    "state": "RUNNING",
    "properties": {
    "http": "http://192.168.1.1:8080"
    }
    }
    ]
    }
  3. 一个本地文件地址,内容同2。

2和3的原理是基于service inventory, worker 会动态监听这个文件,如果有变化,load出最新的配置,指向最新的discovery节点。

在设计上,discovery和coordinator都是单节点。如果有多个coordinator同时存活,worker 会随机的向其中一个汇报进程和task状态,导致脑裂。调度query时有可能会发生死锁。

discovery和coordinator可用性设计。由于service inventory的使用,监控程序可以在发现discovery挂掉后,修改service inventory中的内容,指向备机的discovery。无缝的完成切换。coordiantor的配置必须要在进程启动时指定,同一个集群中无法存活多个coordinator。因此最好的办法是和discovery配置到一台机器。 secondary机器部署备用的discovery和coordinator。在平时,secondary机器是一个只包含一台机器的集群,在primary宕机时,worker的心跳瞬间切换到secondary。

数据模型

presto采取三层表结构:

  1. catalog 对应某一类数据源,例如hive的数据,或mysql的数据
  2. schema 对应mysql中的数据库
  3. table 对应mysql中的表

image.png

presto的存储单元包括:

  1. Page: 多行数据的集合,包含多个列的数据,内部仅提供逻辑行,实际以列式存储。
  2. Block:一列数据,根据不同类型的数据,通常采取不同的编码方式,了解这些编码方式,有助于自己的存储系统对接presto。

不同类型的block:

  1. array类型block,应用于固定宽度的类型,例如int,long,double。block由两部分组成
    • boolean valueIsNull[]表示每一行是否有值。
    • T values[] 每一行的具体值。
  2. 可变宽度的block,应用于string类数据,由三部分信息组成

    • Slice : 所有行的数据拼接起来的字符串。
    • int offsets[] :每一行数据的起始便宜位置。每一行的长度等于下一行的起始便宜减去当前行的起始便宜。
    • boolean valueIsNull[] 表示某一行是否有值。如果有某一行无值,那么这一行的便宜量等于上一行的偏移量。
  3. 固定宽度的string类型的block,所有行的数据拼接成一长串Slice,每一行的长度固定。

  4. 字典block:对于某些列,distinct值较少,适合使用字典保存。主要有两部分组成:
    • 字典,可以是任意一种类型的block(甚至可以嵌套一个字典block),block中的每一行按照顺序排序编号。
    • int ids[] 表示每一行数据对应的value在字典中的编号。在查找时,首先找到某一行的id,然后到字典中获取真实的值。

插件

了解了presto的数据模型,就可以给presto编写插件,来对接自己的存储系统。presto提供了一套connector接口,从自定义存储中读取元数据,以及列存储数据。先看connector的基本概念:

  1. ConnectorMetadata: 管理表的元数据,表的元数据,partition等信息。在处理请求时,需要获取元信息,以便确认读取的数据的位置。Presto会传入filter条件,以便减少读取的数据的范围。元信息可以从磁盘上读取,也可以缓存在内存中。
  2. ConnectorSplit: 一个IO Task处理的数据的集合,是调度的单元。一个split可以对应一个partition,或多个partition。
  3. SplitManager : 根据表的meta,构造split。
  4. SlsPageSource : 根据split的信息以及要读取的列信息,从磁盘上读取0个或多个page,供计算引擎计算。

插件能够帮助开发者添加这些功能:

  1. 对接自己的存储系统。
  2. 添加自定义数据类型。
  3. 添加自定义处理函数。
  4. 自定义权限控制。
  5. 自定义资源控制。
  6. 添加query事件处理逻辑。

Presto提供了一个简单的connector : local file connector ,可用于参考如何实现自己的connector。不过local file connector中使用的遍历数据的单元是cursor,即一行数据,而不是一个page。 hive 的connector中实现了三种类型,parquet connector, orc connector, rc file connector。
image.png

上文从宏观上介绍了presto的一些原理,接下来几篇文章让我们深入presto 内部,了解一些内部的设计,这对性能调优会有比较大的用处,也有助于添加自定义的operator。

String copy on write 引发的线程不安全

一个string对象的读操作是线程安全的么?答案是否定的。读取一个string在某些情况下是线程不安全的。这是为什么呢?原因就是string在优化存储空间时采用的策略cow。

什么是cow

Cow 是copy on write的缩写。String 为了减少内存拷贝,当两个string对象的内容相同时,他们指向同一块内存空间,并通过引用计数来表示有多少对象引用了这块内存。当其中某个string对象的内容发生改变时,string会先重新分配一块空间,把原来的内容拷贝到新空间,原来的空间的引用计数减去1。

什么情况下引发cow

当程序判断string对象要改变时,就会引发cow。一种比较少见的情况就是,程序获得了string对象的非const引用,那么程序无法判断在接下来的操作中是否会改变这个string对象。所以会触发cow操作。一个典型的例子是string::operator[], 当一个非const对象调用[]操作符时,实际上获得的是对象的非const引用,于是引发了cow。

危害
上边的operator[],如果只是为了读取字符串的话,那么我们期望这个操作应该是线程安全的。但实际上这个读操作包含了写操作,假如另一个线程同时在读这个字符串的话,程序就有可能在这里使内存写坏。为了解决这个问题,必须给string对象限定为const.

我在产品中发现了这个问题,为了调查这个问题花费了3周的时间,并在第四周时从理论上解释通了这个原因。