第3章 Spark应用开发基础 3.1Spark的Python编程环境设置 Apache Spark是Scala语言实现的一个计算框架。为了支持Python语言使用Spark,Apache Spark社区开发了一个工具PySpark。利用PySpark中的Py4j库,可以通过Python语言操作RDDs。 PySpark提供了PySpark Shell,它是一个结合了Python API和Spark Core的工具,同时能够初始化Spark环境。目前,由于Python具有丰富的扩展库,大量的数据科学家和数据分析从业人员都在使用Python。因此,PySpark将Spark支持Python是对两者的一次共同促进。 Spark可以独立安装使用,也可以和 Hadoop 一起安装使用。在安装 Spark 之前,首先确保安装了 Java 8 或者更高的版本,并安装配置好Scala。 1. Spark安装 访问Spark官网http://spark.apache.org/,并选择最新版本的 Spark 直接下载。截止到2020年9月11日,Spark的最新版本是3.0.1。下面以版本2.4.5为例,描述Windows 10下的环境配置过程。 (1) 将spark2.4.5binhadoop2.7解压缩到文件夹D:\ProgramFiles中。 (2) 配置环境变量,见表31。 表31大数据平台的环境变量设置 变量名变量值 JAVA_HOMED:\ProgramFiles\Java\jdk1.8.0_202 HADOOP_HOMED:\ProgramFiles\hadoop3.1.3 HIVE_HOMED:\ProgramFiles\hive3.1.2 SCALA_HOMED:\ProgramFiles\scala2.12.11 SPARK_HOMED:\ProgramFiles\spark2.4.5binhadoop2.7 PYSPARK_DRIVER_PYTHONipython PYSPARK_DRIVER_PYTHON_OPTSnotebook (3) 配置PATH路径,例如: PATH=%JAVA_HOME%\bin;%JAVA_HOME%\jre\bin;%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin;%HIVE_HOME%\bin;%SCALA_HOME%\bin;%SPARK_HOME%\bin 配置完成后,在 shell 中输入 sparkshell 或者 pyspark 就可以进入 Spark 的交互式编程环境中,前者是进入 Scala 交互式环境,后者是进入 Python 交互式环境,如图31所示。 图31Spark的安装测试 2. 配置Python编程环境 介绍两种编程环境: Jupyter和Visual Studio Code。前者便于进行交互式编程,后者便于集成式开发。 1) PySpark in Jupyter Notebook 方法一: 配置 PySpark 启动器的2个环境变量。 PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook 方法二: 使用 findSpark 包,在代码中提供 Spark 上下文环境。该方法具有通用性,值得推荐。首先安装 findspark。 pip install findspark 然后打开一个 Jupyter notebook。在进行 Spark 编程时,先导入 findspark 包,示例如图32所示。 图32基于findspark包的Spark计算pi值 2) PySpark in VScode 在VScode上使用Spark,不需要使用findspark包,可以直接进行编程。 from pyspark import SparkContext, SparkConf conf =SparkConf().setMaster("local[]").setAppName("test") sc =SparkContext(conf=conf) logFile = "d:/ProgramFiles/spark-2.4.5-bin-hadoop2.7/README.md" logData = sc.textFile(logFile, 2).cache() numAs = logData.filter(lambda line: 'a'in line).count() numBs = logData.filter(lambda line: 'b' in line).count() print("Lines with a: {0}, Lines with b:{1}".format(numAs, numBs)) 运行结果如图33所示。 图33在VSCode环境中运行Spark程序示例 3. SparkSession介绍 SparkSession是Spark2.0引入的新概念。SparkSession为用户提供了统一的切入点,来让用户学习Spark的各项功能。 在Spark的早期版本中,SparkContext是Spark的主要切入点,由于RDD是主要的API,通过SparkContext来创建和操作RDD。对于每个其他的API,就需要使用不同的Context。例如,对于Streming,使用StreamingContext; 对于SQL,使用SQLContext; 对于Hive,使用HiveContext。但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为它们建立接入点。所以在Spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。 SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了SparkContext,所以计算实际上是由SparkContext完成的。 在PySpark中,SparkContext使用Py4J来启动一个JVM并创建一个JavaSparkContext。默认情况下,PySpark已经创建了一个名为sc的SparkContext,并且在一个JVM进程中可以创建多个SparkContext,但是只能有一个active级别的,因此,如果再创建一个新的SparkContext是不能正常使用的。 4. Spark三种部署方式 Spark支持以下三种不同类型的部署方式。 (1) Standalone(类似于MapReduce1.0,slot为资源分配单位)。 (2) Spark on Mesos(和Spark有“血缘”关系,能更好支持Mesos)。 (3) Spark on YARN。 3.2Spark的工作机制 1. Spark的基本架构 Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor),如图34所示。资源管理器可以自带或Mesos或YARN。 与Hadoop MapReduce计算框架相比,Spark所采用的Executor有两个优点。 (1) 利用多线程来执行具体的任务,减少任务的启动开销; (2) Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备,有效减少IO开销。 图34Spark的运行架构图 一个Application由一个Driver和若干个Job构成,一个Job由多个Stage构成,一个Stage由多个没有Shuffle关系的Task组成,如图35所示。 图35Application的组成 当执行一个Application时,Driver会向集群管理器申请资源,启动Executor,并向Executor发送应用程序代码和文件。然后在Executor上执行Task。运行结束后,执行结果则会返回给Driver,或者写到HDFS、其他数据库中。 2. Spark的执行过程 Spark的基本运行流程如图36所示。 图36Spark运行基本流程 (1) 首先为应用构建起基本的运行环境,即由Driver创建一个SparkContext,进行资源的申请、任务的分配和监控。 (2) 资源管理器为Executor分配资源,并启动Executor进程。 (3) SparkContext根据RDD的依赖关系构建DAG图; DAG图提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理; Executor向SparkContext申请Task; Task Scheduler将Task发放给Executor运行,并提供应用程序代码。 (4) Task在Executor上运行,把执行结果反馈给TaskScheduler,然后反馈给DAGScheduler,运行完毕后写入数据并释放所有资源。 总体而言,Spark运行架构具有以下3个特点。 (1) 每个Application都有自己专属的Executor进程,并且该进程在Application运行期间一直驻留。Executor进程以多线程的方式运行Task。 (2) Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可。 (3) Task采用了数据本地性和推测执行等优化机制。 3.3弹性分布式数据集RDD基础 许多迭代式算法(如机器学习、图算法等)和交互式数据挖掘工具的共同之处是,不同计算阶段之间会重用中间结果。目前的MapReduce框架都是把中间结果写入到HDFS中,带来了大量的数据复制、磁盘IO和序列化开销。 RDD就是为了解决这种问题而出现的,它提供了一个抽象的数据架构,从而不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换处理,不同RDD之间的转换操作形成依赖关系,可以实现管道化,避免中间数据存储。 Spark的核心是RDD(resilient distributed dataset),即弹性分布式数据集,是由AMPLab实验室提出的概念,属于一种分布式的内存系统数据集应用。 Spark的主要优势来自RDD本身的特性,RDD能与其他系统兼容,可以导入外部存储系统的数据集,例如HDFS、HBase或其他Hadoop数据源。 1. RDD的概念 一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。 RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和group by)而创建得到新的RDD。 RDD是Spark的基石,也是Spark的灵魂。RDD是弹性分布式数据集,是只读的分区记录集合。每个RDD有5个主要的属性。 (1) 一组分片(partition): 数据集的最基本组成单位。 (2) 一个计算每个分片的函数: 对于给定的数据集,需要做哪些计算。 (3) 依赖(Dependencies): RDD的依赖关系,描述了RDD之间的lineage。 (4) preferredLocations(可选): 对于data partition的位置偏好。 (5) partitioner(可选): 对于计算出来的数据结果如何分发。 2. RDD的两种操作类型 RDD提供了一组丰富的操作以支持常见的数据运算,有转换和动作两种操作方式。 (1) 转换(Transformations): 返回RDD,基于现有的数据集创建一个新的数据集,如map、filter、groupBy、join等。 (2) 动作(Actions) : 在数据集上进行运算,返回计算值,而不是一个RDD,如count、collect、save等。RDD通过“转换”运算得到新的RDD,原RDD不受影响。但Spark会延迟这个转换的发生时间点,不会马上执行,而是等到执行了Action之后才会基于所有的RDD关系来执行转换。 RDD提供的转换接口都非常简单,都是类似map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改(不适合网页爬虫)。 1) RDD转换操作 下面以RDD包含{1, 2, 3, 3}为例,说明基本的转换结果。基本的RDD转换函数见表32。 表32基本的RDD转换函数 函数名功能例子结果 map()通过自定义函数进行映射。对每个元素应用函数rdd.map(x=>x+1){2,3,4,4} flatMap()先映射(map),再把元素合并为一个集合。常用来抽取单词rdd.flatMap(x=>x.to(3)){1,2,3,2,3,3,3} filter()对元素过滤,保留符合条件的元素rdd.filter(x=>x!=1){2,3,3} distinct()去重rdd.distinct(){1,2,3} sample (withReplacement, fraction,[seed])根据给定的随机种子seed,随机抽样出比例为fraction的数据list = numpy.arange(1,100,2) listRDD = sc.parallelize(list) sampleRDD = listRDD. sample(0,0.2).collect()[25, 33, 41, 53, 63, 65, 69, 77, 87, 95] groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集。Key相同的值被分为一组kvd.groupByKey().map(lambda x:(x[0],list(x[1]))).collect()[(1, [2]), (3, [4, 6]), (5, [6])] reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合kvd.reduceByKey(lambda x,y:x+y).collect()[(1, 2), (3, 10), (5, 6)] sortByKey通过Key值对KV对数据集排序kvd.sortByKey(ascending=False).collect()[(5, 6), (3, 4), (3, 6), (1, 2)] join对两个RDD进行cogroup、笛卡尔积、展平操作,(K,V)和(K,W)转换为(K,(V,W))x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2), ("a", 3)]) sorted(x.join(y).collect())[('a', (1, 2)), ('a', (1, 3))] 其他转换操作还包括mapPartitions、mapPartitionsWithIndex、intersection、aggregateByKey、cartesian、pipe、coalesce、repartition、repartitionAndSortWithinPartitions等。 2) 比较map()和flatMap() flatMap()对每个输入元素输出多个输出元素。flat是压扁的意思,即将RDD中元素压扁后返回一个新的RDD。其工作原理如图37所示。 图37map()和flatMap()函数的比较 3) RDD的动作 主要的动作见表33。 表33RDD的动作操作示例 函数名功能例子结果 collect()相当于toArray,将分布式的RDD返回为一个单机的Array数组rdd.collect(){1,2,3,3} count()返回RDD中元素的个数rdd.count()4 countByValue()返回一个map,表示唯一元素出现的个数rdd.countByValue(){(1,1),(2,1),(3,2)} take(num)返回数据集中前num个元素形成的数组rdd.take(2){1,2} top(num)返回前几个元素rdd.top(2){3,3} takeOrdered (num)(ordering)返回基于提供的排序算法的前几个元素rdd.takeOrdered(2)(myOrdering){3,3} takeSample(withReplacement,num,[seed])取样例rdd.takeSample(false,1)不确定 reduce(func)合并RDD中元素rdd.reduce((x,y)=>x+y)9 fold(zero)(func)与reduce()相似提供zero valuerdd.fold(0)((x,y)=>x+y)9 aggregate (zeroValue)(seqOp, combOp)与fold()相似,返回不同类型rdd.aggregate((0,0))((x,y)=>(x._1+y,x._2+1),(x,y)=>(x._1+y._1,x._2+y._2))(9,4) foreach(func)对数据集中每个元素使用func()函数,无返回rdd.foreach(func)什么也没有 first返回数据集中第一个元素,与take(1)类似rdd.first()1 countByKey只用于KV类型RDD,返回每个Key的个数kvd.countByKey()defaultdict(int, {3: 2, 5: 1, 1: 1}) saveAsTextFile保存数据到文本文件rdd.saveAsTextFile(“file:///db/spark”)将文件保存到本地文件系统; saveAsTextFile("hdfs://db/spark/") //保存到HDFS文本文件 saveAsSequenceFile保存数据为序列化文件用法同saveAsTextFile序列化文件 saveAsObjectFile保存数据为对象文件用法同saveAsTextFile对象文件 3. 集合运算 RDDs支持数学集合的计算,如并集、交集计算。注意: 进行计算的RDDs应该是相同类型。 下面以两个RDD的Transformations为例: 一个RDD包含{1, 2, 3},另一个RDD包含{3, 4, 5},见表34。 表34集合运算示例 函数名功能例子结果 union()并集rdd.union(other) {1, 2, 3, 3,4, 5} intersection()交集rdd.intersection(other){3} subtract()取存在第一个RDD、而不存在第二个RDD的元素(使用场景,机器学习中,移除训练集)rdd.subtract(other){1, 2} cartesian()笛卡尔积rdd.cartesian(other){(1, 3), (1,4), …(3,5)} cartesian()操作非常耗时,其技术原理如图38所示。 图38RDD的catesian运算 4. RDD典型的执行过程 表面上RDD的功能很受限、不够强大,实际上RDD已经被实践证明可以高效地表达许多框架的编程模型(如MapReduce、SQL、Pregel)。Spark用Scala语言实现了RDD的API,程序员可以通过调用API实现对RDD的各种操作。其基本执行过程如图39所示。 图39RDD的执行过程 (1) RDD读入外部数据源进行创建。 (2) RDD经过一系列的转换(Transformation)操作,每一次都会产生不同的RDD,供给下一个转换操作使用。 (3) 最后一个RDD经过“动作”操作进行转换,并输出到外部数据源。 这一系列处理称为一个lineage(血缘关系),即DAG拓扑排序的结果。这样操作的优点是惰性调用、管道化、避免同步等待、不需要保存中间结果、每次操作变得简单。 5. RDD之间的依赖关系 RDD只能基于稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。 能从其他RDD通过确定操作创建新的RDD的原因是RDD含有从其他RDD衍生(即计算)出本RDD的相关信息(即lineage)。Dependency代表了RDD之间的依赖关系,即血缘(Lineage),分为窄依赖和宽依赖。 (1) 窄依赖: 一个父RDD最多被一个子RDD用在一个集群节点上管道式执行。例如map、filter、union等。 (2) 宽依赖: 指子RDD的分区依赖于父RDD的所有分区,这是因为shuffle类操作要求所有父分区可用。例如groupByKey、reduceByKey、 sort、partitionBy等。 注意: 一个RDD对不同的父节点可能有不同的依赖方式,可能对父节点1是宽依赖,对父节点2是窄依赖。 窄依赖和宽依赖的比较如图310所示。 图310窄依赖和宽依赖的比较 3.4RDD的Python程序设计 本节利用Python语言,采用Jupyter Notebook环境开展RDD的操作。 1. 基本RDD的转换运算 1) 创建intRDD 创建intRDD最简单的方式就是使用SparkContext的parallelize方法,命令如下: intRDD=sc.parallelize([3,1,2,5,5]) 这是一个转换运算,不会立即执行。通过执行collect()的动作运算,就能够立即完成。