学习目标: ● 了解RDD,能够从不同方面介绍RDD 。 ● 掌握RDD 的创建,能够基于文件和数据集合创建RDD 。 ● 掌握RDD 的处理过程,能够使用转换算子和行动算子操作RDD 。 ● 熟悉RDD 的分区,能够指定RDD 的分区数量。 ● 熟悉RDD 的依赖关系,能够区分RDD 的窄依赖和宽依赖。 ● 掌握RDD 持久化机制,能够使用persist() 方法和cache() 方法持久化RDD 。 ● 熟悉RDD 容错机制,能够叙述RDD 的故障恢复方式。 ● 熟悉DAG 的概念,能够叙述什么是DAG 。 ● 掌握RDD 在Spark中的运行流程,能够说出RDD 被解析为Task执行的过程。 MapReduce具有负载平衡、容错性高和可拓展性强的优点,但在进行迭代计算时要频 繁进行磁盘读写操作,从而导致执行效率较低。相比之下,Spark中的RDD(Resilient DistributedDataset,弹性分布式数据集)可以有效解决这一问题。RDD 是Spark提供的重 要抽象概念,可将其理解为存储在Spark集群中的大型数据集。不同RDD 之间可以通过转 换操作建立依赖关系,并实现管道化的数据处理,避免中间结果的磁盘读写操作,从而提高 了数据处理的速度和性能。接下来,本章针对SparkRDD 进行详细讲解。 1 RDD 简介 3. RDD 是Spark中的基本数据处理模型,具有可容错性和并行的数据结构。RDD 不仅 可以将数据存储到磁盘中,还可以将数据存储到内存中。对于迭代计算产生的中间结果, RDD 可以将其保存到内存中。如果后续计算需要使用这些中间结果,可以直接从内存中读 取,提高数据计算的速度。 下面从5方面介绍RDD 。 1. 分区列表 每个RDD 会被分为多个分区,这些分区分布在集群中的不同节点上,每个分区都会被 一个计算任务处理。分区数决定了并行计算任务的数量,因此分区数的合理设置对于并行 计算性能至关重要。在创建RDD 时,可以指定RDD 分区的数量。如果没有指定分区数 量,会根据不同的情况采用默认的分区策略。例如,根据数据集合创建RDD 时,默认分区 第3章 SparkRDD弹性分布式数据集 87 数量为分配给程序的CPU 核心数;而根据HDFS上的文件创建RDD时,默认分区数量为 文件的分块数。 2.计算函数 Spark中的计算函数可以对RDD的每个分区进行迭代计算,用户可以根据具体需求自 定义RDD中每个分区的数据处理逻辑。这种灵活性使得Spark能够适应各种数据处理 场景。 3.依赖关系 RDD之间存在依赖关系,即每次对RDD进行转换操作都会生成一个新的RDD。这种 依赖关系在数据计算中发挥着重要作用。例如,如果某个分区的数据丢失,通过依赖关系, 丢失的数据可以被重新计算和恢复,从而保证了数据计算的可靠性和容错性。 4.分区器 当Spark读取的数据为键值对(key-valuepair)类型的数据时,可以通过设置分区器来 自定义数据的分区方式。Spark提供了两种类型的分区器,一种是基于哈希值的分区器 HashPartitioner,另一种是基于范围的分区器RangePartitioner。在读取的数据不是键值对 类型的情况下,分区值为None,这时Spark会采取默认的分区策略来处理这些非键值对 数据。 5.优先位置列表 优先位置列表通过存储每个分区中数据块的位置,帮助Spark优化数据处理性能。在 进行数据计算时,Spark会尽可能地将计算任务分配到其所要处理数据块的存储位置。这 种做法遵循了“移动数据不如移动计算”的理念,即在可能的情况下,将计算任务移动到数据 所在的位置,而不是将数据移动到计算任务所在的位置。通过这种方式,Spark可以减少数 据传输开销,从而提高整体计算效率。 3.2 RDD 的创建 Spark提供了两种创建RDD的方式,分别是基于文件和基于数据集合。使用基于文件 的方式创建RDD时,文件中的每行数据会被视为RDD的一个元素。使用基于数据集合的 方式创建RDD时,数据集合中的每个元素会被视为RDD的一个元素。本节针对这两种创 建RDD的方式进行详细讲解。 3.2.1 基于文件创建RDD Spark提供了textFile()方法,用于从文件系统中的文件读取数据并创建RDD,包括本 地文件系统、HDFS、AmazonS3等,其语法格式如下。 sc.textFile(path) 上述语法格式中,sc为SparkContext对象,path用于指定文件的路径。 接下来,分别演示从本地文件系统和HDFS中的文件读取数据并创建RDD。 1.从本地文件系统中的文件读取数据并创建RDD 在虚拟机Hadoop1的/export/data目录执行virdd.txt命令创建文件rdd.txt,具体内 Spark大数据分析与实战(第88 2版) 容如文件3-1所示。 文件3-1 rdd.txt 1 hadoop spark 2 itcast heima 3 scala spark 4 spark itcast 5 itcast hadoop 确保Hadoop 集群已经成功启动后,进入虚拟机Hadoop1 的目录/export/servers/ sparkOnYarn/spark-3.3.0-bin-hadoop3/启动SparkShell,在SparkShell中执行如下代码。 scala> val test = sc.textFile("file:///export/data/rdd.txt") 上述代码使用SparkContext对象sc的textFile()方法,从本地文件系统中的文件rdd.txt 读取数据创建RDD,并将RDD保存到常量test中,该常量是一个RDD对象。 上述代码执行完成后,如图3-1所示。 图3-1 从本地文件系统中的文件读取数据并创建RDD 从图3-1可以看出,Spark从本地文件系统的目录/export/data中读取文件rdd.txt的 数据,并创建一个名为test的RDD,其数据类型为String。这意味着,文件rdd.txt中的每 一行数据都会作为String类型的元素存储在RDD中。 2.从HDFS中的文件读取数据并创建RDD 将文件rdd.txt上传到HDFS的根目录。在虚拟机Hadoop1的/export/data目录执行 如下命令。 $ hdfs dfs -put rdd.txt / 从HDFS中的文件rdd.txt读取数据并创建RDD,在SparkShell中执行如下代码。 scala> val testRDD = sc.textFile("/rdd.txt") 上述代码执行完成后的效果如图3-2所示。 图3-2 从HDFS中的文件读取数据并创建RDD 第3章 SparkRDD弹性分布式数据集 89 从图3-2可以看出,Spark从HDFS的根目录中读取文件rdd.txt的数据,并创建一个 名为testRDD的RDD,其数据类型为String。 3.2.2 基于数据集合创建RDD Spark提供了parallelize()方法,用于从数据集合(数组、List集合等)读取数据并创建 RDD,其语法格式如下。 sc. parallelize(seq, numSlices) 上述语法格式中,seq用于指定数据集合。numSlices为可选,用于指定创建RDD的分 区数,该参数会在3.4节中讲解。 接下来,演示从数据集合读取数据并创建RDD,在SparkShell中执行如下代码。 scala> val numList = List[Int](1,2,3,4) scala> val listRDD = sc.parallelize(numList) 上述代码中,首先创建一个数据类型为Int的List集合numList,然后使用SparkContext 对象sc的parallelize()方法,从List集合numList中读取数据创建RDD,并将RDD保存到 常量listRDD中,该常量是一个RDD对象。 上述代码执行完成后,如图3-3所示。 图3-3 从List集合numList中读取数据创建RDD 从图3-3可以看出,Spark从List集合numList中读取数据,并创建一个名为listRDD 的RDD,其数据类型为Int。 3.3 RDD 的处理过程 RDD的处理过程主要包括转换和行动操作。下面通过图3-4 来描述RDD 的处理 过程。在 图3-4中,RDD经过一系列的转换操作,每一次转换操作都会生成一个新的RDD,直 到最后一个生成的RDD经过行动操作时,所有RDD 才会触发实际计算,并将结果返回给 驱动程序。如果某个RDD需要复用,则可以将其缓存到内存中。 Spark针对转换操作和行动操作提供了对应的算子,即转换算子和行动算子。本节针 对这两种算子进行详细讲解。 9 0 Spark大数据分析与实战(第2版) 图3- 4 RDD的处理过程 3.3.转换算子 1 转换算子用于将RDD转换为一个新的RDD,但它们不会立即执行计算。相反,它们会 构建一个执行计划,直到遇到行动算子时才会触发实际的计算。表3-1列举了一些常用的 转换算子。 表3- 1 常用的转换算子 算子语法格式说明 filter RDD.filter(func) 根据给定的函数func筛选RDD中的元素 map flatMap RDD.map(func) RDD.flatMap(func) 对RDD中的每个元素应用函数func,将其映射为一个新元素 与map算子作用相似,但是每个输入的元素都可以映射为0 或多个输出结果 groupByKey RDD.groupByKey() 用于对键值对类型的RDD中具有相同键的元素进行分组 reduceByKey RDD.reduceByKey(func) 用于对键值对类型的RDD中具有相同键的元素中的值应用 函数func进行合并 下面针对表3-1列举的常用的转换算子进行详细讲解。 1.filter算子 filter算子通过对RDD中的每个元素应用一个函数来筛选数据,只留下满足指定条件 的元素,而过滤掉不满足条件的元素。接下来,以文件3-1为例,通过一张图来描述如何通 过filer算子筛选出文件rtt中包含单词spak的元素,具体处理过程如图3-5所示。 tdd.xr 图3- 5 filter算子处理过程 在图3-5中,通过从文件rdd.xt读取数据创建RDD,然后通过fler算子将RDD的每 tit 个元素应用到函数func来筛选出包含单词spark的元素,并将其保留到新的RDD中。 接下来,以InteliJIDEA为例,演示如何使用filter算子,具体操作步骤如下。 (1)在本地计算机的D盘根目录创建文件rdd.txt,其内容与文件3-1一致。 (2)在项目Spark_rjet的目录/sci/saa下,新建一个名为cn.tat Pocr/manclics 第3章 SparkRDD弹性分布式数据集 91 .transformation的包。在cn.itcast.transformation包中创建一个名为FilterDemo的Scala 文件,用于筛选出文件rdd.txt中包含单词spark的行,具体代码如文件3-2所示。 文件3-2 FilterDemo.scala 1 package cn.itcast.transformation 2 import org.apache.spark.{SparkConf, SparkContext} 3 object FilterDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为filter,并且可以使用本地计算机的线程数为1 6 val conf:SparkConf = new SparkConf().setAppName("filter") 7 .setMaster("local[1]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\rdd.txt") 10 val result = lines.filter(x=>x.contains("spark")) 11 result.saveAsTextFile("D:\\Spark_out\\Filter_out") 12 //释放资源 13 sc.stop() 14 } 15 } 在文件3-2中,第9行代码用于从本地文件系统中读取文件rdd.txt的数据,并创建一 个名为lines的RDD。 第10行代码通过filter算子筛选出lines中包含spark的元素。在filter算子中,指定 的函数是一个匿名函数,用于依次取出lines中的每个元素并赋值给变量x,然后通过 contains()方法检查元素是否包含spark。若包含,则将该元素存放到名为result的RDD 中,否则,就过滤掉该元素。 第11行代码使用saveAsTextFile()方法将result中的元素输出到指定目录的文件中, 每个元素会占用文件的一行空间。 (3)运行文件3-2。当文件3-2运行完成后,查看本地计算机中目录D:\\Spark_out\\ Filter_out的内容,如图3-6所示。 图3-6 目录D:\\Spark_out\\Filter_out的内容 从图3-6可以看出,目录D:\\Spark_out\\Filter_out中包含4个文件,其中part-00000 是存储Spark 程序输出数据的文件。.part-00000.crc是文件part-00000 的校验和文件。 _SUCCESS是标记Spark程序运行成功的文件,该文件的内容为空。_SUCCESS.crc是文 件_SUCCESS的校验和文件。 Spark大数据分析与实战(第92 2版) (4)通过记事本查看文件part-00000的内容,如图3-7所示。 图3-7 查看文件part-00000 的内容(1) 从图3-7可以看出,文件part-00000中的每一行数据都 包含spark。说明使用filter算子成功筛选出RDD 中包含 spark的元素。 【注意】 当Spark程序能够利用本地计算机的多个线程 时,saveAsTextFile()方法可能会将RDD 中的元素输出到指 定目录的多个文件中。如果在允许Spark程序使用本地计算 机的多个线程的情况下,希望saveAsTextFile()方法将RDD 中的元素输出到指定目录的单个文件中,可以通过将RDD的 分区数指定为1来实现。关于这部分内容可以参考3.4节。 2.map算子 map算子可以将RDD中的每个元素通过一个函数映射为一个新元素。接下来,以文 件3-1为例,通过一张图来描述如何通过map算子将文件rdd.txt中的每行数据拆分为单词 后保存到数组中,具体处理过程如图3-8所示。 图3-8 map算子处理过程 在图3-8中,通过从文件rdd.txt读取数据创建RDD,然后通过map算子将RDD的每 个元素经函数func拆分为单词后保存到数组中,并将每个数组作为元素保留到新的 RDD中。 接下来,以IntelliJIDEA 为例,演示如何使用map算子。在项目Spark_Project的cn .itcast.transformation包中创建一个名为MapDemo的Scala文件,用于将文件rdd.txt中的 每行数据拆分为单词后保存到数组中,具体代码如文件3-3所示。 文件3-3 MapDemo.scala 1 package cn.itcast.transformation 2 import org.apache.spark.{SparkConf, SparkContext} 3 object MapDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为map,并且可以使用本地计算机的线程数为1 6 val conf:SparkConf = new SparkConf().setAppName("map") 7 .setMaster("local[1]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\rdd.txt") 10 val result = lines.map(x=>x.split(" ").toBuffer) 11 result.saveAsTextFile("D:\\Spark_out\\Map_out") 12 //释放资源 第3章 SparkRDD弹性分布式数据集 93 13 sc.stop() 14 } 15 } 在文件3-3中,第9行代码用于从本地文件系统中读取文件rdd.txt的数据,并创建一 个名为lines的RDD。 第10行代码通过map算子将lines中的每个元素映射为新的元素。在map算子中,指 定的函数是一个匿名函数,用于依次取出lines中的每个元素并赋值给变量x,然后通过 split()方法将每个元素按照分隔符“”(空格)拆分成单词,并将每个单词存放到数组中。为 了便于在输出结果中查看数组的内容,这里通过toBuffer()方法将数组转换为可变数组。 这些可变数组最终将存放在名为result的RDD中。 第11行代码使用saveAsTextFile()方法将result中的元素输出到指定目录的文件中, 图3-9 查看文件part-00000的内容(2) 每个元素会占用文件的一行空间。 文件3-3运行完成后,在本地计算机的目录D:\\ Spark_out\\Map_out中,通过记事本查看文件part- 00000的内容,如图3-9所示。 从图3-9可以看出,文件part-00000中的每一行 数据为可变数组的形式。可变数组中的每个元素为一 个单词。说明使用map算子成功将RDD中的每个元 素数据拆分为单词后保存到数组中。 3.flatMap算子 flatMap算子可以将RDD中的每个元素通过一个函数映射为一个或多个新元素。接 下来,以文件3-1为例,通过一张图来描述如何通过flatMap算子将文件rdd.txt中的每行 数据拆分为单词,具体处理过程如图3-10所示。 图3-10 flatMap算子的处理过程 在图3-10中,通过从文件rdd.txt读取数据创建RDD,然后通过flatMap算子将RDD 的每个元素通过函数func拆分为单词,并将每个单词作为元素保留到新的RDD中。 接下来,以IntelliJIDEA 为例,演示如何使用flatMap算子。在项目Spark_Project的 cn.itcast.transformation包中创建一个名为FlatMapDemo的Scala文件,用于将文件rdd.txt 中的每行数据拆分为单词,具体代码如文件3-4所示。 文件3-4 FlatMapDemo.scala 1 package cn.itcast.transformation 2 import org.apache.spark.{SparkConf, SparkContext} 3 object FlatMapDemo { Spark大数据分析与实战(第94 2版) 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为flatMap,并且可以使用本地计算机的线程数为1 6 val conf:SparkConf = new SparkConf().setAppName("flatMap") 7 .setMaster("local[1]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\rdd.txt") 10 val result = lines.flatMap(x=>x.split(" ")) 11 result.saveAsTextFile("D:\\Spark_out\\FlatMap_out") 12 //释放资源 13 sc.stop() 14 } 15 } 在文件3-4中,第9行代码用于从本地文件系统中读取文件rdd.txt的数据,并创建一 个名为lines的RDD。 第10行代码通过flatMap 算子将lines中的每个元素映射为多个新的元素。在 图3-11 查看文件part-00000 的内容(3) flatMap算子中,指定的函数是一个匿名函数,用于依次取 出lines中的每个元素并赋值给变量x,然后通过split()方 法将每个元素按照分隔符“”拆分成单词,并将每个单词存 放到数组中。flatMap算子会对数组进行扁平化处理,将 数组中的每个元素作为输出的一个元素存放在名为result 的RDD中。 第11行代码使用saveAsTextFile()方法将result中 的元素输出到指定目录的文件中,每个元素会占用文件的 一行空间。 文件3-4 运行完成后,在本地计算机的目录D:\\ Spark_out\\FlatMap_out中,通过记事本查看文件part- 00000的内容,如图3-11所示。 从图3-11可以看出,文件part-00000中的每一行数据为一个单词。说明使用flatMap 算子成功将RDD中的每个元素数据拆分为单词。 4.groupByKey算子 groupByKey算子可以将RDD 中具有相同键的元素划分到同一组中,返回一个新的 RDD。新的RDD中每个元素都是一个键值对,其中键是原始RDD 中的键,而值则是一个 迭代器,包含了原始RDD中具有相同键的所有值。接下来,以文件3-1为例,通过一张图来 描述如何通过groupByKey算子将文件rdd.txt中的每个单词进行分组,具体处理过程如 图3-12所示。 在图3-12中,首先,通过从文件rdd.txt读取数据创建RDD。其次,通过flatMap算子 将RDD的每个元素通过函数func拆分为单词。然后,通过map算子将RDD 的每个元素 通过函数func映射为键值对的形式,其中键为单词,值为1用于标识单词出现的次数。最 后,通过groupByKey算子将相同键的元素划分到同一组中,返回一个新的RDD。 接下来,以IntelliJIDEA 为例,演示如何使用groupByKey算子。在项目Spark_Project 的cn.itcast.transformation包中创建一个名为GroupByKeyDemo的Scala文件,用于将文 第3章 SparkRDD弹性分布式数据集 95 图3-12 groupByKey算子的处理过程 件rdd.txt中的每个单词进行分组,具体代码如文件3-5所示。 文件3-5 GroupByKeyDemo.scala 1 package cn.itcast.transformation 2 import org.apache.spark.{SparkConf, SparkContext} 3 object GroupByKeyDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为groupByKey,并且可以使用本地计算机的线程数为1 6 val conf:SparkConf = new SparkConf().setAppName("groupByKey") 7 .setMaster("local[1]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\rdd.txt") 10 val words = lines.flatMap(x=>x.split(" ")).map(y=>(y,1)) 11 val result = words.groupByKey() 12 result.saveAsTextFile("D:\\Spark_out\\GroupByKey_out") 13 //释放资源 14 sc.stop() 15 } 16 } 在文件3-5中,第9行代码用于从本地文件系统中读取文件rdd.txt的数据,并创建一 个名为lines的RDD。 第10行代码,首先flatMap算子将lines的每个元素通过匿名函数拆分为单词。然后, map算子将每个单词通过匿名函数映射为键值对的形式,将每个键值对作为输出的一个元 素存放在名为words的RDD中。 第11行代码,通过groupByKey算子将words中相同键的元素划分到同一组,生成名 为result的RDD。 第12行代码使用saveAsTextFile()方法将result中的元素输出到指定目录的文件中, 每个元素会占用文件的一行空间。 文件3-5运行完成后,在本地计算机的目录D:\\Spark_out\\GroupByKey_out中,通 过记事本查看文件part-00000的内容,如图3-13所示。 Spark大数据分析与实战(第96 2版) 图3-13 查看文件part-00000的内容(4) 从图3-13可以看出,文件part-00000中的每一 行数据为键值对的形式,其中键为单词,值为迭代器, 迭代器中1的数量,决定了相应单词出现的次数。说 明使用groupByKey算子成功将RDD中相同键的元 素划分到同一组。 5.reduceByKey算子 reduceByKey算子可以将RDD中具有相同键的 元素中的值通过指定函数进行合并,返回一个新的 RDD。新的RDD中每个元素都是一个键值对,每个元素的键对应的值都是经过合并的结 果。接下来,以文件3-1为例,通过一张图来描述如何通过reduceByKey算子统计文件rdd.txt 中每个单词出现的次数,具体处理过程如图3-14所示。 图3-14 reduceByKey算子的处理过程 在图3-14中,首先,通过从文件rdd.txt读取数据创建RDD。其次,通过flatMap算子 将RDD的每个元素通过函数func拆分为单词。然后,通过map算子将RDD 的每个元素 通过函数func映射为键值对的形式,其中键为单词,值为1用于标识单词出现的次数。最 后,通过reduceByKey算子将相同键的元素中的值进行合并,返回一个新的RDD。 接下来,以IntelliJIDEA 为例,演示如何使用reduceByKey 算子。在项目Spark_ Project的cn.itcast.transformation包中创建一个名为ReduceByKeyDemo的Scala文件,用 于统计文件rdd.txt中每个单词出现的次数,具体代码如文件3-6所示。 文件3-6 ReduceByKeyDemo.scala 1 package cn.itcast.transformation 2 import org.apache.spark.{SparkConf, SparkContext} 3 object ReduceByKeyDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为reduceByKey,并且可以使用本地计算机的线程数为1 6 val conf:SparkConf = new SparkConf().setAppName("reduceByKey") 7 .setMaster("local[1]") 8 val sc:SparkContext = new SparkContext(conf) 第3章 SparkRDD弹性分布式数据集 97 9 val lines = sc.textFile("D:\\rdd.txt") 10 val words = lines.flatMap(x=>x.split(" ")).map(y=>(y,1)) 11 val result = words.reduceByKey((a,b)=>a+b) 12 result.saveAsTextFile("D:\\Spark_out\\ReduceByKey_out") 13 //释放资源 14 sc.stop() 15 } 16 } 在文件3-6中,第9行代码用于从本地文件系统中读取文件rdd.txt的数据,并创建一 个名为lines的RDD。 第10行代码,首先flatMap算子将lines的每个元素通过匿名函数拆分为单词。然后, map算子将每个单词通过匿名函数映射为键值对的形式,将每个键值对作为输出的一个元 素存放在名为words的RDD中。 第11行代码,通过reduceByKey算子将words中具有相同键的元素中的值进行合并。 在reduceByKey算子中,指定的函数是一个匿名函数,用于对相同键的元素中的值进行相 图3-15 查看文件part-00000的内容(5) 加,返回一个名为result的RDD。 第12 行代码使用saveAsTextFile()方法将 result中的元素输出到指定目录的文件中,每个元素 会占用文件的一行空间。 文件3-6运行完成后,在本地计算机的目录D:\\ Spark_out\\ReduceByKey_out中,通过记事本查看 文件part-00000的内容,如图3-15所示。 从图3-15可以看出,文件part-00000中的每一 行数据为键值对的形式,其中键为单词,值为单词出现的次数。说明使用reduceByKey算 子成功将RDD中具有相同键的元素中的值进行合并。 3.3.2 行动算子 行动算子用于触发RDD的实际计算,并将计算结果返回给驱动器程序或者写入外部 存储系统。与转换算子不同,行动算子并不会创建新的RDD。接下来,通过表3-2来列举 一些常用的行动算子。 表3-2 常用的行动算子 算 子语法格式说 明 count RDD.count() 获取RDD中元素的数量 first RDD.first() 获取RDD中的第一个元素 take RDD.take(n) 以数组的形式返回RDD中的前n 个元素 reduce RDD.reduce(func) 使用指定的函数func对RDD中的元素进行聚合操作 collect RDD.collect() 以数组的形式返回RDD中的所有元素 foreach RDD.foreach(func) 对RDD中的每个元素应用指定的函数func Spark大数据分析与实战(第98 2版) 下面演示如何使用表3-2中列举的常用行动算子。 1.count算子 以IntelliJIDEA 为例,演示如何使用count算子,具体操作步骤如下。 (1)在本地计算机D盘的根目录下创建文件num.txt,具体内容如下。 12345 (2)在项目Spark_Project的目录/src/main/scala下,新建一个名为cn.itcast.action的 包。在cn.itcast.action包中创建一个名为CountDemo的Scala文件,用于通过count算子 统计文件num.txt的行数,具体代码如文件3-7所示。 文件3-7 CountDemo.scala 1 package cn.itcast.action 2 import org.apache.spark.{SparkConf, SparkContext} 3 object CountDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为count,并且可以使用本地计算机的所有线程 6 val conf:SparkConf = new SparkConf().setAppName("count") 7 .setMaster("local[*]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\num.txt") 10 println(lines.count()) 11 //释放资源 12 sc.stop() 13 } 14 } 在文件3-7中,第9行代码用于从本地文件系统中读取文件num.txt的数据,并创建一 个名为lines的RDD。第10行代码通过count算子获取lines中元素的数量,并将其输出到 控制台。 文件3-7的运行结果如图3-16所示。 图3-16 文件3-7的运行结果 从图3-16可以看出,lines中元素的数量为5。由于lines中元素的数量与文件num.txt 的行数一致,因此可以推断出文件num.txt有5行数据。 2.first算子 以IntelliJIDEA 为例,演示如何使用first算子。在项目Spark_Project的cn.itcast .action包中创建一个名为FirstDemo的Scala文件,用于通过first算子获取文件num.txt 的第一行数据,具体代码如文件3-8所示。 第3章 SparkRDD弹性分布式数据集 99 文件3-8 FirstDemo.scala 1 package cn.itcast.action 2 import org.apache.spark.{SparkConf, SparkContext} 3 object FirstDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为first,并且可以使用本地计算机的所有线程 6 val conf:SparkConf = new SparkConf().setAppName("first") 7 .setMaster("local[*]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\num.txt") 10 println(lines.first()) 11 //释放资源 12 sc.stop() 13 } 14 } 在文件3-8中,第9行代码用于从本地文件系统中读取文件num.txt的数据,并创建一 个名为lines的RDD。第10行代码通过first算子获取lines的第一个元素,并将其输出到 控制台。 文件3-8的运行结果如图3-17所示。 图3-17 文件3-8的运行结果 从图3-17可以看出,lines的第一个元素为1。由于lines中的元素与文件num.txt中 的数据一致,所以可以推断出文件num.txt的第一行数据为1。 3.take算子 以IntelliJIDEA 为例,演示如何使用take算子。在项目Spark_Project的cn.itcast. action包中创建一个名为TakeDemo的Scala文件,用于通过take算子获取文件num.txt 的前3行数据,具体代码如文件3-9所示。 文件3-9 TakeDemo.scala 1 package cn.itcast.action 2 import org.apache.spark.{SparkConf, SparkContext} 3 object TakeDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为take,并且可以使用本地计算机的所有线程 6 val conf:SparkConf = new SparkConf().setAppName("take") 7 .setMaster("local[*]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\num.txt") 10 println(lines.take(3).mkString("\n")) 11 //释放资源 12 sc.stop() 13 } 14 } Spark大数据分析与实战(第1 00 2版) 在文件3-9中,第9行代码用于从本地文件系统中读取文件num.txt的数据,并创建一 个名为lines的RDD。第10行代码通过take算子获取lines的前3个元素,并将其输出到 控制台。由于take算子是以数组的形式返回RDD 中的前n 个元素,所以为了方便查看结 果,使用mkString()方法将数组中的元素通过指定分隔符组合成字符串。 文件3-9的运行结果如图3-18所示。 图3-18 文件3-9的运行结果 从图3-18可以看出,lines的前3个元素分别为1、2和3。由于lines中的元素与文件 num.txt中的数据一致,所以可以推断出文件num.txt前3行数据分别为1、2和3。 4.reduce算子 以IntelliJIDEA 为例,演示如何使用reduce算子。在项目Spark_Project的cn.itcast .action包中创建一个名为ReduceDemo的Scala文件,用于通过reduce算子对文件num. txt中的数据进行相加的聚合运算,具体代码如文件3-10所示。 文件3-10 ReduceDemo.scala 1 package cn.itcast.action 2 import org.apache.spark.{SparkConf, SparkContext} 3 object ReduceDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为reduce,并且可以使用本地计算机的所有线程 6 val conf:SparkConf = new SparkConf().setAppName("reduce") 7 .setMaster("local[*]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\num.txt").map(x=>(x.toInt)) 10 println(lines.reduce((a,b)=>a+b)) 11 //释放资源 12 sc.stop() 13 } 14 } 在文件3-10中,第9行代码首先从本地文件系统中读取文件num.txt的数据,然后通 过map算子将元素的数据类型转换为Int,并创建一个名为lines的RDD。第10行代码通 过reduce算子对lines中的元素进行相加的聚合运算,并将运算结果输出到控制台。 文件3-10的运行结果如图3-19所示。 从图3-19可以看出,lines中元素的相加结果为15。由于lines中的元素与文件num.txt 中的数据一致,所以可以推断出文件num.txt中数据的相加结果为15。 5.collect算子 以IntelliJIDEA 为例,演示如何使用collect算子。在项目Spark_Project的cn.itcast .action包中创建一个名为CollectDemo的Scala文件,用于通过collect算子获取文件num.txt 第3章 SparkRDD弹性分布式数据集1 01 图3-19 文件3-10的运行结果 中的所有数据,具体代码如文件3-11所示。 文件3-11 CollectDemo.scala 1 package cn.itcast.action 2 import org.apache.spark.{SparkConf, SparkContext} 3 object CollectDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为collect,并且可以使用本地计算机的所有线程 6 val conf:SparkConf = new SparkConf().setAppName("collect") 7 .setMaster("local[*]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\num.txt") 10 println(lines.collect().mkString("\n")) 11 //释放资源 12 sc.stop() 13 } 14 } 在文件3-11中,第9行代码用于从本地文件系统中读取文件num.txt的数据,并创建 一个名为lines的RDD。第10行代码通过collect算子获取lines的所有元素,并将其输出 到控制台。由于collect算子是以数组的形式返回RDD中的所有元素,所以为了方便查看 结果,使用mkString()方法将数组中的元素通过指定分隔符组合成字符串。 文件3-11的运行结果如图3-20所示。 图3-20 文件3-11的运行结果 从图3-20可以看出,lines的所有元素分别为1、2、3、4和5。由于lines中的元素与文 件num.txt中的数据一致,所以可以推断出文件num.txt的所有数据分别为1、2、3、4和5。 6.foreach算子 以IntelliJIDEA 为例,演示如何使用foreach算子。在项目Spark_Project的cn.itcast .action包中创建一个名为ForeachDemo的Scala文件,用于通过foreach 算子获取文件 num.txt中的所有数据,具体代码如文件3-12所示。 Spark大数据分析与实战(第1 02 2版) 文件3-12 ForeachDemo.scala 1 package cn.itcast.action 2 import org.apache.spark.{SparkConf, SparkContext} 3 object ForeachDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为foreach,并且可以使用本地计算机的线程数为1 6 val conf:SparkConf = new SparkConf().setAppName("foreach") 7 .setMaster("local[1]") 8 val sc:SparkContext = new SparkContext(conf) 9 val lines = sc.textFile("D:\\num.txt") 10 lines.foreach(x=>println(x)) 11 //释放资源 12 sc.stop() 13 } 14 } 图3-21 文件3-12的运行结果 在文件3-12中,第9行代码用于从本地文件 系统中读取文件num.txt的数据,并创建一个名 为lines的RDD。第10行代码通过foreach算子 将lines中的每个元素应用指定的匿名函数,该匿 名函数用于依次取出lines中的每个元素并赋值 给变量x,然后通过println()方法将元素输出到 控制台。 文件3-12的运行结果如图3-21所示。 从图3-21可以看出,lines的所有元素分别为1、2、3、4和5。由于lines中的元素与文 件num.txt中的数据一致,所以可以推断出文件num.txt所有数据分别为1、2、3、4和5。 【注意】 当Spark程序能够利用本地计算机的多个线程时,foreach算子会在每个分区 上并行执行指定的函数,因此RDD 中的每个元素不会按顺序依次应用函数。如果在允许 Spark程序使用本地计算机的多个线程的情况下,希望让RDD中的每个元素按顺序依次应 用foreach算子中指定的函数,可以通过指定RDD的分区数为1来实现。关于这部分内容 可以参考3.4节。 3.4 RDD 的分区 在分布式计算中,网络通信开销是一个关键的性能瓶颈。因此,合理控制数据分布以减 少网络传输对整体性能的影响至关重要。在编写Spark程序时,用户可以通过parallelize() 方法、repartition()方法和coalesce()方法手动指定RDD 的分区数量来精确地控制数据的 分布。关于这3个方法的介绍如下。 ● parallelize()方法用于在创建RDD时指定分区数量。 ● repartition()方法用于增加或减少RDD 的分区数量,它会触发重分区操作,从而生 成新的RDD。 ● coalesce()方法通常用于减少RDD的分区数量,它也会触发重分区操作,从而生成新 的RDD。 第3章 SparkRDD弹性分布式数据集1 03 repartition()方法和coalesce()方法都用于减少RDD 分区的数量,但它们的行为有所 不同。repartition()方法会触发一个Shuffle过程,即数据会通过网络传输重新洗牌,以满 足新的分区需求。与此不同,coalesce()方法不会触发Shuffle过程,它只是将原始分区中的 数据合并到新的分区中,尽量保持数据的原始分布。在处理大规模数据集时,Shuffle过程 可能会消耗大量的网络带宽和计算资源。因此,使用coalesce()方法减少RDD分区的数量 时,性能开销相对较小。但在某些情况下,coalesce()方法可能会导致数据分布不均匀。 接下来,以IntelliJIDEA 为例,演示如何指定RDD的分区数量。在项目Spark_Project 的目录/src/main/scala下,新建一个名为cn.itcast.partition的包。在cn.itcast.partition包 中创建一个名为PartitionDemo的Scala文件,具体代码如文件3-13所示。 文件3-13 PartitionDemo.scala 1 package cn.itcast.partition 2 import org.apache.spark.{SparkConf, SparkContext} 3 object PartitionDemo { 4 def main(args: Array[String]): Unit = { 5 //指定Spark 程序的名称为partition,并且可以使用本地计算机的所有线程 6 val conf:SparkConf = new SparkConf().setAppName("partition") 7 .setMaster("local[*]") 8 val sc = new SparkContext(conf) 9 val arr = Array(1,2,3,4,5) 10 //创建名为data1 的RDD,并指定RDD 的分区为3 11 val data1 = sc.parallelize(arr,3) 12 println("data1 的分区数为:" + data1.getNumPartitions) 13 //将data1 的分区数修改为5,生成名为data2 的RDD 14 val data2 = data1.repartition(5) 15 println("data2 的分区数为:" + data2.getNumPartitions) 16 //将data2 的分区数修改为4,生成名为data3 的RDD 17 val data3 = data2.coalesce(4) 18 println("data3 的分区数为:" + data3.getNumPartitions) 19 } 20 } 图3-22 文件3-13的运行结果 在文件3-13中,getNumPartitions()方法用 于获取RDD的分区数。 文件3-13的运行结果如图3-22所示。 从图3-22可以看出,data1、data2和data3的 分区数分别为3、5和4。说明成功在创建RDD时 指定分区数量,以及修改RDD的分区数量。 3.5 RDD 的依赖关系 在Spark中,不同的RDD之间可能存在依赖关系,这种依赖关系分为两种,分别是窄依 赖(NarrowDependency)和宽依赖(WideDependency),具体介绍如下。 1.窄依赖 窄依赖是指父RDD 的每一个分区最多被一个子RDD 的分区使用。在Spark中,父 104 Spark大数据分析与实战(第2版) RDD 指的是生成当前RDD 的原始RDD 或者转换操作之前的RDD 。而子RDD 则是由当 前RDD 生成的RDD 或者转换操作之后的RDD 。 窄依赖的表现形式通常分为两类:第一类表现为一个父RDD 的分区对应一个子RDD 的分区;第二类表现为多个父RDD 的分区对应一个子RDD 的分区。但是,一个父RDD 的 分区不会对应多个子RDD 的分区。为了便于理解,通常把窄依赖形象地比喻为独生子女 继承家产。接下来,通过图3-23 来展示常见的窄依赖及其对应的操作。 图3-23 窄依赖 从图3-23 可以看出,RDD 在进行map算子和union算子操作时,属于窄依赖的第一类 表现;而RDD 进行join算子操作时,属于窄依赖的第二类表现。 2. 宽依赖 宽依赖是指子RDD 的每个分区都会使用父RDD 的全部或多个分区。为了更直观理 解这个概念,可以把宽依赖形象地比喻为兄弟姐妹共同继承家产。接下来,通过图3-24 来 展示常见的宽依赖及其对应的操作。 图3-24 宽依赖 从图3-24 可以看出,RDD 在进行groupByKey算子和join算子操作时为宽依赖。 需要注意的是,oin算子操作既可以属于窄依赖,也可以属于宽依赖。当join算子操作 j 后,如果子RDD 的分区数与父RDD 相同则为窄依赖;当join算子操作后,如果子RDD 的 分区数与父RDD 不同则为宽依赖。 第3章SparkRDD 弹性分布式数据集105 6 RDD 机制 3. Spark为RDD 提供了两个重要的机制,分别是持久化机制和容错机制。接下来,本 节 针对持久化机制和容错机制进行详细介绍 。 3..持久化机制 61 持久化机制,也称为缓存机制,用于将RDD 缓存在内存或磁盘上,以备后续重用。在 Spark中,由于RDD 采用惰性求值的方式,意味着RDD 的转换操作不会立即执行计算。只 有在遇到行动操作时,Spark才会根据RDD 之间的依赖关系,触发转换操作执行计算。在 存在多个行动算子的情况下,每个行动算子都可能导致转换操作的重复计算。为了避免这 种资源开销,可通过持久化机制将重复使用的RDD 缓存到内存或磁盘中,从而避免重复计 算,提高计算效率。 通常情况下,一个RDD 由多个分区组成,数据分布在多个节点中。因此,当对某个 RDD 进行持久化时,每个节点都会将其分区的计算结果缓存在内存或磁盘中。在对RDD 或其衍生出的RDD 执行行动操作时,无须重新计算,而是直接获取各个分区的计算结果, 从而极大提高后续行动操作的速度。RDD 的持久化机制是Spark构建迭代式算法和快速 交互式查询的关键,因为它可以避免多次使用的RDD 导致转换操作的重复计算所产生的 资源开销。 在编写Spark程序时,用户可以通过cache() 方法和persist() 方法持久化RDD,其中 cache() 方法使用RDD 默认的持久化级别,将RDD 缓存到内存中。而persist() 方法可以通 过传递的参数指定持久化级别,将RDD 缓存到内存或磁盘中。接下来,通过表3-3介绍 RDD 的持久化级别。 表3- 3 RDD 的持久化级别 持久化级别说明 MEMORY_ONLY RDD 默认的持久化级别。将RDD 缓存在JVM 堆内存中。若RDD 无法完全缓存在内存中,则某些分区将不会被缓存。可能会导致性 能下降,因为需要重新计算丢失的分区 MEMORY_AND_DISK 将RDD 缓存在JVM 堆内存中。若RDD 无法完全缓存在内存中,则 某些分区将存储在磁盘上,并在需要时从磁盘读取 MEMORY_ONLY_SER 类似于MEMORY_ONLY,但是使用序列化的方式将RDD 缓存在 JVM 堆内存中。相比于MEMORY_ONLY,使用序列化可以减少内 存消耗,并提高缓存的效率 MEMORY_AND_DISK_SER 类似于MEMORY_AND_DISK 。但是使用序列化的方式将RDD 缓 存在JVM 堆内存中 DISK_ONLY 仅将RDD 缓存在磁盘中 MEMORY_ONLY_2、MEMORY_ AND_DISK_2 OFF_HEAP 分别与MEMORY_ONLY 和MEMORY_AND_DISK 类似,不同的 是加上后缀_2,表示RDD 的每个分区都会缓存2个副本 将RDD 缓存到堆外内存中