学习目标 . 了解数据集分析,能够描述用户行为数据中包含的信息; . 熟悉实现思路分析,能够描述热门品类Top10分析的实现思路; . 掌握实现热门品类Top10分析,能够编写用于实现热门品类Top10分析的Spark 程序; . 掌握运行Spark程序,能够将Spark程序提交到YARN 集群运行。 品类是指商品所属的分类,例如服装、电子产品、图书等。进行热门品类Top10分析 的目的在于从用户行为数据中挖掘出排名前10的最受用户喜爱的品类。通过对热门品 类的了解,企业可以调整其销售策略,提高对这些品类的投入,优化促销活动,从而更好地 满足消费者需求,提高销售额。本章将讲解如何对电商网站的用户行为数据进行分析,从 而统计出排名前10的品类。 3.1 数据集分析 热门品类Top10分析使用的数据集为某电商网站在2022年11月产生的用户行为 数据,这些数据存储在文件user_session.txt中,该文件的每一行数据都记录了商品和用 户相关的特定行为。下面,以文件user_session.txt中的一条用户行为数据为例进行详细 分析,具体内容如下。 {"user_session":"000007b4- 6d31- 4590- b88f- 0f68d1cee73c","event_type":" view","category _id":"2053013554415534427","user _id":"572115980","product _ id":"1801873","address_name":"NewYork","event_time":"2022-11-16 08:12:52"} 从上述内容中可以看出,文件user_session.txt中的每一条用户行为数据都以JSON 对象的形式存在,该对象包含多个键值对,每个键值对代表着不同的信息。下面,通过解 读这些键,介绍用户行为数据中各项信息的含义。 . user_session:表示用户行为的唯一标识。 .event_type:表示用户行为的类型,包括view(查看)、cart(加入购物车)和 purchase(购买)。 第3章热门品类Top10分析63 .categoryid:表示品类的唯一标识。 .userid: 示用户的唯一标识。表(_) .prouct_id:表示商品的唯一标识。d(_) .addresname:表示产生用户行为的区域。 .event_me:表示产生用户行为的时间。ti(_) 3.实现思路分析 实现热门品类Top10分析的核心在于统计不同品类的商品被查看、加入购物车和购 买的次数,然后按照特定排序规则对统计结果进行处理,以获取排名前10的热门品类。 在本项目中,实现热门品类Top10分析的排序规则如下。首先,根据不同品类的商品被 查看的次数进行降序排序;然后,根据不同品类的商品被加入购物车的次数进行降序排 序;最后,根据不同品类的商品被购买的次数进行降序排序。下面,通过图3-1详细描述 本项目中热门品类Top10分析的实现思路。 图3- 1 热门品类Top10分析的实现思路 针对热门品类Top10分析的实现思路进行如下讲解。 .读取/转换:读取用户行为数据,提取其中的用户行为类型(eventtype)和品类唯 一标识(category_id)。为了便于后续统计不同品类的商品被查加入购物车和 购买的次数,我们将提取的数据转换为元组。元组的第一个元素包含用户行为类 型和品类唯一标识,第二个元素为1,用于标识当前品类的商品被查看、加入购物 车或购买的次数。看、(_) 64 Spark项目实战(第2版) .聚合:统计不同品类的商品被查看、加入购物车和购买的次数,并生成新的元组。 该元组的第一个元素包含用户行为类型和品类唯一标识,第二个元素为当前品类 的商品被查看、加入购物车或购买的次数的统计结果(count)。 .过滤/转换:为了方便后续识别不同品类商品在不同用户行为下的统计结果,我 们根据用户行为类型将聚合结果划分为三部分。这样,我们可以分别得到各品类 商品被查看、加入购物车和购买次数的统计结果。接着,我们对过滤得到的结果 进行转换,生成新的元组。该元组的第一个元素为品类唯一标识,第二个元素为 当前品类的商品被查看(view_count)、加入购物车(cart_count)或购买(purchase_ count)的次数的统计结果。 .合并:根据商品的唯一标识对过滤/转换得到的三部分数据进行合并,生成新的 元组。该元组的第一个元素为商品的唯一标识;第二个元素包含当前品类的商品 被查看、加入购物车和购买的次数的统计结果。 .排序:根据排序规则对合并结果进行降序排序,并获取排序结果的前10行数据, 从而得到热门品类Top10 。 3.实现热门品类Top10分析 3 3.环境准备 3.1 在进行某项事务前,充足的准备能够使我们更好地发挥自己的潜力,提高自身能力和 素质。这包括深入了解相关知识,积累相关经验,以及适时地进行规划。通过充分准备, 我们能够为自己创造更多机会,增加成功的可能性,并提高对事务的把控能力和执行 效果。 本项目主要使用Scala语言在集成开发工具InteliJIDEA中实现Spark程序。在开 始实现Spark程序之前,需要在本计算机中安装JDK和Scala并配置系统环境变量,以及 在InteliJIDEA中安装Scala插件、创建项目和导入依赖。关于安装JDK和Scala并配 置系统环境变量的操作读者可参考本书提供的补充文档。接下来,主要以InteliJIDEA 中执行的一系列相关操作进行讲解,具体内容如下。 1.安装Scala插件 默认情况下,InteliJIDEA并不支持Scala语言。因此,在集成开发工具InteliJ IDEA中使用Scala语言实现Spark程序前,需要通过安装Scala插件来添加相应的支持。 接下来,将讲解如何在InteliJIDEA中安装Scala插件,具体操作步骤如下。 (1)打开InteliJIDEA,进入WelcometoInteliJIDEA界面,如图3-2所示。 (2)在图3-2所示界面中,单击Plugins选项,在对话框中部的搜索栏内输入Scala, 搜索Scala相关插件,如图3-3所示。 需要说明的是,如果读者在打开InteliJIDEA时,直接进入具体项目的界面,那么可 以在InteliJIDEA的工具栏中依次单击File、Setings选项打开Setings对话框,在该对 话框的左侧单击Plugins选项进行搜索Scala相关插件的操作。 第3章热门品类Top10分析65 图3- 2 WelcometoInteliJIDEA 界面 图3- 3 搜索Scala相关插件 (3)在图3-3所示界面中,找到名为Scala的插件,单击其右方的Instal 按钮安装 Scala插件。Scala插件安装完成的效果如图3-4所示。 在图3-4所示界面中,单击RestartIDE按钮,打开InteliJIDEAandPluginUpdates 66 Spark项目实战(第2版) 图3- 4 Scala插件安装完成的效果 对话框。在该对话框中单击RestartIDE按钮重启InteliJIDEA使Scala插件生效。 至此完成了在InteliJIDEA中安装Scala插件的操作。 需要注意的是,搜索Scala相关插件的操作需要确保本地计算机处于联网状态。如 果读者在进行搜索Scala相关插件的操作时,网络连接正常,但无法显示搜索结果,那么 可以在图3-4中单击 按钮,在弹出的菜单中选择HTTPProxySetings选项,打开 HTTPProxy对话框,在该对话框中进行相关配置。HTTPProxy对话框配置完成的效 果如图3-5所示。 在图3-5所示界面中,单击OK按钮后,重新打开InteliJIDEA,再次尝试搜索Scala 相关插件的操作。 2.创建项目 ( 在InteliJIDEA中基于Maven创建项目SparkProject,具体操作步骤如下。 1)在InteliJIDEA的WelcometoInteliJIDEA界面中,单击NewProject按钮, 打开NewProject对话框,在该对话框中配置项目的基本信息,具体内容如下。 ①在Name输入框中指定项目名称为SparkProject。 ②在Location输入框中指定项目的存储路径为D:\develop。 ③在JDK下拉框中选择使用的JDK为本地安装的JDK 。 ④在Aceypvn项目的模板为og.pcemaeaceyps: rhte下拉框中选择Maeraah.vn.rhtemaven-archetype-quickstart。 NewProject对话框配置完成的效果如图3-6所示。 第3章热门品类Top10 分析67 图3- 5 HTTPProxy对话框配置完成的效果 图3- 6 NewProject对话框配置完成的效果 68 Spark项目实战(第2版) 需要说明的是,根据InteliJIDEA版本的不同,NewProject对话框显示的内容会存 在差异。读者在创建项目时,需要根据实际显示的内容来配置项目的基本信息。 (2)在图3-6所示界面中,单击Create按钮创建项目SparkProject。项目SparkProject 创建完成的效果如图3-7所示。 图3- 7 项目SparkProject创建完成的效果 (3)在SparkProject项目的src/main目录中新建一个名为scala的文件夹,该文件 夹将用于存放与本项目相关的Scala源代码文件,操作步骤如下。 ①选中并右击main文件夹,在弹出的菜单中依次单击New、Directory选项打开 NewDirectory对话框。在该对话框的输入框中输入scala,如图3-8所示。 图3- 8 NewDirectory对话框 ②在图3-8所示界面中按Enter键创建scala文件夹,如图3-9所示。 图3- 9 创建scala文件夹 (4)新建的scala文件夹需要被标记为SourcesRoot(源代码根目录)才可以存放 Scala源代码文件。在图3-9所示界面中,选中并右击scala文件夹,在弹出的菜单依次单 击MarkDirectoryas、SourcesRoot选项。成功标记为SourcesRoot后,scala文件夹的 颜色将变为蓝色。 第3章 热门品类Top10分析 69 (5)由于项目SparkProject是基于Maven创建的,默认并不提供对Scala语言的支 持,所以需要为项目SparkProject添加ScalaSDK,操作步骤如下。 ① 在IntelliJIDEA 的工具栏中依次单击File、ProjectStructure 选项,会弹出 ProjectStructure对话框,单击该对话框内左侧的Libraries选项,如图3-10所示。 图3-10 ProjectStructure对话框 ② 在图3-10所示界面中,单击上方的按钮并在弹出的菜单栏中选择ScalaSDK 选项,会弹出SelectJAR'sforthenewScalaSDK 对话框,在该对话框中选择Location为 System的一项,表示通过选择本地操作系统中安装的Scala添加ScalaSDK,如图3-11所示。 图3-11 SelectJARs' forthenewScalaSDK 对话框 需要说明的是,若图3-11中未显示本地操作系统安装的Scala,则可以单击Browse 按钮通过浏览本地文件系统中Scala的安装目录添加ScalaSDK。 ③ 在图3-11所示界面中,单击OK按钮,会弹出ChooseModules对话框,如图3-12 所示。 图3-12 ChooseModules对话框 Spark项目实战(第70 2版) 在图3-12中,选择项目SparkProject后单击OK按钮返回ProjectStructure对话框。 在该对话框中单击Apply按钮后单击OK按钮关闭ProjectStructure对话框。 3.导入依赖和插件 在项目SparkProject的配置文件pom.xml中,添加用于实现本需求所需的依赖和插 件。依赖添加完成的效果如文件3-1所示。 文件3-1 pom.xml 1 5 4.0.0 6 cn.itcast 7 SparkProject 8 1.0-SNAPSHOT 9 jar 10 SparkProject 11 http://maven.apache.org 12 13 UTF-8 14 15 16 17 junit 18 junit 19 3.8.1 20 test 21 22 23 org.apache.spark 24 spark-core_2.12 25 3.3.0 26 27 28 org.json 29 json 30 20230227 31 32 33 org.apache.hbase 34 hbase-shaded-client 35 2.4.9 36 第3章 热门品类Top10分析 71 37 38 org.apache.hadoop 39 hadoop-common 40 3.3.0 41 42 43 44 45 46 net.alchim31.maven 47 scala-maven-plugin 48 3.2.2 49 50 51 52 compile 53 54 55 56 57 58 org.apache.maven.plugins 59 maven-assembly-plugin 60 3.1.0 61 62 63 jar-with-dependencies 64 65 66 67 68 make-assembly 69 package 70 71 single 72 73 74 75 76 77 78 Spark项目实战(第72 2版) 在文件3-1中,第22~41行代码用于添加实现本需求所需的依赖。其中,第22~26 行代码添加的依赖为Spark核心依赖,第27~31行代码添加的依赖为JSON依赖,第32~ 36行代码添加的依赖为HBase客户端依赖,第37~41行代码添加的依赖为Hadoop核 心依赖。 第43~77行代码用于添加实现本需求所需的插件。其中,第45~56行代码添加的 scala-maven-plugin插件用于支持Scala编译和构建Scala项目,第57~75行代码添加的 maven-assembly-plugin插件用于将项目的所有依赖和资源打包成一个独立的可执行的 jar文件。 依赖添加完成后,确认添加的依赖是否存在于项目SparkProject中,在IntelliJIDEA 主界面的右侧单击Maven选项卡展开Maven面板,在Maven面板双击Dependencies折 叠项,如图3-13所示。 图3-13 Maven面板 从图3-13中可以看出,依赖已经成功添加到项目SparkProject中。如果这里未显示 添加的依赖,则可以在图3-13中单击按钮重新加载pom.xml文件。 3.3.2 实现Spark程序 在项目SparkProject的src/main/scala目录下,新建了一个名为cn.itcast.top10的 包。在cn.itcast.top10包中创建一个名为CategoryTop10的Scala单例对象,在该单例对 象中实现热门品类Top10分析的Spark程序,具体实现过程如下。 (1)在单例对象CategoryTop10中添加main()方法,用于定义Spark程序的实现逻 辑,具体代码如文件3-2所示。 文件3-2 CategoryTop10.scala 1 package cn.itcast.top10 2 object CategoryTop10 { 3 def main(args: Array[String]): Unit ={ 4 //实现逻辑 5 } 6 } (2)在Spark程序中创建SparkConf对象conf,用于配置Spark程序的参数。在单 第3章 热门品类Top10分析 73 例对象CategoryTop10的main()方法中添加如下代码。 val conf =new SparkConf().setAppName("CategoryTop10") 上述代码指定Spark程序的名称为CategoryTop10。 (3)在Spark 程序中基于SparkConf对象创建SparkContext对象sc,用于管理 Spark程序的执行。在单例对象CategoryTop10的main()方法中添加如下代码。 val sc =new SparkContext(conf) (4)在Spark程序中,通过SparkContext对象sc的textFile()方法从文件系统中读 取用户行为数据,并将其存储到RDD对象textFileRDD中。在单例对象CategoryTop10 的main()方法中添加如下代码。 val textFileRDD =sc.textFile(args(0)) 上述代码中,使用args(0)来代替用户行为数据的具体路径,以便于将Spark程序提 交到YARN 集群运行时,可以更加灵活地通过spark-submit命令的参数来指定用户行 为数据的具体路径。 (5)在Spark程序中,通过map算子对RDD对象textFileRDD进行转换操作,并将 转换操作的结果存储到RDD对象transformRDD。在单例对象CategoryTop10的main()方 法中添加如下代码。 1 val transformRDD =textFileRDD.map(s =>{ 2 //将读取的用户行为数据转换为JSON 对象json 3 val json =new JSONObject(s) 4 val category_id =json.getString("category_id") 5 val event_type =json.getString("event_type") 6 ((category_id, event_type), 1) 7 }) 上述代码用于从用户行为数据中提取用户行为类型和品类唯一标识,并将其映射为 包含两个元素的元组,该元组的第一个元素包含用户行为类型和品类唯一标识,第二个元 素为1。 (6)在Spark程序中,通过reduceByKey算子对RDD对象transformRDD进行聚合 操作,并将聚合操作的结果存储到RDD 对象aggregationRDD。在单例对象 CategoryTop10的main()方法中添加如下代码。 val aggregationRDD =transformRDD.reduceByKey(_ +_).cache() 上述代码中的聚合操作用于统计不同品类的商品被查看、加入购物车和购买的次数。 由于后续需要对RDD 对象aggregationRDD 进行多次过滤操作,所以通过cache()方法 Spark项目实战(第74 2版) 将RDD对象aggregationRDD缓存到内存中。 (7)在Spark程序中,通过filter算子对RDD 对象aggregationRDD 进行过滤操作, 获取不同品类中商品被查看的次数,然后通过map算子对过滤操作的结果进行转换操 作,并将转换操作的结果存储到RDD 对象getViewCategoryRDD。在单例对象 CategoryTop10的main()方法中添加如下代码。 1 val getViewCategoryRDD =aggregationRDD.filter( 2 action =>action._1._2 =="view" 3 ).map(action =>(action._1._1, action._2)) 上述代码中的转换操作用于将过滤结果映射为包含两个元素的元组,该元组的第一 个元素为品类唯一标识,第二个元素为当前品类的商品被查看的次数。 (8)在Spark程序中,通过filter算子对RDD 对象aggregationRDD 进行过滤操作, 获取不同品类商品被加入购物车的次数,然后通过map算子对过滤操作的结果进行转换 操作,并将转换操作的结果存储到RDD 对象getCartCategoryRDD。在单例对象 CategoryTop10的main()方法中添加如下代码。 1 val getCartCategoryRDD =aggregationRDD.filter( 2 action =>action._1._2 =="cart" 3 ).map(action =>(action._1._1, action._2)) 上述代码中的转换操作用于将过滤结果映射为包含两个元素的元组,该元组的第一 个元素为品类唯一标识,第二个元素为当前品类的商品被加入购物车的次数。 (9)在Spark程序中,通过filter算子对RDD 对象aggregationRDD 进行过滤操作, 获取不同品类商品被购买的次数,然后通过map算子对过滤操作的结果进行转换操作, 并将转换操作的结果存储到RDD 对象getPurchaseCategoryRDD。在单例对象 CategoryTop10的main()方法中添加如下代码。 1 val getPurchaseCategoryRDD =aggregationRDD.filter( 2 action =>action._1._2 =="purchase" 3 ).map(action =>(action._1._1, action._2)) 上述代码中的转换操作用于将过滤结果映射为包含两个元素的元组,该元组的第一 个元素为品类唯一标识,第二个元素为当前品类的商品被购买的次数。 (10)在Spark程序中,通过leftOuterJoin算子对RDD对象getViewCategoryRDD、 getCartCategoryRDD和getPurchaseCategoryRDD进行合并操作,并将合并操作的最终 结果存储到RDD对象joinCategoryRDD。在单例对象CategoryTop10的main()方法中 添加如下代码。 1 val tmpJoinCategoryRDD =getViewCategoryRDD 第3章 热门品类Top10分析 75 2 .leftOuterJoin(getCartCategoryRDD) 3 val joinCategoryRDD =tmpJoinCategoryRDD 4 .leftOuterJoin(getPurchaseCategoryRDD) 上述代码中的合并操作用于将相同品类商品的不同行为类型的统计结果合并到 RDD的同一元素中,并通过这3个RDD对象合并的先后顺序,明确不同行为类型统计结 果在元素中的位置。 (11)根据热门品类Top10分析的排序规则对合并操作的结果进行排序操作,具体实 现过程如下。 ① 在项目SparkProject的包cn.itcast.top10中创建一个名为CategorySortKey的 Scala类,该类CategorySortKey需要实现Java提供的接口Comparable和Serializable, 前者用于实现对象的比较,后者用于将对象转换成字节流进行传输。此外,在类 CategorySortKey中还须要重写接口Comparable的compareTo()方法定义对象的比较 规则,具体代码如文件3-3所示。 文件3-3 CategorySortKey.scala 1 class CategorySortKey( 2 val viewCount: Int, 3 val cartCount: Int, 4 val purchaseCount: Int 5 ) extends Comparable[CategorySortKey] with Serializable { 6 override def compareTo(other: CategorySortKey): Int ={ 7 val viewComparison =Integer.compare(viewCount, other.viewCount) 8 if (viewComparison !=0) { 9 viewComparison 10 } else { 11 val cartComparison =Integer.compare(cartCount, other.cartCount) 12 if (cartComparison !=0) { 13 cartComparison 14 } else { 15 Integer.compare(purchaseCount, other.purchaseCount) 16 } 17 } 18 } 19 } 上述代码中,第6~18行代码通过重写接口Comparable的compareTo()方法定义对 象的比较规则。比较规则为,首先比较不同品类的商品被查看的次数(viewCount),若比 较结果不相等,则返回1(大于的比较关系)或-1(小于的比较关系);若比较结果相等,则 进一步比较不同品类的商品被加入购物车的次数(cartCount);若比较结果不相等,则返回1 或-1;若比较结果仍然相等,则继续比较不同品类商品被购买的次数(purchaseCount)。 ② 在Spark程序中,通过map算子,将存储在RDD对象joinCategoryRDD中的不同 Spark项目实战(第76 2版) 品类的商品被查看、加入购物车和购买的次数映射到类CategorySortKey中进行比较,并 将比较结果存储到RDD 对象transCategoryRDD 中。在单例对象CategoryTop10 的 main()方法中添加如下代码。 1 val transCategoryRDD =joinCategoryRDD.map ({ 2 case (category_id, ((viewcount, cartcountOpt), purchasecountOpt)) => 3 val cartcount =cartcountOpt.getOrElse(0).intValue() 4 val purchasecount =purchasecountOpt.getOrElse(0).intValue() 5 val sortKey =new CategorySortKey(viewcount, cartcount, purchasecount) 6 (sortKey, category_id) 7 }) 上述代码中,getOrElse()方法用于处理可能出现的空值问题,防止空指针异常的发 生。如果getOrElse()方法的返回值为None,则会将其替换为指定的默认值,这里指定的 默认值为0。 ③ 在Spark程序中,通过sortByKey算子对RDD对象transCategoryRDD进行降序 排序,并将降序排序的结果存储到RDD 对象sortedCategoryRDD 中。在单例对象 CategoryTop10的main()方法中添加如下代码。 val sortedCategoryRDD =transCategoryRDD.sortByKey(false) ④ 在Spark程序中,通过take()方法获取RDD对象sortedCategoryRDD的前10个 元素,即热门品类Top10分析的结果,并将这10个元素存储在数组top10Category中。 在单例对象CategoryTop10的main()方法中添加如下代码。 val top10Category =sortedCategoryRDD.take(10) 3.3.3 数据持久化 通过上一节内容实现的Spark程序仅仅获取了热门品类Top10的分析结果。为了 便于后续进行数据可视化,并确保分析结果的长期存储,需要进行数据持久化操作。本项 目使用HBase作为数据持久化工具。接下来,分步骤讲解如何将热门品类Top10的分析 结果存储到HBase的表中,具体操作步骤如下。 (1)为了避免实现本项目后续需求的数据持久化操作时,重复编写操作HBase的相 关代码,这里在项目SparkProject的src/main/scala目录中,新建了一个名为cn.itcast. hbase的包。在包cn.itcast.hbase中创建一个名为HBaseConnect的Scala单例对象,在 该单例对象中实现操作HBase的相关代码,具体如文件3-4所示。 文件3-4 HBaseConnect.scala 1 import org.apache.hadoop.conf.Configuration 2 import org.apache.hadoop.hbase._ 3 import org.apache.hadoop.hbase.client._ 第3章 热门品类Top10分析 77 4 import java.io.IOException 5 object HBaseConnect { 6 //创建Configuration 对象hbaseConf,用于指定HBase 的相关配置 7 val hbaseConf: Configuration =HBaseConfiguration.create() 8 //指定ZooKeeper 集群中每个ZooKeeper 服务的地址 9 hbaseConf.set("hbase.zookeeper.quorum", "spark01,spark02,spark03") 10 //指定ZooKeeper 服务的端口号 11 hbaseConf.set("hbase.zookeeper.property.clientPort", "2181") 12 var conn: Connection =_ 13 try { 14 //根据HBase 的配置信息创建HBase 连接 15 conn =ConnectionFactory.createConnection(hbaseConf) 16 } 17 catch { 18 case e: IOException =>e.printStackTrace() 19 } 20 def getHBaseAdmin: Admin ={ 21 var hbaseAdmin: Admin =null 22 try { 23 hbaseAdmin =conn.getAdmin 24 } catch { 25 case e: MasterNotRunningException =>e.printStackTrace() 26 case e: ZooKeeperConnectionException =>e.printStackTrace() 27 } 28 hbaseAdmin 29 } 30 def getConnection: Connection =conn 31 def closeConnection(): Unit ={ 32 if (conn !=null) { 33 try { 34 conn.close() 35 } catch { 36 case e: IOException =>e.printStackTrace() 37 } 38 } 39 } 40 } 上述代码中,第20~29行代码定义的getHBaseAdmin()方法用于通过HBase连接 获取Admin对象,该对象用于操作HBase的表。第30行代码定义的getConnection()方 法用于获取HBase连接。第31~39 行代码定义的closeConnection()方法用于关闭 HBase连接以释放资源。 需要注意的是,如果在运行项目SparkProject的环境中未配置虚拟机Spark01、 Spark项目实战(第78 2版) Spark02和Spark03的主机名与IP地址的映射关系,那么在配置ZooKeeper集群地址时 将主机名替换为具体的IP地址。 (2)为了避免实现本项目后续需求的数据持久化操作时,重复编写操作HBase表的 相关代码,这里在项目SparkProject的包cn.itcast.hbase中新建一个名为HBaseUtils的 Scala单例对象,在该单例对象中实现操作HBase表的相关代码,具体如文件3-5所示。 文件3-5 HBaseUtils.scala 1 import org.apache.hadoop.hbase.TableName 2 import org.apache.hadoop.hbase.client._ 3 import org.apache.hadoop.hbase.util.Bytes 4 object HBaseUtils { 5 def createtable(tableName: String, columnFamilys: String*): Unit ={ 6 //创建Admin 对象admin,用于操作HBase 的表 7 val admin: Admin =HBaseConnect.getHBaseAdmin 8 //判断表是否存在 9 if (admin.tableExists(TableName.valueOf(tableName))) { 10 //禁用表 11 admin.disableTable(TableName.valueOf(tableName)) 12 //删除表 13 admin.deleteTable(TableName.valueOf(tableName)) 14 } 15 //指定表名 16 val tableDescriptorBuilder =TableDescriptorBuilder 17 .newBuilder(TableName.valueOf(tableName)) 18 for (cf <-columnFamilys) { 19 val columnDescriptor =ColumnFamilyDescriptorBuilder 20 .newBuilder(Bytes.toBytes(cf)).build() 21 //向表中添加列族 22 tableDescriptorBuilder.setColumnFamily(columnDescriptor) 23 } 24 val tableDescriptor =tableDescriptorBuilder.build() 25 //创建表 26 admin.createTable(tableDescriptor) 27 admin.close() 28 } 29 def putsToHBase( 30 tableName: String, rowkey: String, 31 cf: String, columns: Array[String], 32 values: Array[String]): Unit ={ 33 //基于表名创建Table 对象table,用于管理表的数据 34 val table: Table =HBaseConnect.getConnection 35 .getTable(TableName.valueOf(tableName)) 第3章 热门品类Top10分析 79 36 //创建Put 对象puts,用于根据行键向表中插入数据 37 val puts: Put =new Put(Bytes.toBytes(rowkey)) 38 for (i <-columns.indices) { 39 puts.addColumn( 40 //指定列族 41 Bytes.toBytes(cf), 42 //指定列 43 Bytes.toBytes(columns(i)), 44 //指定数据 45 Bytes.toBytes(values(i)) 46 ) 47 } 48 table.put(puts) 49 table.close() 50 } 51 } 上述代码中,第5~28行代码定义了createtable()方法,用于在HBase中创建表。 该方法接收tableName和columnFamilys两个参数,其中参数tableName用于指定表 名,参数columnFamilys用于通过可变参数列表指定表中多个列族的名称。 第29~50行代码定义的putsToHBase()方法用于在向HBase的指定表插入数据, 该方法接收tableName、rowkey、cf、columns和values五个参数。其中,参数tableName 用于指定表名,参数rowkey用于指定行键,参数cf用于指定列族的名称,参数columns 用于通过数组指定表中多个列标识的名称;参数values用于通过数组指定表中多个列的 数据。 (3)在单例对象CategoryTop10中定义一个top10ToHBase()方法,该方法用于向 HBase的表top10中插入热门品类Top10的分析结果,具体代码如下。 1 def top10ToHBase(top10Category: Array[(CategorySortKey, String)]): Unit ={ 2 //在HBase 中创建表top10 并向表中添加列族top10_category 3 HBaseUtils.createtable("top10", "top10_category") 4 //创建数组column,用于指定列标识的名称 5 val column =Array( 6 "category_id", "viewcount", 7 "cartcount", "purchasecount" 8 ) 9 var viewcount ="" 10 var cartcount ="" 11 var purchasecount ="" 12 var count =0 13 for ((top10, category_id) <-top10Category) { Spark项目实战(第80 2版) 14 count +=1 15 //获取当前品类中商品被查看的次数 16 viewcount =top10.viewCount.toString 17 //获取当前品类中商品被加入购物车的次数 18 cartcount =top10.cartCount.toString 19 //获取当前品类中商品被购买的次数 20 purchasecount =top10.purchaseCount.toString 21 //创建数组value,用于指定插入的数据 22 val value =Array(category_id, viewcount, cartcount, purchasecount) 23 HBaseUtils.putsToHBase( 24 "top10", 25 s"rowkey_top$count", 26 "top10_category", 27 column, 28 value 29 ) 30 } 31 } 上述代码中,第23~29 行代码用于向HBase中表top10 的列top10_category: category_id、top10_category:viewcount、top10_category:cartcount和top10_category: purchasecount插入数据,数据的内容依次为不同品类的唯一标识、不同品类中商品被查 看的次数、不同品类中商品被加入购物车的次数和不同品类中商品被购买的次数。 (4)在单例对象CategoryTop10的main()方法中调用top10ToHBase()方法并将 top10Category作为参数传递,实现将热门品类Top10的分析结果插入HBase的表top10 中,具体代码如下。 1 try { 2 CategoryTop10.top10ToHBase(top10Category) 3 } catch { 4 case e: Exception => 5 e.printStackTrace() 6 } (5)在单例对象CategoryTop10的main()方法中添加关闭HBase连接和Spark连 接的代码,具体代码如下。 1 HBaseConnect.closeConnection() 2 sc.stop() 3.4 运行Spark程序 为了充分利用集群资源分析热门品类Top10,本项目使用spark-submit命令将 Spark程序提交到YARN 集群运行,具体操作步骤如下。 第3章热门品类Top10分析81 1.封装jar文件 在InteliJIDEA主界面的右侧单击Maven选项卡标签展开Maven面板。在Maven 面板双击Lifecycle折叠项,如图3-14所示。 图3-14 Maven面板 在图3-14所示界面中,双击package选项将项目SparkProject封装为jar文件。项 目SparkProject封装完成的效果如图3-15所示。 图3-15 项目SparkProject封装完成的效果 从图3-15中可以看出,控制台输出BUILDSUCCESS提示信息,说明成功将项目 SpkPojcakPojc-0SNAPSHOTja-ihdpednisja arret封装为jar文件Sprret1.--rwt-enece.r和 SpakPoet1.-jr。前者不仅包含项目SpakPoet的源代码和编译后 rrjc-0SNAPSHOT.arrjc 的Scala文件,还包含项目SparkProject的所有依赖;而后者仅包含项目SparkProject的 源代码和编译后的Scala文件,不含项目SparkProject的依赖。这两个jar文件存储在 D:\develop\SparkProject\target目录中。 本项目主要基于Sparrjet10SNAPSHOTja-ihdpednisjar来运行Sprk程 kPoc-.--rwt-enece.a 序。为了后续使用,这里将Sprret10-rwt-enece.r重命名为 SpakPoetjr。 akPojc-.(-) SNAPSHOTja-ihdpednisja rrjc.a 2.启动大数据集群环境 在虚拟机Spark01 、Spark02和Spark03依次启动ZooKeper集群、Hadoop集群和