第 3 章 SparkRDD编程 RDD是Spark的核心概念。Spark基于Python语言提供了对RDD的转 换操作和行动操作,通过这些操作可实现复杂的应用。本章主要介绍RDD创 建的方式、RDD转换操作、RDD行动操作、RDD之间的依赖关系和RDD的持 —kRDD实现词频统计。 久化,最后给出案例实战——利用Spar 1 RDD 的创建方式 3. 传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是 RDD 的 其最大的缺点是在迭代计算式的时候要进行大量的磁盘I/O操作,而RDD正创建方式 是为解决这一缺点而出现的。 Spark数据处理引擎SparkCore是建立在统一的抽象弹性分布式数据集 (RDD)之上的,这使得Spark的SparkStreaming、SparkSQL 、SparkMLlib、 SparkGraphX等应用组件可以无缝地进行集成,能够在同一个应用程序中完成 大数据处理。RDD是Spark对具体数据对象的一种抽象(封装),本质上是一个 只读的分区(partition)记录集合,每个分区就是一个数据集片段,每个分区对应 一个任务。一个RDD的不同分区可以保存到集群中的不同节点上,对RDD进 行操作,相当于对RDD的每个分区进行操作。RDD中的数据对象可以是 Python、Java、Scala中任意类型的对象,甚至是用户自定义的对象。Spark中的 所有操作都是基于RDD进行的,一个Spark应用可以看作一个由RDD的创建 到一系列RDD转化操作再到RDD存储的过程。图3-1展示了RDD的分区及 分区与工作节点(workernode)的分布关系,其中的RDD被切分成4个分区。 RDD最重要的特性是容错性。如果RDD某个节点上的分区因为节点故 障 导致数据丢了,那么RDD会自动通过自己的数据来源重新计算得到该分区, 这 一切对用户是透明的 。 创建RDD有两种方式:通过Spark应用程序中的数据集创建;使用本地及 HDFS 、HBase等外部存储系统上的文件创建。 下面讲解创建RDD的常用方式。 3.1 使用程序中的数据集创建RDD 1. 可通过调用SparkContext对象的paralelize()方法并行化程序中的数据集 64 Spark大数据分析技术(Python版·微课版) 图3-1 RDD 的分区及分区与工作节点的分布关系 合以创建RDD。可以序列化Python对象得到RDD。例如: >>> arr = [1, 2, 3, 4, 5, 6] >>> rdd = sc.parallelize(arr) #把arr 这个数据集并行化到节点上以创建RDD >>> rdd1 = sc.parallelize([('a', 7), ('a', 2), ('b', 2)]) >>> rdd2 = sc.parallelize(range(100)) >>> rdd3 = sc.parallelize([('a', [1, 2, 3]), ('b', [4, 5, 6])]) >>> rdd.collect() #以列表形式返回RDD 中的所有元素 [1, 2, 3, 4, 5, 6] >>> rdd3.collect() [('a', [1, 2, 3]), ('b', [4, 5, 6])] 在上述语句中,使用了Spark提供的SparkContext对象,名称为sc,这是PySpark启 动的时候自动创建的,在交互式编程环境中可以直接使用。如果编写脚本程序文件,则在 程序文件中通过如下语句创建sc: from pyspark import SparkConf, SparkContext conf = SparkConf( ).setAppName("Spark Demo").setMaster("local") sc = SparkContext(conf = conf) 任何Spark程序都是从SparkContext开始的,SparkContext的初始化需要一个 SparkConf对象,SparkConf包含了Spark集群配置的各种参数。创建SparkContext对 象后,就可以使用SparkContext对象所包含的各种方法创建和操作RDD。 实际上,RDD也是一个数据集合。与Python的list(列表)对象不同的是,RDD的数 据可能分布于多台计算机上。 在调用parallelize()方法时,可以设置一个参数指定将一个数据集合切分成多少个分 区,例如,parallelize(arr,3)指定RDD的分区数是3。Spark会为每一个分区运行一个任 务,对其进行处理。Spark默认会根据集群的情况设置分区的数量。当调用parallelize() 方法时,若不指定分区数,则使用系统给出的分区数。例如: >>> rdd4 = sc.parallelize([1, 2, 3, 4, 5, 6], 3) 第3章 SparkRDD 编程 65 >>> rdd4.getNumPartitions() #获取rdd4 的分区数 3 RDD对象的glom()方法分别将RDD对象的每个分区上的元素分别放入一个列表 中,返回一个由这些列表组成的新RDD。例如: >>> rdd4.glom().collect() [[1, 2], [3, 4], [5, 6]] 3.1.2 使用文本文件创建RDD Spark可以使用任何Hadoop支持的存储系统上的文件(如HDFS、HBase以及本地 文件)创建RDD。调用SparkContext对象的textFile()方法读取文件的位置,即可创建 RDD。textFile()方法支持针对目录、文本文件、压缩文件以及通配符匹配的文件进行 RDD的创建。 Spark支持的常见文件格式如表3-1所示。 表3-1 Spark支持的常见文件格式 文件格式数据类型描 述 文本文件非结构化普通的文本文件,每行一条记录 JSON 半结构化常见的基于文本的格式 CSV 结构化 常见的基于文本的格式,通常应用在电子表格中 SequenceFile 结构化 用于键值对数据的常见Hadoop文件格式 对象文件结构化 用来存储Spark作业中的数据,给共享的代码读取 1.读取HDFS中的文本文件创建RDD 在HDFS中有一个文件名为/user/hadoop/input/data.txt,其内容如下: Business before pleasure. Nothing is impossible to a willing heart. I feel strongly that I can make it. 在读取该文件创建RDD之前,需要先启动Hadoop系统,命令如下: $ cd /usr/local/hadoop $ ./sbin/start-dfs.sh #启动Hadoop #读取HDFS 上的文件创建RDD >>> rdd = sc.textFile("/user/hadoop/input/data.txt") >>> rdd.foreach(print) #输出rdd 中的每个元素 Business before pleasure. 66 Spark大数据分析技术(Python版·微课版) Nothing is impossible to a willing heart. I feel strongly that I can make it. >>> rdd.keys().collect() #获取rdd 的key ['B', 'N', 'I'] 执行rdd = sc.textFile("/user/hadoop/input/data.txt")语句后,Spark从data.txt 文件中加载数据到内存中,在内存中生成一个RDD对象rdd。这个rdd里面包含了若干 元素,元素的类型是字符串,从data.txt文件中读取的每一行文本内容都成为rdd中的一 个元素。 使用textFile()方法读取文件创建RDD时,可指定分区的个数。例如: >>> rdd = sc.textFile("/user/hadoop/input/data.txt", 3) #创建包含3 个分区的RDD 对象 2.读取本地的文本文件创建RDD 读取Linux本地文件也是通过sc.textFile("路径")方法实现的,但需要在路径前面 加上“file:”以表示从Linux本地文件系统读取。在Linux本地文件系统上存在一个文 件/home/hadoop/data.txt,其内容和上面的HDFS中的文件/user/hadoop/input/data. txt完全一样。 下面给出读取Linux本地的/home/hadoop/data.txt文件创建一个RDD的例子: >>> rdd1 = sc.textFile("file:/home/hadoop/data.txt") #读取本地文件 >>> rdd1.foreach(print) #输出rdd1 中的每个元素 Business before pleasure. Nothing is impossible to a willing heart. I feel strongly that I can make it. 3.读取目录创建RDD textFile()方法也可以读取目录。将目录作为参数,会将目录中的各个文件中的数据 都读入RDD中。/home/hadoop/input目录中有文件text1.txt和text2.txt,text1.txt中 的内容为“HelloSpark”,text2.txt中的内容为“HelloPython”。 >>> rddw1 = sc.textFile("file:/home/hadoop/input") #读取本地文件夹 >>> rddw1.collect() ['Hello Python', 'Hello Spark'] 4.使用wholeTextFiles()方法读取目录创建RDD SparkContext对象的wholeTextFiles()方法也可用来读取给定目录中的所有文件, 可在输入路径时使用通配符(如part-*.txt)。wholeTextFiles()方法会返回若干键值对 第3章 SparkRDD 编程 67 组成的RDD,每个键值对的键是目录中一个文件的文件名,值是该文件名所表示的文件 的内容。 >>> rddw2 = sc.wholeTextFiles ("file:/home/hadoop/input") #读取本地目录 >>> rddw2.collect() [('file:/home/hadoop/input/text2.txt', 'Hello Python\n'), ('file:/home/hadoop/input/text1.txt', 'Hello Spark\n')] 3.1.3 使用JSON 文件创建RDD JSON(JavaScriptObjectNotation,JavaScript对象标记)是一种轻量级的数据交换 格式,JSON 文件在许多编程API中都得到支持。简单地说,JSON 可以将JavaScript对 象表示的一组数据转换为字符串,然后就可以在网络或者程序之间轻松地传递这个字符 串,并在需要的时候将它还原为各编程语言所支持的数据格式,是互联网上最受欢迎的数 据交换格式。 在JSON 语言中,一切皆对象。任何支持的类型都可以通过JSON 表示,例如字符 串、数字、对象、数组等。但是对象和数组是比较特殊且常用的两种类型。 对象在JSON 中是用“{}”括起来的内容,采用{key1:value1,key2:value2,…}这样的 键值对结构。在面向对象的语言中,key为对象的属性,value为对应的值。键名可以用 整数和字符串表示,值可以是任意类型。 数组在JSON 中是用“[]”括起来的内容,例如["Java","Python","VB",…]。数组 是一种比较特殊的数据类型,数组内也可以像对象那样使用键值对。 JSON 格式的5条规则如下: (1)并列的数据之间用“,”分隔。 (2)映射(键值对)用“:”表示。 (3)并列数据的集合(数组)用“[]”表示。 (4)映射(键值对)的集合(对象)用“{}”表示。 (5)元素值可具有的类型为string、number、object(对象)、array(数组),元素值也可 以是true、false、null。 在Windows系统中,可以使用记事本或其他类型的文本编辑器打开JSON 文件以查 看内容;在Linux系统中,可以使用vim 编辑器打开和查看JSON 文件。 例如,表示中国部分省市的JSON 数据如下: { "name": "中国", "province": [{ "name": "河南", "cities": { "city": ["郑州", "洛阳"] } 68 Spark大数据分析技术(Python版·微课版) }, { "name": "广东", "cities": { "city": ["广州", "深圳"] } }, { "name": "陕西", "cities": { "city": ["西安", "咸阳"] } }] } 下面再给出一个JSON 文件示例数据: { "code": 0, "msg": "", "count": 2, "data": [ { "id": "101", "username": "ZhangSan", "city":"XiaMen", }, { "id": "102", "username": "LiMing", "city": "ZhengZhou", }] } 创建JSON 文件的一种方法是:新建一个扩展名为.txt的文本文件,在文件中写入 JSON 数据,保存该文件,将扩展名修改成.json,就成为JSON 文件了。 在本地文件系统/home/hadoop/目录下有一个student.json文件,内容如下: {"学号":"106","姓名":"李明","数据结构":"92"} {"学号":"242","姓名":"李乐","数据结构":"96"} {"学号":"107","姓名":"冯涛","数据结构":"84"} 从文件内容可看到每个“{…}”中为一个JSON 格式的数据,一个JSON 文件包含若 干JSON 格式的数据。 读取JSON 文件创建RDD最简单的方法是将JSON 文件作为文本文件读取。例如: 第3章 SparkRDD 编程 69 >>> jsonStr = sc.textFile("file:/home/hadoop/student.json") >>> jsonStr.collect() ['{"学号":"106","姓名":"李明","数据结构":"92"}', '{"学号":"242","姓名":"李乐", "数据结构":"96"}', '{"学号":"107","姓名":"冯涛","数据结构":"84"}'] 3.1.4 使用CSV 文件创建RDD CSV(CommaSeparatedValues,逗号分隔值)文件是一种用来存储表格数据(数字和 文本)的纯文本格式文件。CSV 文件的内容由以“,”分隔的一列列数据构成,它可以被导 入各种电子表格和数据库中。纯文本意味着该文件是一个字符序列。在CSV 文件中,列 之间以逗号分隔。CSV 文件由任意数目的记录组成,记录间以某种换行符分隔,一行为 一条记录。可使用Word、Excel、记事本等方式打开CSV 文件。 创建CSV 文件的方法有很多,最常用的方法是用电子表格创建。例如,在Excel中, 选择“文件”→“另存为”命令,然后在“文件类型”下拉列表框中选择“CSV (逗号分隔) (*.csv)”,最后单击“保存”按钮,即创建了一个CSV 文件。 如果CSV 文件的所有数据字段均不包含换行符,可以使用textFile()方法读取并解 析数据。 例如,/home/hadoop目录下保存了一个名为grade.csv的CSV 文件,文件内容如下: 101,LiNing,95 102,LiuTao,90 103,WangFei,96 使用textFile()方法读取grade.csv文件,创建RDD: >>> gradeRDD = sc.textFile("file:/home/hadoop/grade.csv") #创建RDD >>> gradeRDD.collect() ['101,LiNing,95', '102,LiuTao,90', '103,WangFei,96'] 3.2 RDD 转换操作 从相关数据源获取数据形成初始RDD 后,根据应用需求,调用RDD 对象的转换操 作(算子)方法对得到的初始RDD进行操作,生成一个新的RDD。对RDD的操作分为两 大类型:转换操作和行动操作。Spark里的计算就是操作RDD。 转换操作负责对RDD中的数据进行计算并转换为新的RDD。RDD 转换操作是惰 性求值的,只记录转换的轨迹,而不会立即转换,直到遇到行动操作时才会与行动操作一 起执行。 下面给出RDD对象的常用转换操作方法。 70 Spark大数据分析技术(Python版·微课版) 映射操作 3.2.1 映射操作 映射操作方法主要有map()、flatMap()、mapValues()、flatMapValues()和 mapPartitions()。 1.map() map(func)对一个RDD中的每个元素执行func函数,通过计算得到新元素,这些新 元素组成的RDD作为map(func)的返回结果。例如: >>> rdd1 = sc.parallelize([1, 2, 3, 4]) > >> result=rdd1.map(lambda x:x*2) #用map()对rdd1 中的每个数进行乘2 操作 >>> result.collect() #以列表形式返回RDD 中的所有元素 [2, 4, 6, 8] 上述代码中,向map()操作传入了一个匿名函数lambdax:x*2。其中,x为函数的 参数名称,也可以使用其他字符,如y;x*2为函数解析式,用来实现函数的运算。Spark 会将RDD中的每个元素依次传入该函数的参数中,返回一个由所有函数值组成的 新RDD。 collect()为行动操作,将生成的RDD 对象result转化为list类型,同时可实现查看 RDD中数据的效果。 map(func)可用来将一个普通的RDD转换为一个键值对形式的RDD,供只能操作键 值对类型的RDD使用。 例如,对一个由英语单词组成的文本行,提取其中的第一个单词作为key,将整个句 子作为value,建立键值对RDD,具体实现如下: >>> wordsRDD = sc.parallelize(["Who is that", "What are you doing", "Here you are"]) >>> PairRDD = wordsRDD.map(lambda x: (x.split(" ")[0], x)) >>> PairRDD.collect() [('Who', 'Who is that'), ('What', 'What are you doing'), ('Here', 'Here you are')] 2.flatMap() flatMap(func)类似于map(func),但又有所不同。flatMap(func)中的func函数会 返回0个或多个元素,flatMap(func)将func函数返回的元素合并成一个RDD,作为本操 作的返回值。例如: >>> wordsRDD = sc.parallelize(["Who is that", "What are you doing", "Here you are"]) >>> FlatRDD = wordsRDD.flatMap(lambda x: x.split(" ")) >>> FlatRDD.collect() ['Who', 'is', 'that', 'What', 'are', 'you', 'doing', 'Here', 'you', 'are'] 第3章 SparkRDD 编程 71 flatMap()的一个简单用途是把输入的字符串切分为单词。例如: #定义函数 >>> def tokenize(ws): return ws.split(" ") >>> lines = sc.parallelize(["One today is worth two tomorrows","Better late than never","Nothing is impossible for a willing heart"]) >>> lines.map(tokenize).foreach(print) ['One', 'today', 'is', 'worth', 'two', 'tomorrows'] ['Better', 'late', 'than', 'never'] ['Nothing', 'is', 'impossible', 'for', 'a', 'willing', 'heart'] >>> lines.flatMap(tokenize).collect() ['One', 'today', 'is', 'worth', 'two', 'tomorrows', 'Better', 'late', 'than', 'never', 'Nothing', 'is', 'impossible', 'for', 'a', 'willing', 'heart'] 3.mapValues() mapValues(func)对键值对组成的RDD对象中的每个value都执行函数func(),返 回由键值对(key,func(value))组成的新RDD,但是,key不会发生变化。键值对RDD是 指RDD中的每个元素都是(key,value)二元组,key为键,value为值。例如: >>> rdd = sc.parallelize(["Hadoop","Spark","Hive","HBase"]) >>> pairRdd = rdd.map(lambda x: (x,1)) #转换为键值对RDD >>> pairRdd.collect() [('Hadoop', 1), ('Spark', 1), ('Hive', 1), ('HBase', 1)] >>> pairRdd.mapValues(lambda x: x+1).foreach(print) #对每个值加1 ('Hadoop', 2) ('Spark', 2) ('Hive', 2) ('HBase', 2) 再给出一个mapValues()应用示例: >>> rdd1 = sc.parallelize(list(range(1,9))) >>> rdd1.collect() [1, 2, 3, 4, 5, 6, 7, 8] >>> result = rdd1.map(lambda x: (x % 4, x)).mapValues(lambda v: v + 10) >>> result.collect() [(1, 11), (2, 12), (3, 13), (0, 14), (1, 15), (2, 16), (3, 17), (0, 18)] 4.flatMapValues() flatMapValues(func)转换操作把键值对RDD中的每个键值对的值都传给一个函数 处理,对于每个值,该函数返回0个或多个输出值,键和每个输出值构成一个二元组,作为 72 Spark大数据分析技术(Python版·微课版) flatMapValues(func)函数返回的新RDD中的一个元素。使用flatMapValues(func)会保 留原RDD的分区情况。 >>> stuRDD = sc.parallelize(['Wang,81|82|83','Li,76|82|80|','Liu,90|88|91']) >>> kvRDD = stuRDD.map(lambda x: x.split(',')) >>> print('kvRDD: ',kvRDD.take(2)) kvRDD: [['Wang', '81|82|83'], ['Li', '76|82|80|']] >>> RDD = kvRDD.flatMapValues(lambda x: x.split('|')).map(lambda x:(x[0],int (x[1]))) >>> print('RDD: ', RDD.take(6)) RDD: [('Wang', 81), ('Wang', 82), ('Wang', 83), ('Li', 76), ('Li', 82), ('Li', 80)] 5.mapPartitions() mapPartitions(func)对每个分区数据执行指定函数。 >>> rdd = sc.parallelize([1, 2, 3, 4],2) >>> rdd.glom().collect() #查看每个分区中的数据 [[1, 2], [3, 4]] >>> def f(x): yield sum(x) >>> rdd.mapPartitions(f).collect() #对每个分区中的数据执行f 函数操作 [3, 7] 3.2.2 去重操作 去重操作包括filter()和distinct()。 1.filter() filter(func)使用过滤函数func过滤RDD 中的元素,func函数的返回值为Boolean 类型,filter(func)执行func函数后返回值为true的元素,组成新的RDD。例如: >>> rdd4=sc.parallelize([1,2,2,3,4,3,5,7,9]) >>> rdd4.filter(lambda x:x>4).collect() #对rdd4 进行过滤,得到大于4 的数据 [5, 7, 9] 创建4名学生考试数据信息的RDD,学生考试数据信息包括姓名、考试科目、考试成绩,各 项之间用空格分隔。下面给出找出成绩为100的学生姓名和考试科目的具体命令语句。 (1)创建学生考试数据信息的RDD: >>> students = sc.parallelize(["XiaoHua Scala 85","LiTao Scala 100","LiMing Python 95","WangFei Java 100"]) 第3章 SparkRDD 编程 73 (2)将students的数据存储为3元组: >>> studentsTup = students.map(lambda x : (x.split(" ")[0], x.split(" ")[1], int(x.split(" ")[2]))) >>> studentsTup.collect() [('XiaoHua', 'Scala', 85), ('LiTao', 'Scala', 100), ('LiMing', 'Python', 95), ('WangFei', 'Java', 100)] (3)过滤出成绩为100的学生的姓名和考试科目: >>> studentsTup.filter(lambda x: x [2]= = 100).map(lambda x:(x [0], x [1])). foreach(print) ('LiTao', 'Scala') ('WangFei', 'Java') 2.distinct() distinct([numPartitions])对RDD中的数据进行去重操作,返回一个新的RDD。其 中,可选参数numPartitions用来设置操作的并行任务个数。例如: >>> Rdd = sc.parallelize([1,2,1,5,3,5,4,8,6,4]) >>> distinctRdd = Rdd.distinct() >>> distinctRdd.collect() [1, 2, 5, 3, 4, 8, 6] 从返回结果[1,2,5,3,4,8,6]中可以看出,数据已经去重。 3.2.3 排序操作 排序操作包括sortByKey()和sortBy()。 1.sortByKey() sortByKey(ascending,[numPartitions])对RDD中的数据集进行排序操作,对键值 对类型的数据按照键进行排序,返回一个排序后的键值对类型的RDD。参数ascending 用来指定是升序还是降序,默认值是True,按升序排序。可选参数numPartitions用来指 定排序分区的并行任务个数。 >>> rdd = sc.parallelize([("WangLi", 1), ("LiHua", 3), ("LiuFei", 2), ("XuFeng", 1)]) >>> rdd.collect() [('WangLi', 1), ('LiHua', 3), ('LiuFei', 2), ('XuFeng', 1)] >>> rdd1 = rdd.sortByKey(False) #False 表示降序 >>> rdd1.collect() [('XuFeng', 1), ('WangLi', 1), ('LiuFei', 2), ('LiHua', 3)] 74 Spark大数据分析技术(Python版·微课版) 2.sortBy() sortBy(keyfunc,[ascending],[numPartitions])使用keyfunc函数先对数据进行处 理,按照处理后的数据排序,默认为升序。sortBy()可以指定按键还是按值进行排序。 第一个参数keyfunc是一个函数,sortBy()按keyfunc对RDD中的每个元素计算的 结果对RDD中的元素进行排序。 第二个参数是ascending,决定排序后RDD 中的元素是升序还是降序。默认是 True,按升序排序。 第三个参数是numPartitions,该参数决定排序后的RDD的分区个数。默认排序后 的分区个数和排序之前相等。 例如,创建4种商品数据信息的RDD,商品数据信息包括名称、单价、数量,各项之间 用空格分隔。命令如下: >>> goods = sc.parallelize(["radio 30 50","soap 3 60","cup 6 50","bowl 4 80"]) (1)按键进行排序,等同于sortByKey()。 首先将goods的数据存储为3元组: >>> goodsTup = goods.map(lambda x: (x.split(" ")[0], int(x.split(" ")[1]), int(x.split(" ")[2]))) 然后按商品名称进行排序: >>> goodsTup.sortBy(lambda x:x[0]).foreach(print) ('bowl', 4, 80) ('cup', 6, 50) ('radio', 30, 50) ('soap', 3, 60) (2)按值进行排序。 按照商品单价降序排序: >>> goodsTup.sortBy(lambda x:x[1], False).foreach(print) ('radio', 30, 50) ('cup', 6, 50) ('bowl', 4, 80) ('soap', 3, 60) 按照商品数量升序排序: >>> goodsTup.sortBy(lambda x:x[2]).foreach(print) ('radio', 30, 50) ('cup', 6, 50) 第3章 SparkRDD 编程 75 ('soap', 3, 60) ('bowl', 4, 80) 按照商品数量与7相除的余数升序排序: >>> goodsTup.sortBy(lambda x:x[2]%7).foreach(print) ('radio', 30, 50) ('cup', 6, 50) ('bowl', 4, 80) ('soap', 3, 60) (3)通过Tuple方式,按照数组的元素进行排序: >>> goodsTup.sortBy(lambda x: (-x[1], -x[2])).foreach(print) ('radio', 30, 50) ('cup', 6, 50) ('bowl', 4, 80) ('soap', 3, 60) 3.2.4 分组聚合操作 分组聚合操作包括groupBy()、groupByKey()、groupWith()、reduceByKey()和 combineByKey()。 1.groupBy() groupBy(func)返回一个按指定条件(用函数func表示)对元素进行分组的RDD。 参数func可以是有名称的函数,也可以是匿名函数,用来指定对所有元素进行分组的键, 或者指定对元素进行求值以确定其所属分组的表达式。注意,groupBy()返回的是一个 可迭代对象,称为迭代器。例如: >>> rdd=sc.parallelize([1,2,3,4,5, 6, 7, 8]) >>> res=rdd.groupBy(lambda x:x%2).collect() >>> for x,y in res: #输出迭代器的具体值 print(x) print(y) print(sorted(y)) print("*"*44) 1< pyspark.resultiterable.ResultIterable object at 0x7fe71012ea60> [1, 3, 5, 7] ******************************************** 0<pyspark.resultiterable.ResultIterable object at 0x7fe70de43bb0> 76 Spark大数据分析技术(Python版·微课版) [2, 4, 6, 8] ******************************************** 2.groupByKey() groupByKey()对一个由键值对(K,V)组成的RDD进行分组聚合操作,返回由键值 对(K,Seq[V])组成的新RDD,Seq[V]表示由键相同的值所组成的序列。 >>> rdd=sc. parallelize([("Spark",1),("Spark",1),("Hadoop",1),("Hadoop",1)]) >>> rdd. groupByKey().map(lambda x : (x[0], list(x[1]))).collect() [('Spark', [1, 1]), ('Hadoop', [1, 1])] >>> rdd. groupByKey().map(lambda x : (x[0], len(list(x[1])))).collect() [('Spark', 2), ('Hadoop', 2)] 3.groupWith() groupWith(otherRDD1,otherRDD2,…)把多个RDD按键进行分组,输出(键,迭代 器)形式的数据。分组后的数据是有顺序的,每个键对应的值是按列出RDD的顺序排序 的。如果RDD没有键,则对应位置取空值。例如: >>> w = sc.parallelize([("a", "w"), ("b", "w")]) >>> x = sc.parallelize([("a", "x"), ("b", "x")]) >>> y = sc.parallelize([("a", "y")]) >>> z = sc.parallelize([("b", "z")]) >>> w.groupWith(x, y, z).collect() [('b', (<pyspark.resultiterable.ResultIterable object at 0x7fe70de3abb0>, <pyspark.resultiterable.ResultIterable object at 0x7fe70ddea2b0>, <pyspark.resultiterable.ResultIterable object at 0x7fe70ddea310>, <pyspark.resultiterable.ResultIterable object at 0x7fe70ddea370>)), ('a', (<pyspark.resultiterable.ResultIterable object at 0x7fe70ddea3d0>, <pyspark.resultiterable.ResultIterable object at 0x7fe70ddea430>, <pyspark.resultiterable.ResultIterable object at 0x7fe70ddea490>, <pyspark.resultiterable.ResultIterable object at 0x7fe70ddea4f0>))] 迭代输出每个分组: >>> [(x, tuple(map(list, y))) for x, y in list(w.groupWith(x, y, z).collect())] [('b', (['w'], ['x'], [], ['z'])), ('a', (['w'], ['x'], ['y'], []))] 4.reduceByKey() reduceByKey(func)对一个由键值对组成的RDD进行聚合操作,对键相同的值,使用 指定的func函数将它们聚合到一起。例如: 第3章 SparkRDD 编程 77 >>> rdd=sc. parallelize([("Spark",1),("Spark",2),("Hadoop",1),("Hadoop",5)]) >>> rdd.reduceByKey(lambda x, y: x+ y).collect() [('Spark', 3), ('Hadoop', 6)] 下面给出一个统计词频的例子: >> > wordsRDD = sc.parallelize(["HewhodoesnotreachtheGreatWallisnotatrueman", " He who has never been to the Great Wall is not a true man"]) #创建RDD >>> FlatRDD = wordsRDD.flatMap(lambda x: x.split(" ")) >>> FlatRDD.collect() ['He', 'who', 'does', 'not', 'reach', 'the', 'Great', 'Wall', 'is', 'not', 'a', 'true', 'man', ' ', 'He', 'who', 'has', 'never', 'been', 'to', 'the', 'Great', ' Wall', 'is', 'not', 'a', 'true', 'man'] >>> KVRdd = FlatRDD.map(lambda x:(x,1)) #创建键值对RDD >>> KVRdd.collect() [('He', 1), ('who', 1), ('does', 1), ('not', 1), ('reach', 1), ('the', 1), (' Great', 1), ('Wall', 1), ('is', 1), ('not', 1), ('a', 1), ('true', 1), ('man', 1), ('', 1), ('He', 1), ('who', 1), ('has', 1), ('never', 1), ('been', 1), ('to', 1), ('the', 1), ('Great', 1), ('Wall', 1), ('is', 1), ('not', 1), ('a', 1), ('true', 1), ('man', 1)] >>> KVRdd.reduceByKey(lambda x, y: x+ y).collect() #统计词频 [('He', 2), ('who', 2), ('does', 1), ('not', 3), ('reach', 1), ('the', 2), (' Great', 2), ('Wall', 2), ('is', 2), ('a', 2), ('true', 2), ('man', 2), (' ', 1), (' has', 1), ('never', 1), ('been', 1), ('to', 1)] 5.combineByKey() combineByKey(createCombiner,mergeValue,mergeCombiners)是对键值对RDD中 的每个键值对按照键进行聚合操作,即合并相同键的值。聚合操作的逻辑是通过自定义 函数提供给combineByKey()方法的,把键值对(K,V)类型的RDD转换为键值对(K,C) 类型的RDD,其中C表示聚合对象类型。 3个参数含义如下: (1)createCombiner是函数。在遍历(K,V)时,若combineByKey()是第一次遇到键 为K的键值对,则对该键值对调用createCombiner函数将V 转换为C,C会作为K 的累 加器的初始值。 (2)mergeValue是函数。在遍历(K,V)时,若comineByKey()不是第一次遇到键为 K的键值对,则对该键值对调用mergeValue函数将V 累加到C中。 (3)mergeCombiners是函数。combineByKey()是在分布式环境中执行的,RDD 的 每个分区单独进行combineBykey()操作,最后需要利用mergeCombiners函数对各个分 区进行最后的聚合。 78 Spark大数据分析技术(Python版·微课版) 下面给出一个例子。 (1)定义createCombiner函数: >>> def createCombiner(value): return(value,1) (2)定义mergeValue函数: >>> def mergeValue(acc, value): return(acc[0]+value, acc[1]+1) (3)定义mergeCombiners函数: >>> def mergeCombiners(acc1, acc2): return(acc1[0]+acc2[0], acc1[1]+acc2[1]) (4)创建考试成绩RDD对象: >>> Rdd = sc.parallelize([('ID1', 80),('ID2', 85),('ID1', 90),('ID2', 95), ('ID3', 99)], 2) > > > combineByKeyRdd = Rdd. combineByKey ( createCombiner, mergeValue, mergeCombiners) >>> combineByKeyRdd.collect() [('ID1', (170, 2)), ('ID2', (180, 2)), ('ID3', (99, 1))] (5)求平均成绩: >>> avgRdd = combineByKeyRdd.map(lambda x:(x[0],float(x[1][0])/x[1][1])) >>> avgRdd.collect() [('ID1', 85.0), ('ID2', 90.0), ('ID3', 99.0)] 3.2.5 集合操作 集合操作包括union()、intersection()、subtract()和cartesian()。 1.union() union(otherRDD)对源RDD和参数otherRDD指定的RDD求并集后返回一个新的 RDD,不进行去重操作。例如: >>> rdd1 = sc.parallelize(list(range(1,5))) >>> rdd2 = sc.parallelize(list(range(3,7))) >>> rdd1.union(rdd2).collect() [1, 2, 3, 4, 3, 4, 5, 6] 第3章 SparkRDD 编程 79 2.intersection() intersection(otherRDD)对源RDD和参数otherRDD 指定的RDD 求交集后返回一 个新的RDD,且进行去重操作。例如: >>> rdd1.intersection(rdd2).collect() [4, 3] 3.subtract() subtract(otherRDD)相当于进行集合的差集操作,即从源RDD 中去除与参数 otherRDD指定的RDD中相同的元素。例如: >>> rdd1.subtract(rdd2).collect() [2, 1] 4.cartesian() cartesian(otherRDD)对源RDD和参数otherRDD 指定的RDD 进行笛卡儿积操作。 例如: >>> rdd1.cartesian(rdd2).collect() [(1, 3), (1, 4), (1, 5), (1, 6), (2, 3), (2, 4), (2, 5), (2, 6), (3, 3), (3, 4), (3, 5), (3, 6), (4, 3), (4, 4), (4, 5), (4, 6)] 3.2.6 抽样操作 抽样操作包括sample()和sampleByKey()。 1.sample() sample(withReplacement,fraction,seed)操作以指定的抽样种子seed从RDD的数 据中抽取比例为fraction的数据。withReplacement表示抽出的数据是否放回,True为 有放回的抽样,False为无放回的抽样。相同的seed得到的随机序列一样。 >>> SampleRDD=sc.parallelize(list(range(1,1000))) >>> SampleRDD.sample(False,0.01,1).collect() #输出取样 [14, 100, 320, 655, 777, 847, 858, 884, 895, 935] 2.sampleByKey() sampleByKey(withReplacement,fractions,seed)按键的比例抽样,withReplacement表 示是否有放回,fractions表示抽样比例,seed表示抽样种子。例如: 80 Spark大数据分析技术(Python版·微课版) >>> fractions = {"a":0.5, "b":0.1} > > > rdd = sc. parallelize (fractions. keys ( ), 3). cartesian ( sc. parallelize (range(0,10),2)) >>> sample = dict(rdd.sampleByKey(False,fractions,2).groupByKey(3).collect()) >>> [(iter[0],list(iter[1])) for iter in sample.items()] [('b', [5, 9]), ('a', [1, 4, 5, 7])] 3.2.7 连接操作 连接操作包括join()、leftOuterJoin()、rightOuterJoin()和fullOuterJoin()。 1.join() join(otherRDD,[numPartitions])对两个键值对RDD进行内连接,将两个RDD 中 键相同的(K,V)和(K,W)进行连接,返回键值对(K,(V,W))。其中,V 表示源RDD 的值,W 表示参数otherRDD指定的RDD的值。例如: >>> pairRDD1 = sc.parallelize([("Scala",2), ("Scala", 3), ("Java", 4), ("Python", 8)]) >>> pairRDD2 = sc.parallelize([ ("Scala",3), ("Java", 5), ("HBase", 4), ( "Java", 10)]) >>> pairRDD3 = pairRDD1.join(pairRDD2) >>> pairRDD3.collect() [('Java', (4, 5)), ('Java', (4, 10)), ('Scala', (2, 3)), ('Scala', (3, 3))] 2.leftOuterJoin() leftOuterJoin()可用来对两个键值对RDD 进行左外连接操作,保留第一个RDD 的 所有键。在左外连接中,如果第二个RDD中有对应的键,则连接结果中显示为Some类 型,表示有值可以引用;如果没有,则为None值。例如: >>> left_Join = pairRDD1.leftOuterJoin(pairRDD2) >>> left_Join.collect() [('Java', (4, 5)), ('Java', (4, 10)), ('Python', (8, None)), ('Scala', (2, 3)), ('Scala', (3, 3))] 3.rightOuterJoin() rightOuterJoin()可用来对两个键值对RDD进行右外连接操作,确保第二个RDD的 键必须存在,即保留第二个RDD的所有键。 4.fullOuterJoin() fullOuterJoin()是全外连接操作,会保留两个RDD中所有键的连接结果。例如: 第3章 SparkRDD 编程 81 >>> full_Join = pairRDD1.fullOuterJoin (pairRDD2) >>> full_Join.collect() [('Java', (4, 5)), ('Java', (4, 10)), ('Python', (8, None)), ('Scala', (2, 3)), ('Scala', (3, 3)), ('HBase', (None, 4))] 3.2.8 打包操作 zip(otherRDD)将两个RDD打包成键值对形式的RDD,要求两个RDD 的分区数量 以及每个分区中元素的数量都相同。例如: >>> rdd1=sc.parallelize([1, 2, 3], 3) >>> rdd2=sc.parallelize(["a","b","c"], 3) >>> zipRDD=rdd1.zip(rdd2) >>> zipRDD.collect() [(1, 'a'), (2, 'b'), (3, 'c')] 3.2.9 获取键值对RDD 的键和值集合 对一个键值对RDD,调用keys()返回一个仅包含键的RDD,调用values()返回一个 仅包含值的RDD。 >>> zipRDD.keys().collect() [1, 2, 3] >>> zipRDD.values().collect() ['a', 'b', 'c'] 3.2.10 重新分区操作 重新分区操作包括coalesce()和repartition()。 1.coalesce() 在分布式集群里,网络通信的代价很大,减少网络传输可以极大地提升性能。 MapReduce框架的性能开销主要在I/O 和网络传输两方面。I/O 因为要大量读写文件, 性能开销是不可避免的;但可以通过优化方法降低网络传输的性能开销,例如把大文件压 缩为小文件可减少网络传输的开销。 I/O 在Spark中也是不可避免的,但Spark对网络传输进行了优化。Spark对RDD 进行分区(分片),把这些分区放在集群的多个计算节点上并行处理。例如,把RDD分成 100个分区,平均分布到10个节点上,一个节点上有10个分区。当进行求和型计算的时 候,先进行每个分区的求和,然后把分区求和得到的结果传输到主程序进行全局求和,这 样就可以降低求和计算时网络传输的开销。 coalesce(numPartitions,shuffle)的作用是:默认使用HashPartitioner(哈希分区方 82 Spark大数据分析技术(Python版·微课版) 式)对RDD进行重新分区,返回一个新的RDD,且该RDD的分区个数等于numPartitions。 参数说明如下: (1)numPartitions:要生成的新RDD的分区个数。 (2)shuffle:指定是否进行洗牌。默认为False,重设的分区个数只能比RDD原有分 区数小;如果shuffle为True,重设的分区个数不受原有RDD分区个数的限制。 下面给出一个例子: >>> rdd =sc.parallelize(range(1,17), 4) #创建RDD,分区个数为4 >>> rdd.getNumPartitions() #查看RDD 分区个数 4> >> coalRDD=rdd.coalesce(5) #重新分区,分区个数为5 >>> coalRDD.getNumPartitions() 4> >> coalRDD1 =rdd.coalesce(5, True) #重新分区,shuffle 为True >>> coalRDD1.getNumPartitions() #查看coalRDD1 分区个数 5 2.repartition() repartition(numPartitions)其实就是coalesce()方法的第二个参数shuffle为True 的简单实现。例如: >>> coalRDD2 = coalRDD1.repartition(2) #转换成两个分区的RDD >>> coalRDD2.getNumPartitions() #查看coalRDD2 分区个数 2 Spark支持自定义分区方式,即通过一个自定义的分区函数对RDD进行分区。需要 注意,Spark的分区函数针对的是键值对类型的RDD,分区函数根据键对RDD的元素进 行分区。因此,当需要对一些非键值对类型的RDD进行自定义分区时,需要先把该RDD 转换成键值对类型的RDD,然后再使用分区函数。 下面给出一个自定义分区的实例,要求根据键的最后一位数字将键值对写入不同的 分区中。打开一个Linux终端,使用gedit编辑器创建一个代码文件,将其命名为/usr/ local/spark/myproject/rdd/partitionerTest.py,输入以下代码: from pyspark import SparkConf,SparkContext def SelfPartitioner(key): #自定义分区函数 print('Self Defined Partitioner is running') print('The key is %d'%key) return key%5 #设定分区方式 def main(): print('The main function is running')