第5章〓Spark SQL编程 Spark SQL是用于结构化数据处理的Spark模块。不同于RDD API,Spark SQL提供了更多有关数据和计算的结构信息,并根据这些信息对计算过程进行优化。Spark SQL接口包括SQL和Dataset API。但无论哪种接口或开发语言(Scala、Java、Python或R等),都使用相同的执行引擎,开发人员可以在不同的API之间进行切换。 Spark SQL是Spark中最重要的概念之一,Spark SQL允许用户对组织到数据库中的视图或表执行SQL查询。用户还可以使用系统函数或自定义函数分析查询计划,优化工作负载。本章将介绍Spark SQL中的核心概念,较少涉及ANSISQL规范或具体的SQL表达式。SQL规范等请参考相关文档。 Spark SQL用于执行SQL查询,可以从JDBC/ODBC数据源或Hive等数据仓库中读取数据,可以将查询结果以Dataset/DataFrame的形式返回给其他编程语言接口。 数据集Dataset是分布式数据集合,是Spark 1.6版本中添加的接口,集成了RDD的优点(强类型、使用lambda函数)和Spark SQL引擎优化执行的优点。Dataset可以从JVM对象构造,使用转换函数进行操作。Dataset API提供Scala 和Java接口。 数据帧DataFrame是元素类型为Row的Dataset,概念上等效于关系数据库中的表,但增加了更多的优化。DataFrame可以从多种数据源构建,如,结构化的数据文件、Hive中的表、外部数据库或已有RDD等。在Scala API中,DataFrame只是Dataset[Row]的一个别名。 5.1Spark SQL基础 5.1.1概述 易用性是Spark流行的原因之一。Spark 提供了一个比Hadoop MapReduce更简单的编程模型来处理大数据。与SQL开发相比,精通Spark核心API的开发人员要少很多。 SQL是用于处理数据的一种ANSI/ISO标准语言,不仅可以存储、修改和检索数据,还可以分析数据。相比Scala、Java或Python等通用编程语言,SQL更容易学习和使用。SQL同时具有强大的数据处理能力,是数据分析的主要工具之一。 HiveQL是一种与SQL类似的语言,在Hadoop中广泛使用,是Hadoop MapReduce的首选接口之一。在Spark崛起之前,Hive是事实上的大数据SQL访问层。Hive最初由Facebook开发,后来成为大数据业界非常受欢迎的工具。Spark最初是RDD通用处理引擎,因sqlContext接口支持SQL子集而快速发展,Spark 1.x中的HiveContext接口支持Hive的绝大多数功能。Spark 2.0版本是Hive的超集,其中内置SQL解析器,同时支持ANSISQL和HiveQL查询。 Spark SQL的强大功能表现在多个方面: SQL分析师可以通过接入Thrift服务器或Spark的SQL接口来利用Spark的计算能力; 而数据工程师和科学家可以通过任一Spark支持的编程语言,使用Spark SQL编程接口(如SparkSession对象的各方法)进行应用开发; 此外,DataFrame还可以传递给Spark MLlib(机器学习库)中的各个机器学习算法,进行机器学习应用开发。 Spark SQL旨在作为OLAP(联机分析处理)数据库运行,而不是作为 OLTP(联机事务处理)数据库运行,不适合非常低延迟的查询。 5.1.2Spark SQL架构 Spark SQL是基于Spark核心执行引擎的库,其架构如图51所示。 图51Spark SQL架构 Spark SQL是在Spark Core执行引擎之上运行的库,提供了比Spark核心API更高层的抽象来处理结构化数据。结构化数据包括存储在关系数据库、NoSQL数据库、Parquet、ORC、Avro、JSON、CSV或任何其他结构化形式的数据。Spark SQL不仅为Spark提供SQL接口,使Spark更易于使用,还可在提升Spark应用程序运行速度的同时,提高开发人员的工作效率。 Spark SQL可用作Scala、Java、Python或R应用程序的数据处理开发库,支持多种查询语言,包括SQL、HiveQL和语言集成查询。此外,还可以仅通过SQL/HiveQL进行交互式分析。 Spark SQL使用JDBC/ODBC为数据仓库应用程序提供SQL接口,或通过命令行控制台提供SQL交互查询接口。任何商业智能(Business Intelligence,BI)工具都可以连接到Spark SQL,在内存中执行分析。可基于API进行Java、Scala、Python或R应用程序开发,用户使用数据源(Data Source)API读写多种数据,创建Dataset/DataFrame。图51 中也显示了传统的基于Spark core和RDD进行开发的操作方式。 5.1.3一个简单的Spark SQL开发例子 以下代码展示从JSON文件创建DataFrame,并显示其内容: /** * SimpleSparkSqlApp.scala * * This example illustrates SparkSession / DataFrame * * @author Rujun Cao * @date 2022/10/00 */ package cn.edu.wzu.SparkExample // SparkSession类是所有Dataset/DataFrame函数的入口点 import org.apache.spark.sql.SparkSession object SimpleSparkSqlApp { def main(args: Array[String]): Unit = { // SparkSession.builder()创建SparkSession // 在Spark shell (REPL)中已经创建(无须再次创建),名称为spark val spark = SparkSession.builder() .appName("Simple Spark SQL example") // 设置特定的配置信息. 此处仅为示例 .config("spark.some.config.option", "some-value") // 代码调用 getOrCreate 方法获取已有或新建的session对象 .getOrCreate() // 从JSON文件创建DataFrame val df = spark.read.json("../tmp/person.json") // 将DataFrame 的内容输出(到stdout) df.show() spark.stop() } } JSON文件person.json内容示例如下: {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} {"name":"Zhaoliu", "age":19, "gender":"male"} 在交互式shell中加载JSON文件并显示内容的执行效果,如图52所示。 图52在Spark shell中运行程序 5.2数据帧DataFrame DataFrame在Spark应用程序中非常重要,它通过模式(schema)来包含类型化的数据,并提供功能强大的API。 作为一个分布式分析引擎,Spark在某种程度上类似于一个操作系统,提供了构建应用程序和管理资源所需的所有服务(维基百科定义操作系统为“管理计算机硬件和软件资源,并为计算机程序提供公共服务的系统软件”)。若要以编程方式使用Spark,则需要了解其中一些关键的API。要执行分析和数据操作,Spark需要逻辑(在应用程序层)存储和物理(在硬件层)存储。在逻辑层,Spark流行的存储容器是类似于关系表的DataFrame。 DataFrame既是数据结构,也是API,可用于Spark SQL、Spark Streaming、MLlib(用于机器学习),并可用于操作基于图结构数据的GraphX。 5.2.1DataFrame结构 1. DataFrame数据组织 DataFrame是对各列命名的记录集,等效于关系数据库中的表或Java中的ResultSet。数据以分区的形式存储,如图53 所示。 图53DataFrame结构 图54给出了一个DataFrame的具体示例: 一个带有模式及数据的DataFrame,命名列描述数据的属性(包含数据类型),数据是行(Row)的集合,存储于分区中。 图54DataFrame的行与列结构 2. Row Row(行)是用于表示一行数据的Spark SQL抽象。从概念上讲,等效于表中的关系元组或行。Row对象是将数据传入和传出Spark的基本方法,在各种Spark开发语言环境中都可以使用。DataFrame中的每条记录都必须是Row类型。Row构造示例代码如下: spark.range(2).toDF().collect() Spark SQL提供了用于创建Row对象的工厂方法。示例代码如下: import org.apache.spark.sql.Row val row1 = Row("Joe Biden", "President", "US") val row2 = Row("Rishi Sunak", "Prime Minister", "UK") 当访问行对象的数据时,仅需要指定待访问的位置。由于Spark维护自己的内部类型信息,因此在使用时必须手动将其转换为正确的可使用的类型。 row1(1)// type any row1.getString(1)// type String 3. Column Column(列)既可以表示简单数据类型(如整数或字符串),也可以表示复合类型(如数组或映射)或者空(null)值。Spark会记录所有这些类型信息,并提供多种列转换方法。 在大多数情况下,可以将Spark Column类型视为表中的列。列的计算基于数据帧中的数据,可以从中进行选择、操作和删除列等(这些操作均为表达式)。对列进行操作需要有Row对象,而Row的存在则以DataFrame为基础,也就是说,不能在DataFrame之外操作实际的列,只能操作逻辑列的表达式,然后在DataFrame上执行该表达式。 Column构造示例: import org.apache.spark.sql.functions.{col, column} val col1 = col("column1") val col2 = column("column2") $"columnName"// Scala中命名列的简写法 可以基于表达式构造列: val col3 = $"a" + 1 基于已有DataFrame构造或引用列: val df = spark.range(1, 50, 2).toDF() val id = df("id") Row对象中列的值可以使用列序号进行访问,如: val presidentName = row1.getString(0) val country = row2.getString(2) 注意: Column仅仅是表达式,可能存在于数据帧中,也可能不存在。在对列名称与目录中维护的列名称进行比较之前,Spark不会解析列。 4. Schema Schema(模式)定义DataFrame的列名称和类型。可以直接使用数据源的Schema,也可以显式自定义Schema。 Schema是由多个StructField(结构字段)组成的StructType(结构类型),这些字段具有名称、类型和布尔标志(用于指定该列是否可以包含缺失值或空值)。运行以下命令后结果如图55所示。 spark.read.format("json").load("../tmp/person.json").schema 图55DataFrame的Schema示例 5.2.2创建DataFrame 使用SparkSession,应用程序可以从已有的RDD、Hive表或Spark数据源创建DataFrame。 从已有RDD创建DataFrame主要涉及SparkSession类的createDataFrame方法; 从Spark数据源创建则主要是基于DataFrameReader类提供的方法,或DataStreamReader类的方法,或是SQL查询的结果。此外,SparkSession还提供了创建空数据帧的方法emptyDataFrame。 1. 从Dataset创建DataFrame 可以直接将Dataset转换为DataFrame,例如,运行以下命令后结果如图56所示: // spark.range构造Dataset,toDF方法转换为DataFrame val df = spark.range(1, 50, 2).toDF() 图56将Dataset转换为DataFrame 2. 从数据源创建DataFrame 可以从各结构化的数据源直接创建DataFrame,例如,基于JSON文件内容创建: val df = spark.read.json("../tmp/person.json") // 或者 val df = spark.read.format("json").load("../tmp/person.json") // 创建视图以支持SQL查询 df.createOrReplaceTempView("dfTable") 从更多数据源创建DataFrame等内容,请参考本章后续内容。 3. 从RDD创建DataFrame 为了将已有的RDD转换为DataFrame,Spark SQL提供了两类不同方法。第一类方法使用反射(reflection)来推断包含特定类型对象的RDD模式。如果在编写Spark应用程序时模式已知,则基于反射的方法代码更简洁。第二类方法使用编程接口构造模式,再将模式应用于RDD。这种方法比较烦琐,但在模式(列及其类型)未知时仍可以构造数据集。 这两类方法所使用的具体方法名称分别是toDF和createDataFrame。 (1) 基于反射推断RDD模式。Spark SQL的Scala接口支持将包含样例类(case class)的RDD自动转换为DataFrame。样例类定义其模式,类参数名称映射为列名称。样例类也可以嵌套或包含复合类型,如Seq 或Array。RDD可以隐式转换为DataFrame,然后注册为表(table)。表可以在后续的SQL语句中使用。例如,运行以下代码后结果如图57所示。 // RDD到DataFrame隐式转换依赖包 import spark.implicits._ // 样例类 case class Person(name: String, age: Long) // 从文本文件创建RDD(Person 对象集合),转换为DataFrame val peopleDF = spark.sparkContext .textFile("../tmp/people.txt") .map(ln=> {val p = ln.split(","); Person(p(0), p(1).trim.toInt)}) .toDF() // 将DataFrame注册为临时视图,以支持SQL操作 peopleDF.createOrReplaceTempView("people") 图57反射推理RDD模式示例 (2) 编程定义RDD模式。当无法预先定义样例类时(如,记录的结构被编码为字符串,将解析文本数据集,或者不同用户以不同的方式对字段进行投影等),可以用编程方式创建DataFrame。主要包括3个步骤: ① 从原始RDD创建Row(行)RDD; ② 创建由结构类型StructType表示的Schema,与Row结构相匹配; ③ 通过SparkSession提供的createDataFrame方法将Schema应用于Row RDD。运行以下示例代码结果如图58所示。 import org.apache.spark.sql.Row import org.apache.spark.sql.types._ // 创建一个RDD, 将其元素转换为Row val rowRDD = spark.sparkContext.textFile("../tmp/people.txt") .map(ln=> {val p = ln.split(","); Row(p(0), p(1).trim)}) // schema是编码字符串 val schemaString = "name age" // 基于字符串生成schema val fields = schemaString.split(" ") .map(field=> StructField(field, StringType, nullable=true)) val schema = StructType(fields) // 将模式应用于RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // 将DataFrame注册为临时视图,以支持SQL操作 peopleDF.createOrReplaceTempView("people") // SQL测试:姓名、年龄查询 val results = spark.sql("SELECT name, age FROM people") results.show(5) 图58自定义RDD模式示例 提示: 是否需要使用自定义Schema读取数据,取决于应用场景。将Spark用于ETL时,最好手动定义Schema,尤其是在使用CSV、JSON等非类型化数据源时,因为Schema推断可能因所读取的数据类型而变化。 5.2.3DataFrame常用操作 与RDD类似,DataFrame支持许多操作,以下给出部分常用操作。 1. select/selectExpr select/selectExpr对DataFrame执行相当于SQL的数据查询,允许操作数据帧中的列。最简单的方法是将列名字符串作为select方法的参数。运行以下示例代码结果如图59所示。 val df = spark.read.format("json").load("../tmp/person.json") df.select("name", "age").show(5) 图59select操作 可以使用不同的方式引用列,给列表达式应用别名等,如: df.select(df.col("name"), col("gender"), expr("age")+2).show(5) select后跟expr的用法,简写为selectExpr。以下两条语句等价,运行以下示例代码结果如图510所示。 df.select($"name", expr("age+2 AS age_after_two_years")).show(5) df.selectExpr("name", "age+2 AS age_after_two_years").show(5) 图510selectExpr操作 可以在列表达式中应用聚合函数,运行以下示例代码结果如图511所示。 df.selectExpr("avg(age+2)", "count(distinct(gender))").show() 图511selectExpr操作中的聚合函数 注意: 不要在select中混合列对象与列名字符串。 2. withColumn withColumn方法添加新列或替换源数据帧中的列,返回新的DataFrame。该方法需要两个参数: 第一个参数是新列的名称,第二个参数是用于生成新列值的表达式。如果列名称参数与已有列名称相同,则替换原有列。运行以下示例代码结果如图51 2所示。 val df2 = df.select(expr("*"), lit(1).alias("One")) df2.withColumn("age_1", $"age" + $"One").show(5) 图512withColumn操作 3. withColumnRenamed withColumnRenamed方法重命名列。如, df2.withColumnRenamed("age_plus_1", "age_1") 提示: 使用withColumn方法也可以重命名列。 4. drop drop方法删除列。如果同时删除多个列,用逗号分隔待删除的列名称。删除单个列时,可以使用列名称,也可以使用列对象。代码示例如下: df2.drop($"One") df2.drop("age_plus_1", "One") 5. printSchema printSchema方法在控制台上以树状形式打印DataFrame的模式schema。如果要控制输出树的深度,可传递树层次参数。不指定层数时,输出整个树结构。运行以下示例代码结果如图513所示。 df.printSchema(2) 图513printSchema操作 6. createTempView/createOrReplaceTempView 对DataFrame使用spark.sql函数执行SQL查询前,需要将其转换为数据表或视图。 createTempView使用给定名称创建本地临时视图。此临时视图的生存期与用于创建此数据集的SparkSession相关联。本地临时视图是会话范围的,其生存期是创建它的会话的生存期,会话终止时将自动删除。本地临时视图不绑定到任何数据库,不能使用db1.view1来引用本地临时视图。 当临时视图已存在时,createOrReplaceTempView更新本地临时视图。运行以下示例代码结果如图51 4所示。 df2.createOrReplaceTempView("person") spark.sql("select * from person order by age desc").show(5) 图514createOrReplaceTempView操作 7. createGlobalTempView/createOrReplaceGlobalTempView createGlobalTempView使用给定名称创建全局临时视图。全局临时视图的生存期与Spark应用程序相关联。全局临时视图是跨会话的,其生存期是Spark应用程序的生存期,应用程序终止时自动删除。全局临时视图与系统保留数据库global_temp相关联,必须使用限定名称来引用全局临时视图(如,SELECT * FROM global_temp.view1)。 createOrReplaceGlobalTempView当全局临时视图已存在时更新之,否则创建。运行以下示例代码结果如图51 5所示。 df2.createOrReplaceGlobalTempView("people") spark.sql("SELECT * FROM global_temp.people").show(5) 图515createOrReplaceGlobalTempView操作 8. filter/where 对DataFrame的行进行过滤时,可以使用布尔表达式,或直接指定条件字符串。如, // 以下3行语句等价 df2.filter("age < 20") df2.filter(col("age") < 20) df2.where($"age" < 20) 提示: 当使用AND连接多个过滤条件时,除了在同一表达式中指定多个条件外,也可以使用Spark的链式操作(Spark引擎可以更有效地对查询进行优化),即,类似以下的查询,优先选择后一种方式: df2.filter($"age" < 20 && $"name" <=> "Zhaoliu") df2.filter($"age" < 20).filter($"name" <=> "Zhaoliu") 9. union union返回两个DataFrame的并集。执行union操作的两个DataFrame结构必须相同: df2.union(df.withColumn("Two", lit(2))).show(5) 10. distinct distinct去掉DataFrame中的重复行。运行以下示例代码结果如图51 6所示。 df2.union(df.withColumn("One", lit(1))).distinct().show(5) 图516distinct操作 11. sort/orderBy 对DataFrame的行进行排序,可以使用sort/orderBy等方法。运行以下示例代码结果如图51 7所示。 df2.sort("age", "name")// 与orderBy等效 可以使用asc和desc函数指定排序方向: val df3 = df2.union(df.withColumn("Two", lit(2))) df3.orderBy(desc("age"), asc("One")).show(9) 图517orderBy操作 12. groupBy groupBy方法使用指定的列参数对DataFrame的行进行分组,返回的结果可用于数据聚合。运行以下示例代码结果如图518所示。 val genderGroup = df.groupBy("gender") genderGroup.count().show() 图518groupBy操作 注意: groupBy方法返回的结果是RelationalGroupedDataset,而不是DataFrame。类似地,cube/rollup/pivot方法也返回RelationalGroupedDataset。该类主要用于数据聚合(agg函数)、统计(avg/mean/sum)等。 13. agg agg方法对源中DataFrame的一列或多列执行指定的聚合,返回结果DataFrame。运行以下示例代码结果如图51 9所示。 df2.agg(max("age"), count("gender")).show(1) 图519agg操作 14. describe/summary describe方法可用于探索性数据分析。它返回源DataFrame中列的统计摘要信息,包括计数、最小值、最大值、平均值和标准差。方法的输入参数为列名称,它采用一列或多列的名称作为参数。如果未指定参数,则计算所有列。运行以下示例代码结果如图520所示。 df2.describe().show() 图520describe操作 类似地,summary方法计算数值列和字符串列的统计信息。可用统计量包括count、mean、stddev、min、max、分位数、count_distinct、approx_count_distinct等。如果未指定参数,则计算count/mean/stddev/min/quartiles (percentiles at 25%,50%,and 75%)/max。运行以下示例代码结果如图521 所示。 df3.summary().show() 图521summary操作 注意: describe/summary函数仅用于探索性数据分析,不保证所生成的结果数据集schema的向后兼容性。如果要以编程方式计算汇总统计信息,则使用agg函数。 15. na/空值数据处理 na方法返回的DataFrameNaFunctions可用于DataFrame中的空值数据处理,如,填充、删除等。运行以下示例代码结果如图522 所示。 df2.na.fill("female", Array("gender")).show() df2.na.drop(Array("age")).show() 图522na操作 分别填充多个字段的示例代码如下: val valuesToFill = Map("age"-> 22, "gender"-> "X") df2.na.fill(valuesToFill).show() 5.2.4保存DataFrame Spark SQL使用统一接口,将DataFrame写入关系数据库、NoSQL数据库或各种格式的文件。可以使用DataFrame的write方法将DataFrame保存到各种存储系统。 write方法返回DataFrameWriter实例,该类提供多种将DataFrame内容保存到数据源的方法。DataFrameWriter类的builder方法用于指定数据保存的不同选项,如格式、分区及数据处理方式等。write方法的一般用法类似如下: df.write.format("output-data-source-format").save()  如果输出为JSON文件,则write方法用法如下: df.write.format("json").save("../tmp/json/path") //或者 df.write.json("../tmp/json/path")  如果输出为CSV文件,则write方法的用法如下: df3.write.format("csv").option("header", true).save("path") 5.3数据集Dataset Dataset是特定域(domainspecific)的强类型集合(strongly typed collection),可以使用函数或关系操作并行转换这些对象。Dataset有一个无类型视图(untyped view),即DataFrame,是行(Row )数据集。 在Spark 2.0中,DataFrame只是Scala和Java API中的行数据集,其定义为: type DataFrame = Dataset[Row] 定义在DataFrame上的操作也称为“非类型转换”(untyped transformation),作为对比,强类型(Scala/Java)Dataset上定义的操作称为“类型转换”(typed transformation)。 Dataset上可用的操作分为转换和动作。转换生成新的数据集,动作触发计算并返回结果。转换包括map、filter、select和aggregate(如groupBy)等,动作操作包括count、show或写入文件系统等。 Dataset操作是“惰性的”,即仅在调用动作时触发计算。在Spark内部,Dataset表示描述生成数据所需的逻辑计划。执行动作调用时,Spark的查询优化器会优化该逻辑计划,并生成物理计划,以便以并行和分布式方式高效执行计划。想要了解逻辑计划以及优化后的物理计划,可以使用explain函数。 为有效支持特定域的对象,需要编码器(encoder)。编码器 将特定域的类型映射到Spark的内部类型系统。例如,给定一个具有两个字段name (string)和age (int)的类Person,编码器会告诉Spark在运行时生成代码,将Person对象序列化为二进制结构。这种二进制结构通常只需占用较少的内存,并且针对数据处理进行过优化(如,列式存取)。可以使用schema函数了解数据的内部二进制表示形式。 Dataset与RDD类似,但不使用Java序列化(serialization或Kryo),而是使用特定的编码器 来序列化对象,以便通过网络进行传输或处理。虽然编码和标准序列化都负责将对象转换为字节,但编码器动态生成代码,允许Spark执行许多操作,而无须将字节反序列化为对象。 5.3.1创建Dataset 创建Dataset有两种常用方法。最常见的方法是用SparkSession的read方法读取存储系统中的文件。运行以下创建Dataset的示例代码,结果如图523 所示。 case class Person(name: String, age: Int, gender: String) // 样例类编码器 val caseDS = Seq(Person("Wangwu", 22, "male")).toDS() // 导入spark.implicits._,为许多常用类型自动提供编码器 val ds2 = Range(1, 5).toDS() spark.range(2).toDS() // 通过名称映射将DataFrame转换为Dataset val jsonDS = spark.read .schema("name String, age Int, gender String") .json("../tmp/person.json").as[Person] jsonDS.show(5,false) 图523创建Dataset 5.3.2Dataset常用方法 Dataset的方法可用于DataFrame,以下给出了部分常用操作。 1. as as方法将Dataset的记录映射为指定的类型。列映射方法取决于编码器U的类型: (1) U是class,该类的字段将映射到同名的列(spark.sql.caseSensitive区分字母大小写); (2) U是元组tuple,列将按序数映射(即将第一列分配给_1); (3) U是基本类型(字符串、整数等),使用DataFrame 的第一列。 如果Dataset的schema与所需的Encoder类型不匹配,则可根据需要进行选择,使用别名、重新排列或者重命名。 注意: as仅更改传递到类型化操作(如 map)中的数据视图,不移除指定类中不存在的任何列。 2. cache/persist cache/persist方法持久化Dataset。cache及不带参数的persist使用默认存储类别(MEMORY_AND_DISK)。 3. checkpoint checkpoint方法返回此数据集的检查点版本信息。在计算量可能呈指数增长的迭代算法中,checkpoint方法尤其有用。checkpoint被保存到SparkContext#setCheckpointDir设置的目录中。 5.4数据源 通过DataFrame接口,Spark SQL支持对多种数据源的操作。可以使用关系转换、创建临时视图等对DataFrame进行操作。视图支持SQL查询。本节介绍Spark数据源的加载和保存方法,以及可用于内置数据源的特定选项。 Spark中的数据读取接口在DataFrameReader中定义,通过SparkSession对象的read属性访问; 数据保存接口在DataFrameWriter中定义,通过Dataset对象的write属性访问。 5.4.1通用load/save函数 Spark SQL中的默认数据源是parquet格式(由spark.sql.sources.default设置)。示例代码如下: val userDF = spark.read.load("../tmp/users.parquet") userDF.write.save("../tmp/users-parquet-dir") 1. 常用选项 数据源类型由其全名称(如org.apache.spark.sql.parquet)指定,但对内置数据源,可以使用短名称(如json、parquet、jdbc、orc、libsvm、csv、text等)。在加载或保存数据源时,可以指定相应的参数或选项。具体的内置数据源及其相关参数或选项,请参考API文档。 加载的数据源可以转换为其他数据源。例如,将json文件保存为csv格式: val jsonDF = spark.read.format("json").load("people.json") jsonDF.write.format("csv").save("../tmp/people-csv-dir") 2. read mode 以下代码示例使用特定选项加载csv文件: val csvDF = spark.read.format("csv") .option("header", "true") .option("inferSchema", "true") .option("sep", ";") .load("people.csv") 读取数据源时,通常会指定format(格式)、schema(模式)、read mode(读取模式)、option(选项)以及path(路径)。至少必须提供格式和路径参数。 读取模式设置Spark遇到格式错误的记录时的处理方式,包括: (1) permissive 在遇到损坏的记录时将所有字段设置为null(默认值); (2) dropMalformed 删除错误行; (3) failFast 遇到格式错误的记录时立即失败(抛出异常)。 3. save mode 以下代码示例使用特定选项将数据集保存为csv格式: csvDF.write.format("csv") .option("mode", "OVERWRITE") .option("dateFormat", "yyyy-MM-dd") .save("path/to/file") 使用DataFrameWritier保存数据时,通常会指定format(格式)、save mode(写入模式)、option(选项)以及path(路径),至少必须提供路径参数。 save mode指定如果在输出位置已存在数据时的处理方式,包括: (1) append追加数据; (2) overwrite覆盖现有数据; (3) errorIfExists中止操作(默认值,抛出异常,或写作error); (4) ignore忽略(无操作)。 4. SQL直连文件 除了使用读取API将文件加载到DataFrame再执行查询外,还可以直接使用SQL查询文件。运行以下示例代码,结果如图524 所示。 val sqlDF = spark.sql("SELECT * FROM parquet.`../tmp/users.parquet`") sqlDF.show() 图524直接使用SQL查询文件 5. 将数据保存到表 使用saveAsTable方法可以将DataFrame持久保存到Hive metastore的表中。与 createOrReplaceTempView命令不同,saveAsTable存储的是数据帧的内容,并在Hive metastore中存储元数据,即使重启Spark,持久表仍然存在。运行以下示例代码,结果如图525所示。 sqlDF.write.mode("overwrite").saveAsTable("t_user") sql("select * from t_user").show(5) 图525saveAsTable方法保存数据到表 当模式为追加时,如存在同名表,则使用已有表的格式和选项。DataFrame 的schema中的列的顺序不需要与现有表的列顺序相同,saveAsTable将使用列名来查找正确的位置。 注意: 此功能不需要部署Hive。Spark使用Derby创建一个默认的本地Hive metastore。 5.4.2文件数据源 1. 文件数据源选项 通用的文件数据源选项包括以下几种。 (1) 忽略损坏的文件。当spark.sql.files.ignoreCorruptFiles设置为true时,Spark任务在遇到被损坏的文件时继续运行,并且仍会返回已读取的内容,示例代码如下: // enable ignore corrupt files spark.sql("set spark.sql.files.ignoreCorruptFiles=true") // 路径中非parquet文件被忽略 val testCorruptDF = spark.read.parquet( "examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/") (2) 忽略缺失文件。Missing File(缺失的文件)是指在构造数据帧DataFrame之后,目录中已删除的文件。当spark.sql.files.ignoreMissingFiles设置为true时,Spark从文件中读取数据时忽略缺失的文件,Spark任务在遇到文件缺失时继续运行,并返回已读取的内容。 (3) 文件过滤器。pathGlobFilter用于仅包含与模式匹配的文件,用法同org.apache.hadoop.fs.GlobFilter,不改变发现分区的方式。示例代码如下: // 仅加载parquet格式文件,滤出json等其他格式文件 spark.read.format("parquet") .option("pathGlobFilter", "*.parquet") .load("examples/src/main/resources/dir1") (4) 递归文件查找。recursiveFileLookup用于递归加载文件,并禁用分区推理。默认值为false,如果数据源在recursiveFileLookup为true时显式指定了partitionSpec,则会引发异常。示例代码如下: // 递归加载指定路径(及子目录)下的文件 spark.read.format("parquet") .option("recursiveFileLookup", "true") .load("examples/src/main/resources/dir1") (5) 路径修改时间过滤器。modifiedBefore和modifiedAfter可以同时使用或单独使用,以实现对所加载数据的更细粒度的控制(Structured Streaming文件源不支持)。 modifiedBefore/modifiedAfter为可选时间戳,用于仅包含修改时间早于/晚于指定时间的文件。时间戳格式为: YYYYMMDDTHH:mm:ss(如,20221102T11:02:02)。如果未提供时区选项,那么将根据Spark会话时区(spark.sql.session.timeZone)解释时间戳。运行以下示例代码,结果如图526 所示。 // 加载限定时间范围内的文件 spark.read.format("parquet") .option("pathGlobFilter", "*.parquet") .option("modifiedBefore", "2022-11-02T11:02:02") .option("modifiedAfter", "2000-01-01T00:00:00") .load("examples/src/main/resources/dir1") .show(5) 图526数据加载过滤器示例 2. CSV文件数据源 CSV(CommaSeparated Value)是一种常见的文本文件格式,其中每行表示由多个列组成的单个记录。 CSV文件虽然看起来结构良好,但实际处理比较复杂,因为在实际生产中无法对其包含的内容或结构方式做出太多假设。因此,CSV reader具有较多的选项,用于处理诸如字符转义问题。 Spark SQL 提供spark.read().csv("file_name")将CSV格式的文件或文件目录读取到 DataFrame中,提供dataframe.write().csv("path")方法写入 CSV 文件。函数option()可用于自定义读取或写入的行为,例如标题头、分隔符、字符集等。例如, // Read a csv with delimiter and a header spark.read.option("delimiter", ";") .option("header", "true").csv("path-to-csv") CSV数据源常用选项如表51 所示。 表51CSV文件源常用选项 选项名 含义 默认值 读/写 sep/delimiter 列分隔符 , RW encoding 文件编码方式 UTF8 RW quote 设置用于转义单个字符的引号。引号内的分隔符作为普通字符 “ RW escape 转义符(单字符),用以转义引号内的引号符(作为普通字符) \ RW escapeQuotes 是否将包含引号的值括在引号中 true W header 对于读,使用第一行作为列的名称 对于写,将列名称写入第一行 false RW dateFormat 日期格式 yyyyMMdd RW nanValue NotaNumber的字符串表示形式 NaN R ignoreLeadingWhiteSpace 忽略前导空格符 false RW ignoreTrailingWhiteSpace 忽略尾部空格符 false RW mode 解析期间处理损坏记录的模式 PERMISSIVE R locale 区域语言标记,IETF BCP 47 格式 enUS R compression 压缩编解码器,none、bzip2、gzip、lz4、snappy和deflate W 3. JSON数据源 JSON(JavaScript Object Notation)是JavaScript的常用数据格式。在Spark中使用的JSON文件,是行分隔(linedelimited)的JSON文件(每行包含一个单独的、有效JSON对象),区别于具有大型JSON对象或数组的文件。行分隔的JSON是一种更稳定的格式,可以方便地将新记录添加到文件中(而不必读取整个文件然后再写),更容易使用。JSON对象具有结构,JavaScript(JSON所基于的)具有类型,因此,Spark可以对行分隔JSON作更多假设(选项比CSV少)。普通的多行JSON文件需要将multiLine选项设置为true。 Spark SQL可自动推断JSON数据集的模式,将其加载为DataFrame(即Dataset[Row])。SparkSession.read.json()方法既可以加载JSON文件,也可以转换Dataset[String]。例如(运行结果见图527 ): // 读JSON文件,参数可以是文件名也可以是目录 spark.read.option("mode", "FAILFAST") .json("../tmp/person.json") .show(5) // 从JSON数据集(Dataset[String])创建DataFrame val personDS = spark.createDataset(""" {"name":"Cao","addr":{"city":"Wenzhou","state":"Zhejiang"}}""" ::Nil) val personDF = spark.read.json(personDS) 图527JSON数据源加载为DataFrame/Dataset示例 JSON数据源常用选项见表52。 表52JSON数据源常用选项 选项名 含义 默认值 读/写 timeZone 时区 spark.sql.session.timeZone RW allowComments 忽略Java/C++样式注释 false R allowSingleQuotes 是否允许使用单引号 false R mode 解析期间处理损坏记录的模式 PERMISSIVE R dateFormat 日期格式 yyyyMMdd RW timestampFormat 时间戳格式 yyyyMMdd'T'HH:mm:ss[.SSS] RW multiLine 记录可跨多行 false R encoding 文件编码方式 读,自动检测; 写,UTF8 RW lineSep 行分隔符 读,\r,\n,\r\n; 写,\n RW dropFieldIfAllNull 忽略全null,或全空数组/结构的列 false R ignoreNullFields 忽略空字段 spark.sql.jsonGenerator.ignoreNullFields W 4. Parquet数据源 Apache Parquet是面向列的开源数据存储格式,提供各种存储优化策略,尤其适用于数据分析。Parquet格式文件可以通过列压缩节省存储空间,并允许读取单个列而非整个文件。Parquet是Spark的默认文件格式,读Parquet文件比JSON或CSV更高效。Parquet支持复杂类型,其列数据可以是数组(CSV文件不支持)、map或struct等。示例代码如下: val userDF = spark.read.parquet("../tmp/users.parquet") userDF.write.save("../tmp/users-parquet-dir") Parquet的可选项较少。除压缩方式compression外,另一个选项是mergeSchema,其默认设置是由spark.sql.parquet.mergeSchema确定的。有多种数据处理系统支持Parquet格式。Spark SQL支持读取和写入Parquet文件时自动保留原始数据的Schema。读取Parquet文件时,出于兼容性考虑,所有列都将自动设置为可为空(nullable)。 类似于Protocol Buffer、Avro和Thrift,Parquet也支持schema演进。用户可以从简单的schema开始,根据需要逐渐添加更多列,最终可能会得到多个具有不同schema但相互兼容的Parquet文件。Parquet数据源能够自动检测并合并所有这些文件的schema。 由于模式合并比较耗费资源,且在多数情况下不是必需的,因此其默认状态是关闭的。可以将spark.sql.parquet.mergeSchema设置为true,或是在读取文件时将mergeSchema设置为 true。 运行以下示例代码,结果如图528 所示。 import spark.implicits._ // 由RDD创建DataFrame(value: Int, square: Int) val squaredDF = spark.sparkContext.makeRDD(1 to 5) .map(i => (i, i * i)).toDF("value", "square") // 另一DataFrame(value: Int, cube: Int),cube <-- square val cubedDF = spark.sparkContext.makeRDD(6 to 10) .map(i => (i, i * i * i)).toDF("value", "cube") // 将两个DataFrame,存储到不同分区 squaredDF.write.parquet("data/test_table/key=1") cubedDF.write.parquet("data/test_table/key=2") // 加载分区数据,schema合并 val mergedDF = spark.read.option("mergeSchema", "true") .parquet("data/test_table") mergedDF.printSchema() 图528Parquet数据源合并 注意: 不同版本的Parquet文件可能不兼容。使用Spark不同版本(尤其是旧版本)写Parquet文件时要注意文件格式版本。 5. ORC数据源 Apache ORC是一种列式存储格式(Optimized Row Columnar,ORC)文件,具有zstd压缩、布隆过滤器(bloom filter)和列式加密等功能。ORC是借鉴Hive的一种高效文件格式。Spark了解ORC文件格式细节,读数据时仅提供mergeSchema选项。ORC与Parquet非常相似,但Spark针对Parquet会有特殊优化。 Spark支持两种ORC实现(内置实现和Hive实现),由spark.sql.orc.impl控制,两种实现的大多数功能相同(设计目标不同)。内置实现遵循与Parquet一致的Spark数据源行为; Hive实现使用Hive SerDe,遵循Hive规范。 在Spark一些历史版本的内置实现中,使用内置String处理CHAR/VARCHAR,而Hive实现使用Hive CHAR/VARCHAR(查询结果不同)。从Spark 3.1.0开始,Spark端支持CHAR/VARCHAR(差异消除)。 ORC数据读写示例代码如下: val flights = spark.read.format("orc").load("../tmp/flight.orc") csvDF.write.format("orc").save("../tmp/csv-to-orc") 6. Text文件数据源 Spark SQL可以直接读写文本文件,文件内容被解析为一组字符串。读文本文件的方法是spark.read().text(),可以将文本文件或目录读入DataFrame。读文本文件时,每行都是Row对象,默认情况下,其value列的内容是文本行内容。写文件使用dataframe.write().text()方法。option()方法可以修改默认的行分隔符、设置压缩方式等。示例代码如下: val txt = spark.read.option("lineSep", ",").text("test.txt") txt.write.option("compression","gzip").text("compressed_txt") option方法可以修改默认的行分隔符(lineSep,读或写,默认值为“\n”),压缩方式(compression,写,默认无压缩)。 5.4.3Hive数据源 Spark SQL支持Apache Hive数据源的读写。但Hive有大量依赖项,这些依赖项不包含在默认的Spark发行版本中。如果在类搜索路径上可以找到Hive依赖项,那么Spark将自动加载它们(这些Hive依赖项还必须存在于所有的Worker节点,因为它们需要访问Hive序列化和反序列化库SerDes才能访问Hive中的数据)。 配置Hive主要是通过conf/目录中的Hivesite.xml、coresite.xml及hdfssite.xml来完成。具体配置过程请参考Hive的相关文档。 使用Hive数据源时,必须使用支持Hive的SparkSession实例,包括Hive metastore连接、Hive serdes的支持,以及Hive用户定义函数的支持。未部署Hive时依然可以启用Hive支持。未配置hivesite.xml时,Spark context会自动在当前目录下创建metastore_db目录,并创建由spark.sql.warehouse.dir配置的目录,该目录默认为Spark应用程序启动时的当前目录下的sparkwarehouse,相应地,启动Spark应用程序的用户需要写权限。 Hive数据源读写示例代码如下: // 创建支持Hive的SparkSession对象 val spark = SparkSession.builder() .appName("Spark Hive Example") .config("spark.sql.warehouse.dir", "spark-warehouse") .enableHiveSupport() .getOrCreate() // 创建Hive数据表,加载数据 spark.sql("create TABLE src(key INT, value STRING) USING hive") spark.sql("load DATA LOCAL INPATH 'kv1.txt' into TABLE src") // 读Hive数据表,创建DataFrame val hiveDF = spark.sql("select * FROM src WHERE key < 100") // 创建Hive表,parquet存储格式,将DataFrame写入表中 sql("create TABLE hive_t(key int, value string) STORED as PARQUET") hiveDF.write.mode(SaveMode.Overwrite).saveAsTable("hive_t") 5.4.4SQL数据源 Spark SQL可以连接到各种SQL数据源,如MySQL、PostgreSQL,或Oracle、SQLite等数据库。有别于文件数据源,在如何连接到数据库时需要考虑数据库连接选项,包括身份认证和连接方式(Spark集群的网络是否连接到数据库网络)等。 Spark SQL可以使用JDBC从其他数据库读取数据,结果以DataFrame的形式返回。与RDD(JdbcRDD)相比,应优先使用此方式,以便Spark SQL对数据进行处理,或与其他数据源连接。JDBC数据源也更易用于Java或Python环境,因其不需要用户提供ClassTag(不同于Spark SQL JDBC服务器,其他应用程序使用Spark SQL运行查询)。 使用JDBC连接数据源时,需要在Spark类路径中包含特定数据库的JDBC驱动程序。例如,要想使用Spark Shell连接PostgreSQL数据库,需要运行如下命令: ./bin/spark-shell --jars ./jars/postgresql-42.5.0.jar 1. PostgreSQL数据源 Spark支持的JDBC选项不区分字母大小写。JDBC数据源选项通过相应的option/options方法设置(DataFrameReader/DataFrameWriter)。 对JDBC数据库连接属性,可以在数据源选项中指定,如用户名、密码、数据库服务器链接地址等。读写PostgreSQL数据源,需要使用PostgreSQL JDBC驱动程序。 运行以下示例代码,结果如图529 所示。 // 使用load方法加载PostgreSQL数据源 // option方法设置数据源连接属性 val pgDF = spark.read.format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "schema.tablename") .option("user", "username") .option("password", "password") .load() // 将DataFrame写到PostgreSQL数据表中 pgDF.write.format("jdbc") .option("url", "jdbc:postgresql:dbserver") .option("dbtable", "write_to_table") .option("user", "username") .option("password", "password") .save() 图529连接PostgreSQL数据源示例 2. MySQL数据源 MySQL数据源的读写与PostgreSQL类似,连接时需要指定MySQL数据库服务器。以下示例使用java.util.Properties传递MySQL连接信息(需要加载MySQL JDBC驱动程序)。 运行以下示例代码,结果如图530所示。 // 使用load方法加载MySQL数据源 // 使用Properties传递数据源连接属性 val connProp = new java.util.Properties() connProp.put("user", "username") connProp.put("password", "password") val myDF = spark.read .jdbc("jdbc:mysql:dbserver", "tablename", connProp) // 类似地,可以将DataFrame写到MySQL数据表 myDF.write.jdbc("jdbc:mysql:dbserver", "to_table", connProp) 图530访问MySQL数据源 3. JDBC数据源选项 Spark SQL连接JDBC数据源,除用户名、密码(SQLite数据库不需要)等属性之外,还支持分区等选项。常用选项见表53 。 表53JDBC数据源选项 选项名 含义 示例 读/写 url JDBC URL,形如: jdbc:subprotocol:subname 可以在URL中指定特定的连接属性 jdbc:postgresql://localhost/test?user=fry&password=secret RW dbtable 数据表名。读数据时可以是子查询。 不能同时指定dbtable和query RW query 将数据读入Spark的查询。指定的查询被用作FROM子句中的子查询。Spark为子查询子句分配别名 .option("query", "select c1, c2 from t1") RW driver 用于连接的JDBC驱动程序 RW partitionColumn 对表进行分区的列名 R numPartitions 最大分区数(并行读写) 与JDBC最大并发连接数有关 RW queryTimeout 语句执行超时(秒) 0表示无限制 RW fetchsize 每次操作读取的行数 R batchsize 每次写操作要插入的行数 W truncate 启用SaveMode.Overwrite后,此选项会导致Spark 截断现有表,而不是删除并重新创建它。这可以防止表元数据(例如,索引)被删除。 由于DBMS中TRUNCATE TABLE的行为不同,使用它并不总是安全的 .option("truncate", false) W createTableOptions 此选项允许在创建表时设置特定于数据库的表和分区选项 W createTableColumnTypes 创建表时要使用的数据库表列数据类型。类型以与CREATE TABLE语法相同的格式指定 W customSchema 读数据时的自定义模式。列名应与JDBC表的相应列名相同 R 5.5安装关系数据库 5.5.1PostgreSQL 1. 安装PostgreSQL Server 直接安装,执行如下命令: sudo apt-get update sudo apt-get -y install postgresql 注意: PostgreSQL安装后,默认权限认证为peer,即,使用 Linux用户登录(认证)。但由于PostgreSQL新安装时只创建了postgres用户,所以会导致数据库登录失败(与Linux系统用户账户不一致)。常用的解决办法有两种: 一种是切换Linux用户为postgres(su postgres ...)后登录数据库,创建与Linux系统账户同名的数据库角色(用户,create role ...); 另一种方法是修改PostgreSQL的系统配置。 2. 安装PostgreSQL Client (pgAdmin 4) 可以手动下载安装包进行安装。首先确认认证key: # Install the public key for the repository (if not done previously): wget -c https://www.pgadmin.org/static/packages_pgadmin_org.pub | sudo gpg --dearmor -o /usr/share/keyrings/packages-pgadmin-org.gpg 再下载安装包进行安装: # Create the repository configuration file: sudo sh -c 'echo "deb [signed-by=/usr/share/keyrings/packages-pgadmin-org.gpg] https://ftp.postgresql.org/pub/pgadmin/pgadmin4/apt/$(lsb_release -cs) pgadmin4 main" > /etc/apt/sources.list.d/pgadmin4.list && apt update' pgAdmin 4安装包的下载地址是: https://ftp.postgresql.org/pub/pgadmin/pgadmin4/apt/jammy/dists/pgadmin4/main/binaryamd64/pgadmin4desktop_6.15_amd64.deb 5.5.2MySQL Server 直接安装,执行如下命令(安装MySQL Server 8.x): sudo apt-get install mysql-server 服务验证: systemctl is-active mysql 安全认证: sudo mysql_secure_installation 注意: Server安装后需要设置安全认证策略,设置root等数据库用户的密码(见图531)。 图531设置MySQL安全认证策略 更多安装、使用的详细信息,请参考MySQL用户手册及相关资料。