学习目标: .了解SparkSQL 的简介,能够说出SparkSQL 的特点。 .熟悉SparkSQL 架构,能够说明Catalyst内部组件的运行流程。 .熟悉DataFrame 的基本概念,能够说明DataFrame 与RDD 在结构上的区别。 .掌握DataFrame 的创建,能够通过读取文件创建DataFrame 。 .掌握DataFrame 的常用操作,能够使用DSL 风格和SQL 风格操作DataFrame 。 .掌握DataFrame 的函数操作,能够使用标量函数和聚合函数操作DataFrame 。 .掌握RDD 与DataFrame 的转换,能够通过反射机制和编程方式将RDD 转换成 DataFrame 。 .掌握SparkSQL 操作数据源,能够使用SparkSQL 操作MySQL 和Hive。 针对那些不熟悉Spark常用API,但希望利用Spark强大数据分析能力的用户,Spark 提供了一种结构化数据处理模块SparkSQL,SparkSQL 模块使用户可以利用SQL 语句处 理结构化数据。本章针对SparkSQL 的基本原理和使用方式进行详细讲解。 3.akSQL基础知识 1 Spr SparkSQL 是Spark用来处理结构化数据的一个模块,它提供了一个名为DataFrame 的数据模型,即带有元数据信息的RDD 。基于Python语言使用SparkSQL 时,用户可以通 过SQL 和DataFrameAPI 两种方式实现对结构化数据的处理。无论用户选择哪种方式, 它们都是基于同样的执行引擎,可以方便地在不同的方式之间进行切换。接下来,本节对 SparkSQL 的基础知识进行介绍。 3.1.akSQL 简介 1 Spr SparkSQL 的前身是Shark,Shark最初是由加州大学伯克利分校的实验室开发的 Spark生态系统的组件之一,它运行在Spark系统上,并且重新利用了Hive的工作机制,并 继承了Hive的各个组件。Shark主要的改变是将SQL 语句的转换方式从MapReduce作 业替换成了Spark作业,从而提高了计算效率。 然而,由于Shark过于依赖Hive,所以在版本迭代时很难添加新的优化策略,从而限制 了Spark的发展,因此,在后续的迭代更新中,Shark的维护停止了,转向SparkSQL 的 52 Spark大数据分析与应用(Python版) 开发 。 SparkSQL 具有3个特点,具体内容如下 。 (1)支持多种数据源。SparkSQL 可以从各种数据源中读取数据,包括JSON 、Hive、 MySQL 等,使用户可以轻松处理不同数据源的数据。 (2)支持标准连接。SparkSQL 提供了行业标准的JDBC 和ODBC 连接方式,使用户 可以通过外部连接方式执行SQL 查询,不再局限于在Spark程序内使用SQL 语句进行 查询。 (3)支持无缝集成。SparkSQL 提供了Python、Scala和Java等编程语言的API,使 SparkSQL 能够与Spark程序无缝集成,可以将结构化数据作为Spark中的RDD 进行查 询。这种紧密的集成方式使用户可以方便地在Spark框架中进行结构化数据的查询与 分析。 总体来说,SparkSQL 支持多种数据源的查询和加载,并且兼容Hive,可以使用JDBC 和ODBC 的连接方式来执行SQL 语句,它为Spark框架在结构化数据分析方面提供重要 的技术支持。 3..akSQL 架构 12 Spr SparkSQL 的一个重要特点就是能够统一处理数据表和RDD,使用户可以轻松地使用 SQL 语句进行外部查询,同时进行更加复杂的数据分析。接下来,通过图3-1来了解Spark SQL 底层架构。 图3- 1 SparkSQL底层架构 从图3-1可以看出,用户提交SQL 语句、DataFrame 和Dataset后,会经过一个优化器 (Catalyst), 将SQL 语句、DataFrame 和Dataset的执行逻辑转换为RDD,然后提交给Spark 集群(Cluster)处理。SparkSQL 的计算效率主要由Catalyst决定,也就是说SparkSQL 执 行逻辑的生成和优化工作全部交给SparkSQL 的Catalyst管理。 Catalyst是一个可扩展的查询优化框架,它基于Scala函数式编程,使用SparkSQL 时,Catalyst能够为后续的版本迭代更新轻松地添加新的优化技术和功能,特别是针对大数 据生产环境中遇到的问题,如针对半结构化数据和高级数据分析。另外,Spark作为开源项 目,用户可以针对项目需求自行扩展Catalyst的功能。 Catalyst内部主要包括Parser(分析)组件、Analyzer(解析)组件、Optimizer(优化)组 件、Planner(计划)组件和QueryExecution(执行)组件。接下来,通过图3-2来介绍Catalyst内 部各组件的关系。 图3-2展示的是Catalyst的内部组件的关系,这些组件的运行流程如下。 (1)当用户提交SQL 语句、DataFrame 或Dataset时,它们会经过Parser组件进行分 析。Parser组件分析相关的执行语句,判断其是否符合规范,一旦分析完成,会创建 第3章SparkSQL结构化数据处理模块53 图3- 2 Catalyst内部各组件的关系 SparkSesion,并将包括表名、列名和数据类型等元数据保存在会话目录(SesionCatalog) 中发送给Analyzer组件,此时的执行语句为未解析的逻辑计划(UnresolvedLogicalPlan)。 其中会话目录用于管理与元数据相关的信息。 (2)Analyzer组件根据会话目录中的信息,将未解析的逻辑计划解析为逻辑计划 (LogicalPlan)。同时,Analyzer组件还会验证执行语句中的表名、列名和数据类型是否存 在于元数据中。如果所有的验证都通过,那么逻辑计划将被保存在缓存管理器(Cache Manager)中,并发送给Optimizer组件。 (3)Optimizer组件接收到逻辑计划后进行优化处理,得到优化后的逻辑计划 (OptimizedLogicalPlan)。例如,在计算表达式x+(1+2)时,Optimizer组件会将其优化 为x+3,如果没有经过优化,每个结果都需要执行一次1+2的操作,然后再与x相加,通过 优化,就无须重复执行1+2的操作。优化后的逻辑计划会发送给Planner组件。 (4)Planner组件将优化后的逻辑计划转换为多个物理计划(PhysicalPlan),通过成本 模型(CostModel)进行资源消耗估算,在多个物理计划中得到选择后的物理计划(Selected PhysicalPlan)并将其发送给QueryExecution组件。 (5)QueryExecution组件根据选择后的物理计划生成具体的执行逻辑,并将其转化 为RDD 。 3.aare基础知识 2 DtFam 3..aarme简介 21 DtFa 在Spark中,DataFrame是一种以RDD为基础的分布式数据集,因此DataFrame可以 执行绝大多数RDD的功能。在实际开发中,可以方便地进行RDD和DataFrame之间的 转换。 Spark大数据分析与应用(Python54 版) DataFrame的结构类似于传统数据库的二维表格,并且可以由多种数据源创建,如结构化 文件、外部数据库、Hive表等。下面,通过图3-3来了解DataFrame与RDD在结构上的区别。 图3-3 DataFrame与RDD 的区别 在图3-3中,左侧为RDD[Person]数据集,右侧为DataFrame数据集。DataFrame可 以看作分布式Row 对象的集合,每个Row 表示一行数据。与RDD 不同的是,DataFrame 还包含了元数据信息,即每列的名称和数据类型,如Name、Age和Height为列名,String、 Int和Double为数据类型。这使得SparkSQL可以获取更多的数据结构信息,并对数据源 和DataFrame上的操作进行精细化的优化,最终提高计算效率,同时,DataFrame与Hive 类似,DataFrame也支持嵌套数据类型,如Struct、Array和Map。 RDD[Person]虽然以Person为类型参数,但是SparkSQL无法获取RDD[Person]内 部的结构,导致在转换数据形式时效率相对较低。 总的来说,DataFrame提高了SparkSQL的执行效率、减少数据读取时间以及优化执 行计划。引入DataFrame后,处理数据就更加简单了,可以直接用SQL或DataFrameAPI 处理数据,极大提升了用户的易用性。通过DataFrameAPI或SQL处理数据时,Catalyst 会自动优化查询计划,即使用户编写的程序或SQL语句在逻辑上不是最优的,Spark仍能 够高效地执行这些查询。 3.2.2 DataFrame的创建 在Spark2.0之后,Spark引入了SparkSession接口更方便地创建DataFrame。创建 DataFrame时需要创建SparkSession对象,该对象的创建分为两种方式,一种是通过代码 SparkSession.builder.master().appName().getOrCreate()来创建。另一种是PySpark中 会默认创建一个名为spark的SparkSession对象。一旦SparkSession对象创建完成后,就 可以通过其提供的read属性获取一个DataFrameReader对象,并利用该对象调用一系列方 法从各种类型的文件中读取数据创建DataFrame。 接下来,基于YARN集群的运行模式启动PySpark。在虚拟机Hadoop1的目录/export/ servers/sparkOnYarn/spark-3.3.0-bin-hadoop3中执行如下命令。 $ bin/pyspark --master yarn 上述命令执行完成后的效果如图3-4所示。 从图3-4可以看出,PySpark默认创建了一个名为spark的SparkSession对象。 第3章 SparkSQL结构化数据处理模块 55 图3-4 启动PySpark 常见的读取数据创建DataFrame的方法如表3-1所示。 表3-1 常见的读取数据创建DataFrame的方法 方 法语法格式说 明 text() SparkSession.read.text(path) 从指定目录path读取文本文件,创建DataFrame csv() SparkSession.read.csv(path) 从指定目录path读取CSV文件,创建DataFrame json() SparkSession.read.json(path) 从指定目录path读取JSON文件,创建DataFrame parquet() SparkSession.read.parquet(path) 从指定目录path读取parquet文件,创建DataFrame toDF() RDD.toDF([col,col,…]) 用于将一个RDD转换为DataFrame,并且可以指定 列名col。默认情况下,列名的格式为_1、_2等 createDataFrame()SparkSession.createDataFrame (data,schema) 通过读取自定义数据data创建DataFrame 在表3-1中,参数data用于指定DataFrame的数据,其值的类型可以是数组、List集合 或者RDD。参数schema为可选用于指定DataFrame的元信息,包括列名、数据类型等。 如果没有指定参数schema,那么使用默认的列名,其格式为_1、_2等。而数据类型则通过 数据自行推断。 接下来,通过读取JSON 文件演示如何创建DataFrame,具体步骤如下。 1.数据准备 克隆一个虚拟机Hadoop1的会话,在虚拟机Hadoop1的/export/data目录下执行vi person.json命令创建JSON 文件person.json,具体内容如文件3-1所示。 文件3-1 person.json {"age":20, "id":1, "name":"zhangsan"} {"age":18, "id":2, "name":"lisi"} {"age":21, "id":3, "name":"wangwu"} {"age":23, "id":4, "name":"zhaoliu"} {"age":25, "id":5, "name":"tianqi"} {"age":19, "id":6, "name":"xiaoba"} Spark大数据分析与应用(Python56 版) 数据文件创建完成后,在/export/data目录执行hdfsdfs-putperson.json/命令将 JSON 文件person.json上传到HDFS的根目录。 2.读取文件创建DataFrame 通过读取JSON 文件person.json创建DataFrame,具体代码如下。 >>> personDF = spark.read.json("/person.json") 上述代码中,使用json()方法读取HDFS根目录的JSON 文件person.json创建名为 personDF的DataFrame。 DataFrame创建完成后,可以通过printSchema()方法输出personDF的元数据信息, 具体代码如下。 >>> personDF.printSchema() 上述代码运行完成后的效果如图3-5所示。 图3-5 输出personDF的元数据信息 从图3-5可以看出,JSON 文件person.json中的键会作为DataFrame的列名,而列的 数据类型会根据键对应的值自行推断。此外,默认情况下,DataFrame中每个列的值可以为 空,即nullable=true。例如,根据JSON 文件person.json中键name的值推断出列name 的数据类型为string。 使用show()方法查看当前DataFrame的内容,具体代码如下。 >>> personDF.show() 上述代码运行完成后的效果如图3-6所示。 从图3-6 可以看出,DataFrame 的内容为二维表格的形式,其中列与JSON 文件 person.json中的键有关,而列的数据为JSON 文件person.json中键对应的值。 3.2.3 DataFrame的常用操作 多样性是人类社会发展的基石,是文明进步的源泉。在一个多样化的社会中,每个人都 能找到属于自己的位置,贡献独特的智慧和力量。尊重和包容多样性,不仅是对每个人基本 权利的尊重,也是实现社会公平正义的必要条件。DataFrame提供了两种语法风格,分别是 第3章 SparkSQL结构化数据处理模块 57 图3-6 查看DataFrame的内容 DSL(DynamicScriptLanguage,领域特定语言)风格和SQL 风格,前者通过DataFrame API的方式操作DataFrame,后者通过SQL 语句的方式操作DataFrame。接下来,针对 DSL风格和SQL风格分别讲解DataFrame的具体操作方式。 1.DSL风格 DataFrame提供了一种DSL 风格去管理结构化数据的方式,可以在Scala、Java、 Python和R语言中使用DSL。下面,以Python语言使用DSL为例,讲解DataFrame的常 用操作。 (1)printSchema()方法:查看DataFrame的元数据信息。 (2)show()方法:查看DataFrame中的内容。 (3)select()方法:选择DataFrame中指定列。 下面基于3.2.2节创建的DataFrame选择并查看其name列的数据,具体代码如下。 >>> personDF.select(personDF["name"]).show() 上述代码运行完成后的效果如图3-7所示。 图3-7 select()方法的使用 Spark大数据分析与应用(Python58 版) 从图3-7可以看出,DataFrame中name列的数据包括zhangsan、lisi、wangwu、zhaoliu、 tianqi、xiaoba。 (4)filter()方法:实现条件查询,筛选出想要的结果。 下面演示如何筛选DataFrame中age列大于或等于20的数据,具体代码如下。 >>> personDF.filter(personDF["age"] >= 20).show() 上述代码运行完成后的效果如图3-8所示。 图3-8 filter()方法的使用 从图3-8可以看出,DataFrame中包含4条age列大于或等于20的数据。 (5)groupBy()方法:根据DataFrame的指定列进行分组,分组完成后可通过count() 方法对每个组内的元素进行计数操作。 下面演示如何将DataFrame中age列进行分组并统计每个组内元素的个数,具体代码 如下。 >>> personDF.groupBy("age").count().show() 上述代码运行完成后的效果如图3-9所示。 图3-9 groupBy()方法的使用 第3章 SparkSQL结构化数据处理模块 59 从图3-9可以看出,根据DataFrame中age列分为6组,每组内元素的个数都为1。 (6)sort()方法:根据指定列进行排序操作,默认是升序排序,若指定为降序排序,需要 使用desc()方法指定排序规则为降序排序。 下面演示如何将DataFrame中的age列进行降序排序,具体代码如下。 >>> personDF.sort(personDF["age"].desc()).show() 上述代码运行完成后的效果如图3-10所示。 图3-10 sort()方法的使用 从图3-10可以看出,DataFrame中的数据根据age列进行降序排序。 2.SQL风格 DataFrame的强大之处就是可以将它看作一个关系型数据表,然后可以在Spark中直 接使用spark.sql()的方式执行SQL查询,结果将作为一个DataFrame返回。使用SQL风 格操作的前提是需要使用createOrReplaceTempView()方法将DataFrame创建成一个临 时视图。接下来,创建一个DataFrame的临时视图t_person,具体代码如下。 >>> personDF.createOrReplaceTempView("t_person") 上述代码中,通过createOrReplaceTempView()方法创建personDF 的临时视图t_ person。使用createOrReplaceTempView()方法创建的临时视图的生命周期依赖于 SparkSession,即SparkSession存在则临时视图存在。当用户想要手动删除临时视图时,可 以通过执行spark.catalog.dropTempView("t_person")代码实现,其中t_person用于指定 临时视图的名称。 下面,演示使用SQL风格方式操作DataFrame。 (1)查询临时视图t_person中age列的值最大的两行数据,具体代码如下。 >>> spark.sql("select * from t_person order by age desc limit 2").show() 上述代码中,通过SQL语句对临时视图t_person中age列进行降序排序,并获取排序 Spark大数据分析与应用(Python60 版) 结果的前两条数据。 上述代码运行完成后的效果如图3-11所示。 从图3-11可以看出,成功筛选出临时视图t_person中age列的值最大的两行数据。 (2)查询临时视图t_person中age列的值大于20的数据,具体代码如下。 >>> spark.sql("select * from t_person where age>20").show() 上述代码运行完成后的效果如图3-12所示。 图3-11 临时视图t_person中age列的 值最大的两行数据 图3-12 临时视图t_person中age列 的值大于20的数据 从图3-12可以看出,成功筛选出age列的值大于20的数据。 DataFrame操作方式简单,并且功能强大,熟悉SQL 语法的用户都能够快速地掌握 DataFrame的操作,本节只讲解了部分常用的操作方式,读者可通过查阅Spark官网详细学 习DataFrame的操作方式。 3.2.4 DataFrame的函数操作 SparkSQL提供了一系列函数对DataFrame进行操作,能够实现对数据进行多样化的 处理和分析,这些函数操作主要包括标量函数(ScalarFunctions)操作和聚合函数 (AggregateFunctions)操作,它们同样支持DSL风格和SQL风格操作DataFrame,鉴于使 用SQL风格操作DataFrame较为简单,本节使用DSL 风格重点介绍这两种类型函数的 操作。 1.标量函数操作 标量函数操作是对于输入的每一行数据,函数会产生单个值作为输出。标量函数分为 内置标量函数(Built-inScalarFunctions)操作和自定义标量函数(User-DefinedScalar Functions)操作,关于内置标量函数操作和自定义标量函数操作的介绍如下。 (1)内置标量函数。 SparkSQL提供了大量的内置标量函数供用户直接使用。下面介绍SparkSQL常用 的内置标量函数,如表3-2所示。 第3章 SparkSQL结构化数据处理模块 61 表3-2 SparkSQL常用的内置标量函数 函 数语法格式说 明 array_max array_max(col) 用于对DataFrame中数组类型的列col进行操作,获取 每个数组的最大值 array_min array_min(col) 用于对DataFrame中数组类型的列col进行操作,获取 每个数组的最小值 map_keys map_keys(col) 用于对DataFrame中键值对类型的列col进行操作,获 取每个键值对的键 map_values map_values(col) 用于对DataFrame中键值对类型的列col进行操作,获 取每个键值对的值 element_at element_at(col,key) 用于对DataFrame中键值对类型的列col进行操作,根 据指定的键key 返回对应的值,如果键不存在,则返 回null date_add date_add(startDate,num_days) 用于在指定日期startDate 上增加天数num_days。 startDate可以是日期类型的列也可以是字符串类型的 日期 datediff datediff(endDate,startDate) 用于计算两个日期startDate和endDate,之间的天数差 异。startDate和endDate可以是日期类型的列也可以是 字符串类型的日期 substring substring(str,pos,len) 用于对DataFrame中字符串类型的列进行操作,从字符 串中截取部分字符串。其中参数str为初始字符串或列; 参数pos为提取部分字符串的索引位置,从1开始;参数 len指定截取部分字符串的长度。如初始字符串为 world,索引位置为1,截取长度为2,则截取后的字符串 为wo。如果索引位置超过初始字符串的长度,则截取后 的字符串为空,如果截取长度超过索引位置之后字符串 的长度,则将索引位置之后字符串全部截取 在表3-2中,内置标量函数对数组和键值对类型数据的操作在Python中指代的是列表 和字典。以PyCharm 为例,演示表3-2中常用内置标量函数的使用,具体内容如下。 ①array_max函数。在项目Python_Test中创建Function文件夹,并在该文件夹中创 建名为FunTest的Python文件,通过array_max函数获取DataFrame中数组类型列的最 大值,具体代码如文件3-2所示。 文件3-2 FunTest.py 1 from pyspark.sql import SparkSession 2 from pyspark.sql.functions import * 3 spark = SparkSession.builder.master("local[*]") \ 4 .appName("FunTest") \ 5 .getOrCreate() 6 data = spark.createDataFrame( 7 [( 8 [80, 88, 68], 9 {"xiaohong":"B","xiaoming":"A","xiaoliang":"C"}, 10 "2023-10-15", Spark大数据分析与应用(Python62 版) 11 "2023-10-16" 12 )], 13 ["数学分数","学生评级","考试时间","成绩公布时间"] 14 ) 15 result = data.select("数学分数",array_max("数学分数")) 16 # 参数truncate = false 用于指定显示DataFrame 中完整的行内容 17 result.show(truncate=False) 18 # 释放资源 19 spark.stop() 在文件3-2中,第6~14行代码通过createDataFrame()方法创建一个名为data的 DataFrame。createDataFrame()方法的第一个参数通过列表指定data中的数据,列表的每 个元素将作为data的每行数据。当列表中元素的类型为元组时,元组中的每个元素将依次 作为data中每个列的数据;第二个元素通过列表指定data中每个列的列名,列的数据类型 将通过其存储的数据自行推断。 第15行代码通过select()方法选择data中的“数学分数”列,并通过array_max函数获 取“数学分数”列的最大值。 文件3-2的运行结果如图3-13所示。 图3-13 文件3-2的运行结果(1) 从图3-13可以看出,“数学分数”列的最大值为88。 ②array_min函数。通过array_min函数获取DataFrame中数组类型列的最小值。这 里将文件3-2中第15行代码修改为如下代码。 result = data.select("数学分数",array_min("数学分数")) 上述代码通过select()方法选择data中的“数学分数”列,并通过array_min函数获取 “数学分数”列的最小值。 文件3-2的运行结果如图3-14所示。 图3-14 文件3-2的运行结果(2) 第3章 SparkSQL结构化数据处理模块 63 从图3-14可以看出,“数学分数”列的最小值为68。 ③ map_keys函数。通过map_keys函数获取DataFrame中键值对类型列的键。这里 将文件3-2中第15行代码修改为如下代码。 result = data.select("学生评级",map_keys("学生评级")) 上述代码通过select()方法选择data中的“学生评级”列,并通过map_keys函数获取 “学生评级”列中的键。 文件3-2的运行结果如图3-15所示。 图3-15 文件3-2的运行结果(3) 从图3-15可以看出,“数学评级”列中的键包括xiaohong、xiaoming和xiaoliang。 ④ map_values函数。通过map_values函数获取DataFrame中键值对类型列的值。 这里将文件3-2中第15行代码修改为如下代码。 result = data.select("学生评级",map_values("学生评级")) 上述代码通过select()方法选择data中的“学生评级”列,并通过map_values函数获取 “学生评级”列中的值。 文件3-2的运行结果如图3-16所示。 图3-16 文件3-2的运行结果(4) 从图3-16可以看出,“数学评级”列中的值包括B、A 和C。 ⑤element_at函数。通过element_at函数获取DataFrame中键值对类型列指定键对 应的值。这里将文件3-2中第15行代码修改为如下代码。 result = data.select("学生评级",element_at("学生评级","xiaoming")) 上述代码通过select()方法选择data中的“学生评级”列,并通过element_at函数获取 Spark大数据分析与应用(Python64 版) “学生评级”列中键为xiaoming的值。 文件3-2的运行结果如图3-17所示。 图3-17 文件3-2的运行结果(5) 从图3-17可以看出,“学生评级”列中键为xiaoming的值为A。 ⑥ date_add函数。通过date_add函数实现对DataFrame中字符串日期类型列增加指 定的天数。这里将文件3-2中第15行代码修改为如下代码。 result = data.select("考试时间",date_add("考试时间",3)) 上述代码通过select()方法选择data中的“考试时间”列,并通过date_add函数将“考 试时间”列中的日期增加3天。 文件3-2的运行结果如图3-18所示。 图3-18 文件3-2的运行结果(6) 从图3-18可以看出,“考试时间”列中的日期增加3天的结果为2023-10-18。 ⑦ datediff函数。通过datediff函数实现计算DataFrame中字符串日期类型列的时间 间隔。这里将文件3-2中第15行代码修改为如下代码。 result = data.select("考试时间","成绩公布时间",datediff("成绩公布时间","考试时 间")) 上述代码通过select()方法选择data中的“考试时间”和“成绩公布时间”列,并通过 datediff函数计算“考试时间”和“成绩公布时间”列的时间间隔。 文件3-2的运行结果如图3-19所示。 从图3-19可以看出,“考试时间”和“成绩公布时间”列的时间间隔为1。 ⑧substring函数。通过substring函数实现对DataFrame中字符串类型的列截取部 分字符串。这里将文件3-2中第15行代码修改为如下代码。 第3章 SparkSQL结构化数据处理模块 65 图3-19 文件3-2的运行结果(7) result = data.select("考试时间",substring("考试时间",0,4)) 上述代码通过select()方法选择data中的“考试时间”列,并通过substring函数从“考 试时间”列中截取索引位置为0,截取长度为4的字符串。 文件3-2的运行结果如图3-20所示。 图3-20 文件3-2的运行结果(8) 从图3-20可以看出,“考试时间”列中截取索引位置为0,截取长度为4 的字符串 为2023。 (2)自定义标量函数。 自定义标量函数是指内置标量函数不足以处理指定需求时,用户可以自行定义的函数, 它可以在程序中添加自定义的功能实现对DataFrame进行操作。 在SparkSQL中实现自定义标量函数分为定义函数和注册函数两部分操作,其中定义 函数用于指定处理逻辑;注册函数用于将定义的函数注册到SparkSession中,使其成为 SparkSQL中的标量函数,定义函数的语法格式如下。 def fun_name([参数列表]): 函数体 [return value] 上述语法格式的解释如下。 ① def:定义函数的关键字。 ②fun_name:用于指定函数的名称。 ③ [参数列表]:负责接收传入函数中的参数,可以包含一个或多个参数,也可以为空。 ④ 函数体:指定函数的处理逻辑。 ⑤ [returnvalue]:指定函数的返回值value。如果函数没有返回值,可以省略。 注册函数针对使用DSL风格和SQL风格操作DataFrame具有不同的语法格式,具体 Spark大数据分析与应用(Python66 版) 如下。 # 使用DSL 风格操作DataFrame 时的注册函数 udf_fun = udf(fun_name, returnType) # 使用SQL 风格操作DataFrame 时的注册函数 spark.udf.register(name, fun_name, returnType) 上述语法格式中,使用DSL风格操作DataFrame时的注册函数中,udf()方法用于将定 义的函数注册为自定义标量函数,该函数接收两个参数,fun_name参数为定义的函数名, returnType参数为自定义标量函数返回值的数据类型。 使用SQL风格操作DataFrame时的注册函数中,通过调用SparkSession对象的udf() 方法获取一个UDFRegistration对象,该对象的register()方法用于将定义的函数注册为自 定义标量函数,该方法接收3个参数,name参数为自定义标量函数的函数名,fun_name参 数为定义的函数名,returnType参数为自定义标量函数返回值的数据类型。 接下来,以PyCharm 为例,演示自定义标量函数的使用。在项目Python_Test的 Function文件夹中创建名为UDFTest的Python文件,实现将DataFrame中每个单词的 第3个字母变为大写,具体代码如文件3-3所示。 文件3-3 UDFTest.py 1 from pyspark.sql import SparkSession 2 from pyspark.sql.functions import udf 3 from pyspark.sql.types import StringType 4 spark = SparkSession.builder.master("local[*]") \ 5 .appName("UDFTest") \ 6 .getOrCreate() 7 data = [ 8 (1, "hello"), 9 (2, "world"), 10 (3, "spark") 11 ] 12 schema = ["id", "value"] 13 df = spark.createDataFrame(data, schema) 14 def Up(word): 15 if len(word) >= 3: 16 return word[:2] + word[2].upper() + word[3:] 17 else: 18 return word 19 udf_up = udf(Up, StringType()) 20 result = df.select("id", udf_up("value")) 21 result.show() 22 spark.stop() 在文件3-3中,第14~18行代码通过定义一个名为Up的函数用于将DataFrame中每 个单词的第3个字母变为大写,接收参数word表示DataFrame中的每个单词。实现逻辑 为通过if-else语句判断的单词长度是否大于或等于3,如果满足条件则通过索引切片操作 将单词的第3个字母通过upper()方法转换为大写,并将其余字母拼接在一起。如果不满 第3章SparkSQL 结构化数据处理模块67 足条件则返回原始单词。 第19 行代码通过udf() 方法将名为Up的函数注册为自定义标量函数udf_up,指定自 定义标量函数返回值的数据类型为字符串类型。 第20 行代码通过select() 方法选择df中的id列,并通过标量函数udf_up将value列 中每个单词的第3个字母变为大写。 文件3-3的运行结果如图3-21 所示。 图3-21 文件3-3的运行结果 从图3-21 可以看出,value列中每个单词的第3个字母已变为大写。 2. 聚合函数操作 聚合函数操作是对于一组数据进行计算并返回单个值的函数。聚合函数分为内置聚合 函数(Built-inAggregationFunctions)操作和自定义聚合函数(UserDefinedAggregate Functions)操作。关于内置聚合函数操作和自定义聚合函数操作的介绍如下。 (1)内置聚合函数。 SparkSQL 提供了大量的内置聚合函数供用户直接使用。下面介绍SparkSQL 常用 的内置聚合函数,如表3-3所示。 表3- 3 SparkSQL常用的内置聚合函数 函数语法格式相关说明 count count(col) 用于计算指定列col中非空值的数量 sum sum(col) 计算指定列col中所有数值的总和 avg avg(col) 计算指定列col中所有数值的平均值 max max(col) 计算指定列col中所有数值的最大值 min min(col) 计算指定列col中所有数值的最小值 var_samp var_samp(col) 计算指定列col中样本的方差 stddev stddev(col) 计算指定列col中样本的标准差 表3-3列举了SparkSQL 常用的内置聚合函数。在使用这些内置聚合函数时可以配合 agg 函数进行嵌套使用,这是因为agg 函数允许用户同时使用多个聚合函数对DataFrame 中指定列进行不同的操作,实现一次性完成多种聚合需求。 接下来,以PyCharm 为例,演示表3-3中常用内置聚合函数的使用。在项目Python_ Test的Function文件夹中创建名为AggTest的Python文件,实现使用不同的内置聚合函 Spark大数据分析与应用(Python68 版) 数对DataFrame中指定列进行操作,具体代码如文件3-4所示。 文件3-4 AggTest.py 1 from pyspark.sql import SparkSession 2 from pyspark.sql.functions import * 3 spark = SparkSession.builder.master("local[*]") \ 4 .appName("AggTest") \ 5 .getOrCreate() 6 data = [(3,), (6,), (3,), (4,)] 7 df = spark.createDataFrame(data, ["value"]) 8 result = df.agg( 9 count("value"), 10 sum("value"), 11 avg("value"), 12 max("value"), 13 min("value"), 14 var_samp("value"), 15 stddev("value") 16 ) 17 result.show(truncate=False) 18 spark.stop() 在文件3-4中,第8~16行代码使用不同的内置聚合函数对value列的值进行计算。 文件3-4的运行结果如图3-22所示。 图3-22 文件3-4的运行结果 从图3-22可以看出,value列中非空值的数量为4、所有数值的总和为16、所有数值的 平均值为4.0、所有数值的最大值为6、所有数值的最小值为3、样本的方差为2.0、样本的标 准差为1.4142135623730951。 (2)自定义聚合函数。 自定义聚合函数操作是指内置聚合函数不足以处理指定需求时,用户可以自行定义的 函数,它可以在程序中添加自定义的功能实现对DataFrame进行操作。 自定义聚合函数同样分为定义函数和注册函数两部分操作,其语法格式与自定义标量 函数的语法格式相同,这里不再赘述。 接下来,以PyCharm 为例,演示自定义聚合函数的使用。在项目Python_Test的 Function文件夹中创建名为UDAFTest的Python文件,实现从指定列的字符串中提取数 值并计算它们相加的结果,具体代码如文件3-5所示。 第3章 SparkSQL结构化数据处理模块 69 文件3-5 UDAFTest.py 1 from pyspark.sql import SparkSession 2 from pyspark.sql.functions import udf 3 from pyspark.sql.types import IntegerType 4 spark = SparkSession.builder.master("local[*]") \ 5 .appName("UDAFTest") \ 6 .getOrCreate() 7 data = [("a1b3d2",)] 8 df = spark.createDataFrame(data, ["value"]) 9 def str_num(data): 10 num = [int(x) for x in data if x.isdigit()] 11 if not num: 12 return 0 13 else: 14 return sum(num) 15 udaf_num = udf(str_num, IntegerType()) 16 result = df.select(udaf_num("value")) 17 result.show(truncate=False) 18 spark.stop() 在文件3-5中,第9~14行代码定义一个名为str_num 的函数用于从DataFrame中指 定列的字符串中提取数值并计算它们相加的结果,接收参数data表示DataFrame中的字符 串。实现逻辑为从字符串中提取数值,如果存在数值则返回数值的和,否则返回0。其中 isdigit()方法用于判断字符串中是否存在数值,若存在则保留,然后交由int()方法将其转换 为整数类型。 第15行代码通过udf()方法将名为str_num 的函数注册为自定义聚合函数udaf_ num,指定自定义聚合函数返回值的数据类型为整数类型。 第16行代码在select()方法中通过自定义聚合函数udaf_num 从value列的字符串中 提取数值并计算它们相加的结果。 文件3-5的运行结果如图3-23所示。 图3-23 文件3-5的运行结果 从图3-23可以看出,value列的字符串中数值相加的结果为6。 多学一招:通过@udf()装饰器实现自定义标量函数或聚合函数 从Spark1.3.0版本开始,SparkSQL引入了@udf()装饰器来实现自定义标量函数或 聚合函数,通过该装饰器实现自定义标量函数或聚合函数时,无须手动对定义的函数进行注 册便可直接使用。这种方式简化了用户实现自定义标量函数或聚合函数的过程,提高了代 Spark大数据分析与应用(Python70 版) 码的可读性和可维护性,不过通过@udf()装饰器实现自定义标量函数或聚合函数时,只能 通过DSL风格操作DataFrame,语法格式如下。 @udf(returnType) def fun_name([参数列表]): 函数体 [return value] 上述语法格式中,returnType参数表示自定义标量函数或聚合函数返回值的数据 类型。 3.3 RDD 转换为DataFrame 当RDD无法满足用户更高级别、更高效的数据分析时,可以将RDD转换为DataFrame。 Spark提供了两种方法实现将RDD转换为DataFrame,第一种方法是利用反射机制来推断 包含特定类型对象的Schema元数据信息,这种方式适用于对已知数据结构的RDD 转换; 第二种方法通过编程方式定义一个Schema,并将其应用在已知的RDD中。接下来,本节针 对这两种转换方法进行讲解。 3.3.1 反射机制推断Schema 当有一个数据文件时,人类可以轻松理解其中的字段,如编号、姓名和年龄的含义,但计 算机无法像人一样直观地理解这些字段。在这种情况下,可以通过反射机制来自动推断包 含特定类型对象的Schema元数据信息。这个Schema元数据信息可以帮助计算机更好地 理解和处理数据文件中的字段。 通过反射机制推断Schema主要包含两个步骤,具体如下。 ① 创建一个ROW 类型的RDD。 ② 通过toDF()方法根据Row对象中的列名来推断Schema元数据信息。 上述步骤对应的语法格式如下。 schema = data.map(lambda y: Row(fieldName=y[0], fieldName=y[1], fieldName=y[2], ...)) df = schema.toDF() 上述语法格式中,通过map算子中定义的匿名函数,将名为data的RDD 中每个元素 与Row对象进行映射操作,生成名为schema的RDD。在Row 对象中,fieldName用于指 定列名。映射操作完成后,通过toDF()方法根据映射的列名来推断Schema元数据信息, 将schema转换成名为df的DataFrame。 接下来,以PyCharm 为例,实现通过反射机制推断Schema,具体操作步骤如下。 (1)在本地计算机中准备文本数据文件,这里在本地计算机D 盘根目录下创建文件 person.txt,数据内容如文件3-6所示。