第3章 Spark内核架构 Spark是一个用于大规模数据处理分析的引擎,它为分布式数据处理提供了方便的API,使用户可以更加便捷地用统一的API操作分布式的数据集。本章将介绍Spark的内核架构,剖析Spark的重要组件,为后续性能调优章节提供理论支持。本章的主要内容有: Spark编程模型。 Spark组件简介。 Spark作业执行原理。 Spark内存管理。 Spark存储原理。 视频讲解 视频讲解 3.1Spark编程模型 Spark能够做到分布式的迭代计算、容错恢复等,离不开其最基本、最重要的抽象——RDD。本节将对RDD的概念及RDD的重要特性进行讲解,并分析Spark内部对RDD的编码实现。 3.1.1RDD概述 正如MapReduce的编程模型来源于Google的论文一样,RDD最初的概念也是来源于一篇论文——伯克利实验室的 Resilient Distributed Datasets: A FaultTolerant Abstraction for InMemory Cluster Computing。 这篇论文奠定了RDD实现功能的基本思想,感兴趣的读者可以查看这篇论文 ,网址是http://www.eecs.berkeley.edu/Pubs/TechRpts/2011/EECS201182.pdf。 从论文的名称即可看出,RDD实际为Resilient Distributed Datasets的简称,意为弹性分布式数据集,是一种基于集群内存计算的容错的抽象。因此,RDD定义的是一种编程抽象,它规定了在这样的一个基于内存计算的容错的场景下,RDD应该具备的功能。RDD并不是Spark独有的编程模型,Spark只是对RDD编程模型的一种实现,如果 用户有一定的编程功底,完全可以根据这个思想开发出另一套类似于Spark的计算引擎。 3.1.2RDD的基本属性 对于计算系统而言,RDD是一个数据集操作的接口,集群系统可以对RDD的数据进行存储、计算等操作。对于用户而言,RDD是一组数据集,用户可以通过RDD的API对其进行一些操作,如进行转换、过滤等操作。基于以上RDD 的基本功能,RDD被抽象出几个最基本的属性,分别为分区、计算函数、依赖、分区器、 首选运行位置。这几个基本属性对于RDD而言非常重要,Spark的程序实现基本上都是围绕这些属性进行操作的,所以理解RDD的原理是理解Spark的关键。 1. 分区 RDD的中文含义为弹性分布式数据集,其中分区的概念实现了分布式所需的功能。一个RDD从某种角度上讲就是一组数据集, 图3.1RDD 的逻辑分区 在一些大规模计算中,一个数据集中的数据量会达到非常大的级别,而这些数据难以在一台机器中进行存储、计算。RDD的解决思路就是将数据进行分区,一个大的数据集被分为很多小的分区,每个分区中包含有RDD中的一部分数据,大量的分区可以在不同的节点中同时运行,通过对每个分区的数据进行计算,然后对计算结果进行汇总,从而实现对整个数据集的计算。这些分区的数量按照数字从0开始依次进行编号,RDD的分区如图3.1所示。 在真正计算时,RDD的不同分区是分散到不同的计算节点进行计算的,每个计算节点计算RDD的一个或多个分区,从而形成数据的分布式计算。这里需要注意的是,RDD的一个分区数据是不能分散到多个节点上的,一个分区的数据只能在某个具体的节点上。RDD在节点上的分布如图3.2所示。 图3.2RDD在节点上的分布 RDD的计算是以分区为单位进行的,而且同一分区的所有数据都进行相同的计算。对于一个分区数据而言,必须执行相同的操作: 要么都执行,要么都不执行。所以RDD的计算是一种粗粒度的计算操作,必须对一批数据全部执行相同的操作。分区的数量决定了同时执行的任务的数量,因为可以为每个分区启动一个计算任务用于单独计算这个分区的数据。理论上分区数越大,能够并行执行的任务数量就越多。但实际运行时,还会受到物理资源如CPU等的限制。 2. 计算函数 RDD的数据被分区了,但是每个分区的数据是如何得来的呢?一个RDD的数据 来源只有两种: 一是从数据源或集合中进行加载得到RDD的数据; 二是通过其他的RDD进行一定的转换得到 的数据。无论哪一种方式,RDD的数据其实都是通过RDD的计算函数得到的。 1) 创建新的RDD RDD的计算函数定义了如何根据一个分区计算出该分区的数据,并且这个函数的返回值为迭代器类型。这样通过给定某一个具体的分区,就可以通过计算函数计算出该RDD这个分区的数据。通过对RDD的所有分区进行计算,即可得到RDD的所有分区的数据。如Spark在加载HDFS中的数据时,每个分区的数据通过计算函数加载对应block块的数据,从而实现了数据的分布式加载的过程,如图3.3所示。 图3.3RDD 从HDFS加载示意图 RDD通过定义分区和计算函数可以实现非常丰富的数据加载的类型,如上文中提到的从HDFS中加载数据,每个分区加载一个block块的数据,还可以实现从集合中进行创建,实现每个分区加载一个集合中的部分数据,如SparkContext中实现的parallelize的并行集合的方法。甚至用户可以自定义分区函数实现特定加载数据的方式,如将历史数据按照时间分区进行加载等,如图3.4所示。 图3.4RDD数据加载方式 2) RDD的转换 一个RDD中的数据还可以通过其他RDD中的数据转换得到,通过一定的转换操作将一个RDD转换为一个新的RDD,其中新RDD一般称为子RDD,依赖的RDD成为父RDD。 如图3.4所示,将Array转换的RDD每个元素都加1,即可形成一个新的RDD,其转换过程如图3.5所示。 图3.5RDD的转换操作 在图3.5中,rdd2通过该RDD的计算函数在rdd1的数据基础之上计算出了该RDD的每个分区的数据。当需要计算rdd2的数据时,rdd2就会调用其计算函数,使用rdd1的数据进行计算,如果rdd1的数据不存在,则首先调用rdd1的计算函数计算rdd1每个分区的数据。 这里需要注意的是,rdd2的计算函数并不是图3.5中的map()函数,计算函数是rdd2的一个属性,rdd1通过map()函数生成了rdd2。在该过程中,确定了rdd2的计算函数应该如何执行。rdd2的计算函数就是获取rdd1中的对应分区的数据,对该分区的数据中执行map()函数传入的匿名函数(_+1)进行转换,从而得到rdd2对应分区的数据。rdd1的转换操作是API级别的,可以控制整个 RDD的转换,但真正数据的转换需要依靠每个分区的数据进行, 这个过程就是由RDD的计算函数实现的,因此RDD的计算函数一定是和某个分区进行关联的,给定 分区才能进行计算,具体计算过程取决于父RDD的转换操作。 从RDD的角度来看,map()函数是rdd1进行的转换,是API级别的转换; 而计算函数是rdd2的属性,是rdd2执行的操作,是真正的数据转换过程,只不过rdd2执行计算函数时需要rdd1对应分区的数据。 在图3.5中,rdd1也会有自己的计算函数,rdd1的计算函数用于计算rdd1中的每个分区的数据。 当形成多个RDD的调用,如图3.6所示,使用最后一个rdd4的计算结果时,会首先调用rdd4的计算函数,计算rdd4中每个分区的数据。但是rdd4d分区的数据依赖于rdd3的数据,此时会继续调用rdd3的计算函数计算rdd3 每个分区的数据。在这个过程中,各个RDD依次调用其依赖的RDD的计算函数,计算依赖的RDD分区的数据,并根据依赖的RDD分区的数据通过自身的计算函数计算出本RDD每个分区中的数据,从而实现RDD之间的流水线式的调用。在这多个RDD转换的过程中,因为它们之间的分区是一一对应的,也就是每个RDD只依赖父RDD的一个固定的分区,所以计算一个RDD 分区的数据时,不必知道父RDD所有分区的数据,只需要知道依赖的这个分区的数据即可。每个分区中的数据可以通过一个流水线的任务(task)转换完成,各个任务之间相互独立,互不影响。而且通过流水线的调用避免了中间RDD结果的存储。 在rdd1转换为rdd4的过程中,每个分区的数据由一个任务(task)即可完成。在一个具体的任务中,负责依次执行每个RDD的计算函数,从而以流水线的形式完成该数据的计算,避免了中间结果的存储。 图3.6RDD之间的流水线转换 RDD之间的转换不仅仅限于一个RDD转换为另一个RDD,还可以将多个RDD合并成一个RDD,新RDD的分区数 为合并的RDD的分区数之和,这个合并的过程也是由新RDD的计算函数完成的。新RDD的分区 需要与合并的分区 通过计算函数实现关联,由计算函数计算每个分区中的数据,实现新RDD的分区和父RDD的分区数据的对应。其过程如图3.7所示。在这个计算过程中,每个分区也可以使用一个计算任务完成流水线的计算。 图3.7RDD合并 在以上RDD的转换中,父RDD的每一个分区总会对应子RDD中的一个分区。但这 并不是必然的,同样取决于子RDD的计算函数是如何工作的。图3.8是实现WordCount的经典过程,在rdd1中,每个分区中保存了文本中出现的每个单词,在rdd2中,将每个单词转换为(word,1)的形式,但最终希望得到每个单词出现的总次数,形成rdd3的结果。在这个过程中,rdd2到rdd3之间的转换便不再是上文中提到的父RDD的一个分区只被子RDD的一个分区使用,而是父RDD的每个分区的数据可能被子RDD的任何一个分区使用。在计算rdd3每个分区的数据时,需要知道rdd2中所有分区的数据,当rdd3计算一个分区的数据时,需要拉取rdd2上的所有分区对应的数据,完成这个分区数据的计算,这个过程称为Shuffle。由于计算rdd3的其中一个分区的数据,必须要知道rdd2中 所有分区的数据,所以在rdd2转换为rdd3的过程中,就不能再像流水线一样进行计算,必须先计算rdd2,将rdd2所有分区的数据都计算完成以后,才能计算rdd3的数据。 图3.8Shuffle过程 在计算RDD的过程中,如果出现Shuffle,则其过程有如下特点。 必须首先计算出依赖的RDD的所有分区的数据,然后后续RDD才能够继续进行计算。 Shuffle的过程必然分为两个阶段: 第一个阶段为计算依赖RDD的数据的阶段(图3.8中的rdd2),一般称为 Map阶段; 第二个阶段为计算汇聚结果的阶段(图3.8中的rdd3),一般 称为Reduce阶段。 两个阶段必须分到两组任务中进行计算,不能像流水线一样使用一组任务 同时计算两个阶段的全部数据。因为后一个阶段必须在上一个阶段的数据全部计算完成以后才能够开始计算,所以必须拆分成两组不同的任务按照先后顺序执行。 涉及Shuffle操作的RDD的计算函数是比较复杂的,它必须向依赖的RDD的所有分区拉取需要的数据,完成某一个分区数据的计算。 无论是从数据源中创建RDD,还是通过其他RDD进行转换,甚至通过Shuffle得到新的RDD,这些操作都离不开RDD的两个基本属性: 分区和计算函数。只有知道了RDD是如何被分区的、这个RDD一共有多少个分区,才能够通过RDD的计算函数计算每一分区的数据。 RDD的计算函数返回值都是迭代器类型,该迭代器中能够返回RDD某分区的所有的数据。RDD的计算函数通过迭代器的形式,避免了分区的数据同时加载至内存中,从而避免了大量内存被占用。 3. 依赖 在RDD进行转换的过程中,子RDD都是通过父RDD转换而来的。但在具体的实现过程中,所有RDD的数据都是通过它自身的计算函数而得到的。所以, 子RDD在计算的过程中需要得到其父RDD,根据父RDD的数据计算出子RDD的每个分区的数据。子RDD在计算时,就是通过依赖的属性,找到其依赖的父RDD的。 在RDD进行计算时,有些子RDD的一个分区只依赖父RDD的一个分区,即每个父RDD的分区最多被子RDD的一个分区使用,这种依赖关系称为窄依赖。在窄依赖中,多个RDD的每一组分区都能够被一个任务以流水线的形式执行。 RDD的窄依赖如图3.9所示。 图3.9RDD的窄依赖 在RDD进行计算时,如果一个分区的数据 依赖了父RDD的多个分区的数据,即多个子RDD的分区数据依赖父RDD的同一个分区的数据, 则这种依赖方式称为宽依赖。在发生宽依赖的位置,必定要发生Shuffle操作,将这个过程分为两个阶段进行操作。第一个阶段计算父RDD的所有分区的数据,并将结果进行保存。在这个过程中每个分区保存的数据已经做出了分类,区分哪一部分的数据为第二阶段的哪个分区使用。第一阶段完成之后,才能进行第二阶段。第二个阶段在计算每一个分区数据时,通过拉取父RDD的对应分区的数据完成数据的聚合。RDD的宽依赖如图3.10所示。 图3.10RDD的宽依赖 RDD根据其依赖关系和计算函数便可以根据父RDD的数据计算出每个分区的数据。RDD这样的转换和依赖关系 称为血统(lineage),即每一个RDD只要根据其lineage,就能够把RDD的数据计算出来。甚至根据 lineage只计算其中某一个分区的数据,而不用计算RDD中所有分区的数据。 4. 分区器 并不是所有的RDD都有分区器(partitioner), 一般只有(key,value)形式的RDD才有分区器。分区器在Shuffle的Map阶段使用,当RDD的计算发生Shuffle时,Map阶段虽然将计算结果进行了保存,供Reduce阶段的任务来拉取数据,但是Map阶段的每个分区的结果可能会被Reduce阶段的多个分区使用。如何把Map阶段的结果进行分组,区分出 结果是给Reduce阶段的RDD哪个分区呢?这就是分区器的作用。 分区器根据每条记录的key,判断这个key属于Reduce哪个分区的数据,同时分区器中也计算了按照key进行分区 将会产生分区的总数量。这个数量决定了在Map阶段每个任务结果分组的大小,也决定了下游的Reduce任务的分区数量的大小。在一个分区数为2的分区器中,分区器的工作原理如图3.11所示。 图3.11分区器的工作原理 5. 首选运行位置 每个RDD对于每个分区来说有一组首选运行位置,用于标识RDD的这个分区数据最好能够在哪台主机上运行。通过RDD的首选运行位置,可以让RDD的某个分区的计算任务直接在指定的主机上运行,从而实现了移动计算而不是移动数据的目的,减小了网络传输的开销,如Spark中HadoopRDD能够实现加载数据的任务在相应的数据节点上执行。 3.1.3RDD的缓存 RDD是进行迭代式计算的,默认并不会保存中间结果的数据,在计算完成后,中间迭代的结果数据都将会丢失。如果一个RDD在计算完成后,不是通过流水线的方式被一个RDD调用,而是被多个RDD分别调用, 则在计算过程中就需要对RDD进行保存,避免RDD的第二次执行相同的计算。当一个RDD被缓存后,后边调用的时候需要RDD的数据时,直接从缓存中读取,而不是对RDD再次进行计算。尤其是一个RDD经过了特别复杂的计算过程、经过多次Shuffle生成的数据,如果多次使用其结果,对其进行缓存,可以极大地提高程序的执行效率。其原理如图3.12所示。 图3.12RDD缓存计算原理 因为RDD的数据是分布式的,不同的分区散落在不同的节点上,所以RDD的缓存也是分布式的。当对一个RDD进行缓存时,可以直接将每个分区的数据缓存在当前分区的计算节点中,每个节点缓存RDD的一部分,完成整个RDD的缓存。RDD在节点中的缓存如图3.13所示。 图3.13RDD在节点中的缓存 根据程序实现的不同,缓存的数据可以选择在内存、磁盘或其他的外部存储器中。也可以实现多副本机制,将本节点的数据复制到其他的节点进行保存,实现数据的冗余。 3.1.4RDD容错机制 RDD的容错是通过其lineage机制实现的。因为一个RDD的数据都可以通过其父RDD的转换而来。如果在运行的过程中,某一分区的缓存数据丢失,则重新计算该分区的数据,当此RDD的依赖是窄依赖时,只需要计算依赖的父RDD的一个分区的数据即可,避免了一个节点出错则所有数据都需重新计算的缺点。但是如果丢失数据的RDD的依赖是宽依赖,那么这个分区的数据 可能依赖父RDD的所有分区的数据,在这种情况下就必须重新计算父RDD的所有分区的数据,从而完成数据恢复。RDD容错机制如图3.14所示。 图3.14RDD的容错