第5章Spark 导学 Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架,在近两年内已发展成为大数据处理领域最炙手可热的开源项目。 了解: Spark的发展现状; Spark的应用场景与Spark的医学应用案例。 掌握: Spark的概念; Spark有哪些优点(对比Hadoop); Spark速度比Hadoop快的原因; Spark生态系统的组成; Spark生态系统中的Runtime、Spark SQL、MLlib、GraphX、Spark Streaming的概念。 在大数据领域,Apache Spark(以下简称Spark)通用并行分布式计算框架越来越受人瞩目。Spark适合各种迭代算法和交互式数据分析,能够提升大数据处理的实时性和准确性,能够更快速地进行数据分析。 5.1Spark平台 Spark和Hadoop都属于大数据的框架平台,而Spark是Hadoop的后继产品。由于Hadoop设计上只适合离线数据的计算以及在实时查询和迭代计算上的不足,已经不能满足日益增长的大数据业务需求。因而Spark应运而生,Spark具有可伸缩、在线处理、基于内存计算等特点,并可以直接读写Hadoop上任何格式的数据。 5.1.1Spark简介 Spark是一个开源的通用并行分布式计算框架。2009年由加州大学伯克利分校的AMP实验室开发,是当前大数据领域最活跃的开源项目之一。Spark也称为快数据,与Hadoop的传统计算方式MapReduce相比,效率至少提高100倍。比如通过对比逻辑回归算法在Hadoop和Spark上的运行时间,可以看出Spark 的效率有很大的提升,如图51所示。 图51逻辑回归算法在Hadoop和 Spark上的运行时间对比 Spark编程非常高效、简洁,支持多种语言的API,如Scala、Java、Python等。例如在基于MapReduce开发的WordCount示例程序中,用户需要重写Map类和Reduce类,虽然MapReduce类似八股文的程序编写模式极大地简化了并行程序开发过程,但是程序代码至少几十行。若基于Spark开发同样的WordCount程序,仅需短短的几行代码,就可以对单词个数进行统计。 5.1.2Spark发展 Spark的发展速度非常迅速。2009年,Spark诞生; 2010年,Spark正式开源; 2013年成为Apache基金项目; 2014年成为Apache基金的顶级项目,整个过程不到五年时间。 从 2013年6月到2014年6月,Spark的开发人员从原来的68位增长到255位,参与开发的公司也从17家上升到50家。在这50家公司中,有来自中国的阿里巴巴、百度、网易、腾讯、搜狐等公司。当然,代码库的代码行也从原来的63000行增加到75000行。图52所示为截至2014年Spark的开发人员数量每年的增长曲线。 图52Spark的开发人员数量每个月的增长曲线 Spark广泛应用在国内外各大公司,比如国外的谷歌、亚马逊、雅虎、微软和国内的百度、腾讯、爱奇艺、阿里等公司。近两年,Spark在中国的发展达到了一个前所未有的状态和高度。其中阿里巴巴的搜索和广告业务,最初使用Mahout和MapReduce来解决复杂的机器学习问题,但是在效率和代码维护方面并不理想,现已转向Spark框架。淘宝技术团队使用Spark实现了多次迭代的机器学习算法和一些高计算复杂度的算法,并将其运用在推荐系统上; 同时还利用Spark中的一系列组件解决了基于最大连通图的社区发现、基于三角形计数的关系衡量、基于随机游走的用户属性传播等许多生产问题。此外,腾讯也是最早使用Spark的应用之一。借助Spark快速迭代的优势,腾讯提出了大数据精准推荐,并采用“数据算法系统”这套技术方案支持每天上百亿的请求量。随着各行业数据量的与日俱增,相信Spark会应用到越来越多的生产场景中去。 5.1.3Spark的优点 与Hadoop相比,Spark真正的优势在于速度,除了速度之外,Spark还有很多的优点,如表51所示。 表51Hadoop与Spark的对比 对比项 Hadoop Spark 工作方式 非在线、静态 在线、动态 处理速度 高延迟 比Hadoop快数十倍至上百倍 兼容性 开发语言: JAVA语言 最好在Linux系统下搭建,对Windows的兼容性不好。 开发语言: 以Scala为主的多语言 对Linux和Windows等操作系统的兼容性都非常好 存储方式 磁盘 既可以在内存上存储,也可以在磁盘上存储。 操作类型 只提供Map和Reduce两个操作,表达力欠缺。 提供很多转换和动作,很多基本操作如Join、Group By已经在RDD转换和动作中实现。 数据处理 只适用数据的批处理,实时处理非常差。 除了能够提供交互式实时查询外,还可以进行图处理、流式计算和反复迭代的机器学习等。 逻辑性 处理逻辑隐藏在代码细节中,没有整体逻辑。 代码不包含具体操作的实现细节,逻辑更清晰。 抽象层次 抽象层次低,需要手工编写代码来完成。 Spark的API更强大,抽象层次更高。 可测试性 不容易 容易 5.1.4Spark速度比Hadoop快的原因 1. Hadoop数据抽取运算模型 使用Hadoop处理一些问题诸如迭代式计算,每次对磁盘和网络的开销相当大。尤其每一次迭代计算都将结果写到磁盘以后再读回来,另外计算的中间结果还需要三个备份。Hadoop中的数据传送与共享、串行方式、复制以及磁盘I/O等因素都使得Hadoop集群在低延迟、实时计算方面表现有待改进。Hadoop的数据抽取运算模型如图53所示。 图53Hadoop数据抽取运算模型 从图53中可以看出,Hadoop中数据的抽取运算是基于磁盘的,中间结果也存储在磁盘上。所以,MapReduce运算伴随着大量的磁盘的I/O操作,运算速度严重受到了限制。 2. Spark数据抽取运算模型 Spark使用内存(RAM)代替了传统HDFS存储中间结果,Spark的数据抽取运算模型如图54所示。 图54Spark数据抽取运算模型 从图54中可以看出,Spark这种内存型计算框架比较适合各种迭代算法和交互式数据分析。可每次将操作过程中的中间结果存入内存中,下次操作直接从内存中读取,省去了大量的磁盘I/O操作,效率也随之大幅提升。 5.2Spark生态系统 Spark整个生态系统分为三层,如图55所示。 图55Spark生态系统组成 从底向上分别为: (1) 底层的Cluster Manager和Data Manager: Cluster Manager负责集群的资源管理; Data Manager负责集群的数据管理。 (2) 中间层的Spark Runtime,即Spark内核。它包括Spark的最基本、最核心的功能和基本分布式算子。 (3) 最上层为四个专门用于处理特定场景的Spark高层模块: Spark SQL、MLlib、GraphX和Spark Streaming,这四个模块基于Spark RDD进行了专门的封装和定制,可以无缝结合,互相配合。 5.2.1底层的群集管理器和数据管理器 Cluster Manager负责集群的资源管理; Data Manager负责集群的数据管理。 1. 集群的资源管理可以选择YARN、Mesos等 Mesos是Apache下的开源分布式资源管理框架,它被称为分布式系统的内核。Mesos根据资源利用率和资源占用情况,在整个数据中心内进行任务的调度,提供类似于YARN的功能。Mesos内核运行在每个机器上,可以通过数据中心和云环境向应用程序(Hadoop、Spark等)提供资源管理和资源负载的API接口。 2. 集群的数据管理则可以选择HDFS、AWS等 Spark支持两种分布式存储系统: HDFS和AWS。亚马逊云计算服务AWS(Amazon Web Services,AWS)提供全球计算、存储、数据库、分析、应用程序和部署服务; AWS提供的云服务中支持使用Spark集群进行大数据分析。Spark对文件系统的读取和写入功能是Spark自己提供的,借助Mesos分布式实现。 5.2.2中间层的Spark Runtime Spark Runtime包含Spark的基本功能,这些功能主要包括任务调度、内存管理、故障恢复以及和存储系统的交互等。Spark的一切操作都是基于RDD实现的,RDD是Spark中最核心的模块和类,也是Spark设计的精华所在。 1. RDD的概念 RDD(Resilient Distributed Datasets,RDD)即弹性分布式数据集,可以简单地把RDD理解成一个提供了许多操作接口的数据集合,和一般数据集不同的是,其实际数据分布存储在磁盘和内存中。 对开发者而言,RDD可以看作是Spark中的一个对象,它本身运行于内存中。如读文件、写文件、数据之间的依赖、KeyValue类型的Map数据都可以看作RDD。RDD是一个大的集合,将所有数据都加载到内存中,方便进行多次重用。 2. RDD的操作类型 RDD提供了丰富的编程接口来操作数据集合,一种是Transformation操作,另一种是Action操作。 (1) Transformation的返回值是一个RDD,如Map、Filter、Union等操作。它可以理解为一个领取任务的过程。如果只提交Transformation是不会提交任务来执行的,任务只有在Action提交时才会被触发。 (2) Action返回的结果把RDD持久化起来,是一个真正触发执行的过程。它将规划以任务(Job)的形式提交给计算引擎,由计算引擎将其转换为多个Task,然后分发到相应的计算结点,开始真正的处理过程。 Spark的计算发生在RDD的Action操作,而对Action之前的所有Transformation,Spark只是记录下RDD生成的轨迹,而不会触发真正的计算。 图56有向无环图DAG的生成 Spark内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图(Directed Acyclic Graph,简称DAG)。举个例子,在图56中,从输入中逻辑上生成A和C两个RDD,经过一系列Transformation操作,逻辑上生成了F。注意,这时候计算没有发生,Spark内核只是记录了RDD的生成和依赖关系。当F要进行输出(进行了Action操作)时,Spark会根据RDD的依赖生成DAG,并从起点开始真正的计算。 5.2.3高层的应用模块 1. Spark SQL Spark SQL作为Spark大数据框架的一部分,主要用于结构化数据处理和对Spark数据执行类SQL的查询,并且与Spark生态的其他模块无缝结合。Spark SQL兼容SQL、Hive、JSON、JDBC和ODBC等操作。Spark SQL的前身是Shark,而Shark的前身是Hive。Shark比Hive在性能上要高出一到两个数量级,而Spark SQL比Shark在性能上又要高出一到两个数量级。 2. MLlib MLlib是一个分布式机器学习库,即在Spark平台上对一些常用的机器学习算法进行了分布式实现,随着版本的更新,它也在不断扩充新的算法。MLlib支持多种分布式机器学习算法,如分类、回归、聚类等。MLlib已经实现的算法如表52所示。 表52MLlib已经实现的算法 算法 功能 Classilication/Clustenng/Regressionilree 分类算法、回归算法、决策树、聚类算法 Optimization 核心算法的优化方法实现 Stat 基础统计 Feature 预处理 Evaluation 算法效果衡量 Linalg 基础线性代数运算支持 Recommendation 推荐算法 3. GraphX GraphX是构建于Spark上的图计算模型,GraphX利用Spark框架提供的内存缓存RDD、DAG和基于数据依赖的容错等特性,实现高效健壮的图计算框架。GraphX的出现,使得Spark生态系统在大图处理和计算领域得到了更加的完善和丰富,同时其与Spark生态系统其他组件进行很好的融合,以及强大的图数据处理能力,使其广泛地应用在多种大图处理的场景中。 GraphX实现了很多能够在分布式集群上运行的并行图计算算法,而且还拥有丰富的API接口。因为图的规模大到一定的程度之后,需要将算法并行化,以方便其在分布式集群上进行大规模处理。GraphX优势就是提升了数据处理的吞吐量和规模。 4. Spark Streaming Spark Streaming是Spark系统中用于实时处理流数据的分布式流处理框架,扩展了Spark流式大数据处理能力。流数据的例子有生产环境的Web服务器生成的日志文件,用户向一个Web服务请求包含状态更新的消息。 Spark Streaming将数据流以时间片为单位进行分割形成RDD,能够以相对较小的时间间隔对流数据进行处理。Spark Streaming还能够和其余Spark生态的模块,如Spark SQL、GraphX、MLlib等进行无缝的集成,以便联合完成基于实时流数据处理的复杂任务。 如果要用一句话来概括Spark Streaming的处理思路的话,那就是“将连续的数据持久化、离散化、然后进行批量处理”。 (1) 数据持久化 将从网络上接收到的数据先暂时存储下来,为事件处理出错时的事件重演提供可能。 (2) 数据离散化 数据源源不断的涌进,永远没有尽头。既然不能穷尽,那么就将其按时间分片。比如采用一分钟为时间间隔,那么在连续的一分钟内收集到的数据就集中存储在一起。 (3) 批量处理 将持久化下来的数据分批进行处理,处理机制套用之前的RDD模式。 5.3Spark的实现方法 5.3.1Spark环境配置 Spark环境配置如图57和图58所示。 图57Spark环境配置软件 图58Spark环境变量配置 5.3.2Spark操作实例 测试文件test.txt内容,测试代码如图59所示。 图59测试代码 5.4Spark在医学领域的应用 5.4.1Spark在医学领域的应用场景 Spark能够满足医学信息处理中以交互式查询和迭代计算为代表的统计分析、数据挖掘、图形计算等各种数据处理需求,可用于临床转化医学研究、基于海量原始数据的实时卫生统计和辅助决策、文献挖掘、流行病预警和预测等。 Spark可以解决医学大数据计算中的批处理、交互查询及流式计算等核心问题。Spark的优势不仅体现性能的提升,Spark框架还为批处理(Spark Core)、SQL查询(Spark SQL)、流式计算(Spark Streaming)、机器学习(MLlib)、图计算(GraphX)等提供一个统一的数据处理平台,这相对于使用Hadoop有很大优势。表53列举了Spark在应用方面与其他大数据框架的对比。 表53Spark的应用 应用场景 成熟的框架 Spark 时间对比 复杂的批量数据处理 MapReduce(Hive) Spark Runtime 小时级,分钟级 基于历史数据的交互式查询 MapReduce Spark SQL 分钟级,秒级 基于实时数据流的数据处理 Storm Spark Streaming 秒级,秒级 基于历史数据的数据挖掘 Mahout Spark MLlib 分钟级,秒级 基于增量数据的机器学习 无 Spark Streaming+ MLlib 分钟级 基于图计算的数据处理 无 Spark GraphX 分钟级 5.4.2使用Scala语言开发Spark医学应用程序 本节将从实例出发,介绍如何使用Scala语言(Spark框架的开发语言)开发Spark应用程序并且将其运行在Spark集群环境中。 图510测试数据格式预览 医学应用案例描述: 假设需要统计一个1000万患有糖尿病患者的平均年龄。假设这些年龄信息都存储在一个文件里,并且该文件的格式如下,第一列是ID,第二列是年龄。测试数据格式预览如图510所示。 针对以上应用案例,使用Scala语言开发Spark应用程序,操作步骤如下: (1) 在Windows环境下搭建Spark平台 Spark平台的搭建主要包括四个步骤,分别是JDK的安装和配置、Scala的安装、Intellij IDE的安装和配置、Spark的安装。详细的安装和调试方法可以参照网址: http://blog.csdn.net/ZHAOLEI5911/article/details/53138493。 (2) 启动Intellij IDE(Scala语言编辑器),选择Scala,新建工程,如图511所示。然后在工程目录下创建一个lib文件夹,并且把Spark安装包下的sparkassembly.jar包复制到lib目录下,如图512所示。 图511新建工程 图512Spark 开发jar包 图513添加jar包到classpath中 并且添加该jar包到工程的classpath中,并配置工程使用刚刚安装的Scala2.10.4版本,工程目录结构如图513所示。 (3) 用Scala语言编写一个生成1000万患者年龄数据的程序文件,源程序如图514所示。 图514年龄信息文件生成类源码 (4) 案例分析与编程实现 案例分析: 要计算1000万患者的平均年龄,那么首先需要对源文件对应的 RDD进行处理,也就是将它转化成一个只包含年龄信息的RDD,其次是计算元素个数即为总人数,然后把所有年龄数加起来即为总年龄,最后平均年龄=总年龄/人数。对于第一步需要使用Map算子把源文件对应的RDD映射成一个新的只包含年龄数据的RDD,需要对在Map算子的传入函数中使用Split方法,得到数组后只取第二个元素即为年龄信息; 第二步计算数据元素总数需要对于第一步映射的结果RDD,使用Count算子; 第三步则是使用Reduce算子对只包含年龄信息的RDD的所有元素用加法求和; 最后使用除法计算平均年龄即可。 编程实现: 由于本例输出结果很简单,所以在控制台输出即可,用Scala语言编写AvgAgeCalculator类来实现平均年龄的计算,源码如图515所示。 图515AvgAgeCalculator类源码 (5) 提交到集群执行 要执行本实例的程序,需要将刚刚生成的年龄信息文件上传到HDFS上,假设刚才已经在目标机器上执行生成年龄信息文件的Scala类,并且文件被生成到了/home/fams目录下。需要运行一下HDFS命令把文件复制到HDFS的/user/fams目录下。 将年龄信息文件复制到HDFS目录下的命令为: hdfs dfs -copyFromLocal /home/fams /user/fams。 在控制台执行AvgAgeCalculator类的命令如图516所示。 图516执行AvgAgeCalculator类的命令 执行完AvgAgeCalculator类的命令后,就可以在控制台看到程序的输出结果,如图517所示。 图517程序的输出结果 通过本例题,目的是让大家对如何使用Scala语言编写Spark应用程序进行简单的了解。当然在处理实际问题时,情况可能比本例题复杂很多,但是解决问题的基本思想是一致的。在碰到实际问题的时候,首先要对源数据结构格式等进行分析,然后确定如何去使用Spark提供的算子对数据进行转化,最终根据实际需求选择合适的算法操作数据并计算结果。 本章小结 Spark作为一个开源的大数据处理平台,以其内存计算、可伸缩及高效的容错特性,与分布式文件存储系统、分布式数据库结合使用,配合其丰富的生态系统,解决了数据增长和处理性能需求之间存在的瓶颈问题。可以预见Spark在医学领域中具有广阔的应用前景,将在医学大数据处理分析中得到更广泛和深入的应用。 【关键词注释】 1. 迭代: 是重复反馈过程的活动,其目的通常是为了逼近所需目标或结果。每一次对过程的重复称为一次“迭代”,而每一次迭代得到的结果会作为下一次迭代的初始值。 2. 流数据: 流数据是一组顺序、大量、快速、连续到达的数据序列。一般情况下,数据流可被视为一个随时间延续而无限增长的动态数据集合。应用于网络监控、传感器网络、航空航天、气象测控和金融服务等领域。 3. R语言: 用于统计分析、绘图的语言和操作环境。R语言是一个自由、免费、源代码开放的软件,它是一个用于统计计算和统计制图的优秀工具。 4. 逻辑回归: 是一种广义的线性回归分析模型,常用于数据挖掘、疾病自动诊断、经济预测等领域。例如探讨引发疾病的危险因素,并根据危险因素预测疾病发生的概率等。 5. Python语言: 是一种面向对象、解释型计算机程序设计语言。Python具有丰富和强大的库。它常被昵称为胶水语言,能够把用其他语言制作的各种模块(尤其是C/C++)很轻松地联结在一起。 6. JSON: (JavaScript Object Notation,JSON)是一种轻量级的数据交换格式。JSON采用完全独立于语言的文本格式,但是也使用了类似于C语言家族的习惯(包括C、C++、C#、Java、JavaScript、Perl、Python等)。这些特性使JSON成为理想的数据交换语言。 7. JDBC: (Java Data Base Connectivity,Java数据库连接)是一种用于执行SQL语句的Java API,可以为多种关系数据库提供统一访问,它由一组用Java语言编写的类和接口组成。 8. ODBC: (Open Database Connectivity,开放数据库连接)是微软公司开放服务结构中有关数据库的一个组成部分,它建立了一组规范,并提供了一组对数据库访问的标准API。这些API利用SQL来完成其大部分任务。ODBC本身也提供了对SQL语言的支持,用户可以直接将SQL语句送给ODBC。 9. 基于最大连通图: 把图的所有结点用最少的边将其连接起来的子图,所以极大连通子图不为1,因此可以说最大连通子图是一个累赘概念,因为任何一个极大连通子图,其实都可以叫作最大连通子图。 10. 持久化: 把数据(如内存中的对象)保存到可永久保存的存储设备中(如磁盘)。持久化的主要应用是将内存中的对象存储在数据库中,或者存储在磁盘文件、XML数据文件中,等等。 11. iter: 迭代器(iterator)有时又称游标(cursor),是程序设计的软件设计模式,可在容器(container,例如链表或阵列)上遍访的接口,设计人员无须关心容器的内容。 12. 算子:是一个函数空间到函数空间上的映射O: X→X。广义地讲,对任何函数进行某一项操作都可以认为是一个算子,甚至包括求幂次、开方都可以认为是一个算子,只是有的算子用了一个符号来代替它所要进行的运算罢了,它和f(x)的f没区别,它甚至和加减乘除的基本运算符号都没有区别,只是它可以对单对象操作罢了(有的符号比如大于、小于号要对多对象操作)。 习题5 一、 填空题 1. Spark大数据框架适合各种算法和交互式数据分析,能够提升大数据处理的实时性和准确性。 2. 也称为快数据,与Hadoop的传统计算方式MapReduce相比,效率至少提高100倍。 3. 语言是Spark框架的开发语言,是一种类似Java的编程语言。 4. Spark是当前流行的大数据处理框架,具有快速、通用、简单等特点。 5. 与Hadoop相比,Spark真正的优势在于。 6. Spark使用代替了传统HDFS存储中间结果。 7. Spark整个生态系统分为三层,底层的负责集群的资源管理。 8. Spark整个生态系统分为三层,底层的负责集群的数据管理。 9. Spark整个生态系统分为三层,中间层的包括Spark的最基本、最核心的功能和基本分布式算子。 10. RDD(Resilient Distributed Datasets,弹性分布式数据集)即。 11. 对开发者而言,可以看作是Spark中的一个对象,它本身运行于内存中。它是一个大的集合,将所有数据都加载到内存中,方便进行多次重用。 12. RDD提供了丰富的编程接口来操作数据集合,一种是操作,另一种是Action操作。 13. RDD的操作返回的结果把RDD持久化起来,是一个真正触发执行的过程。 14. Spark内核会在需要计算发生的时刻绘制一张关于计算路径的,简称DAG。 15. 作为Spark大数据框架的一部分,主要用于结构化数据处理和对Spark数据执行类SQL的查询。 16. 是一个分布式机器学习库,即在Spark平台上对一些常用的机器学习算法进行了分布式实现。 17. 是构建于Spark上的图计算模型,它利用Spark框架提供的内存缓存RDD、DAG和基于数据依赖的容错等特性,实现高效健壮的图计算框架。 18. 是Spark系统中用于处理流数据的分布式流处理框架,扩展了Spark流式大数据处理能力。 19. Spark Streaming将数据流以时间片为单位进行分割形成,能够以相对较小的时间间隔对流数据进行处理。 20. Spark Streaming还能够和其余Spark生态的模块进行无缝的集成,以便联合完成基于处理的复杂任务。 二、 简答题 1. 简述什么是Spark。 2. 与Hadoop进行比较,Spark在工作方式、处理速度、存储方式和兼容性等方面有哪些优点。 3. 从数据抽取运算模型进行分解,分析Spark速度比Hadoop快的原因。 4. 简述Spark整个生态系统分为哪三层。 5. 简述什么是RDD。 6. 简述什么是RDD的Transformation操作和Action操作。 7. 通过下面的图,简述什么是DAG,DAG是如何生成的。 8. 简述什么是Spark SQL 9. 简述什么是GraphX。 10. 简述什么是Spark Streaming的数据持久化、离散化和批量处理。