学习目标
. 掌握热门品类Top10分析实现思路。
. 掌握如何创建Spark连接并读取数据集。
. 掌握利用Spark获取业务数据。
. 掌握利用Spark统计品类的行为类型。
. 掌握利用Spark过滤品类的行为类型。
. 掌握利用Spark合并相同品类的行为类型。
. 掌握利用Spark根据品类的行为类型进行排序。
. 掌握将数据持久化到HBase数据库。
. 熟悉通过SparkOnYARN 运行程序。
品类指商品所属分类,用户在访问电商网站时,通常会产生很多行为,如查看商品的信
息、将感兴趣的商品加入购物车和购买商品等,这些行为都将作为数据被网站存储。本章通
过对电商网站存储的用户行为数据进行分析,从而统计出排名前十的热门品类。
3.1 数据集分析
某电商网站2019年11月产生的用户行为数据存储在文件user_session.txt,该文件中
的每一行数据都表示一个用户行为,所有行为都与商品和用户有关。由于原始数据集较大
(13.7GB),对硬件配置要求较高,考虑到读者学习的便捷性,从原始数据集中抽取了500万
条数据(约1GB)进行分析。虽然数据比较多,但是数据内容格式基本类似,这里选取其中
一条数据进行分析,具体如下。 
{"user_session":"0000007c- adbf- 4ed7- af17- d1fef9763d67","event_type":"view","category_ 
id":"2053013553090134275","user _id":"560165420","product _id":"8900305","address _name": 
"Maryland","event_time":"2019-11-18 09:16:19"} 
上述数据包含很多字段,每个字段都代表特定的含义,具体介绍如下。
. user_session:用于标识用户行为的唯一值。
.event_type:表示用户行为的类型,包括view(查看)、cart(加入购物车)和purchase 
(购买)行为。

7
4 
Spark项目实战

.categoryid:表示商品品类ID 。
.userid: 示用户ID 。表(_) 
.prouct_id:表示商品ID 。d(_) 
.addres_name:表示产生事件的区域。
.event_time:表示产生事件的具体时间
。
注:本书的配套资源会为读者提供数据集文件usrssin.
x
e_eott。

3.实现思路分析
2 

用户在访问电商网站时,通常会针对商品产生很多行为事件,如查看、加入购物车和购
买。首先需要分别统计各个品类商品的查看次数、加入购物车次数以及购买次数。接下来, 
将同一品类中商品的查看、加入购物车以及购买次数进行合并。然后,自定义排序规则、按
照各个品类中商品的查看、加入购物车和购买次数进行降序排序,获取排名前十的品类,就
是热门品类Top10 。排序时,优先按照各个品类商品的查看次数降序排列,如果查看次数相
同,则按照各个品类商品的加入购物车次数进行降序排列。如果查看次数和加入购车次数
都相同,那么按照各品类商品的购买次数进行降序排列。最后,将同一品类中商品的查看、
加入购物车和购买次数映射到自定义排序规则中进行排序处理。有关热门品类Top10的
分析过程如图3-1所示。


图3-
1 
热门品类Top10的分析过程

针对图3-1中热门品类Top10的分析过程讲解如下。

.读取/转换:读取数据集中的行为类型(event_type)和品类ID(category_id)数据,为

第3章热门品类Top10分析75 

了便于后续聚合处理时,将相同Key的Value值进行累加,计算每个品类中不同行
为出现的总次数,这里需要对输出结果的数据格式进行转换处理,将行为类型和品
类ID作为Key,值1作为Value。

.聚合:统计各个品类的查看、加入购物车和购买次数。
.过滤/转换:将聚合结果进行过滤处理,并分为3部分数据,第一部分数据包含各个
品类查看次数,第二部分数据包含各个品类加入购物车次数,第三部分包含各个品
类购买次数。对过滤后的3部分数据进行转换处理,去除数据中的行为类型字段。
此步目的是后续合并操作时,明确同一品类中不同行为类型所处位置。
.合并:将Key值相同的Value进行合并处理,目的是将相同品类的查看次数、加入
购物车次数和购买次数合并到一行。
t)、
.排序:对每个品类中查看次数(viewcoun加入购物车次数(cartcount)和购买次数
(purchasecount)进行排序处理,在排序过程会涉及3类值的排序,因此这里需要使
用Spark的二次排序,在排序操作时使用自定义排序的方式进行处理。
3.实现热门品类Tp10
3o

实现热门品类Top10分析的程序由Java编程语言实现。目前,Java的主流开发工具
有两种:Eclipse工具和InteliJIDEA工具。我们可以在这两种开发工具中编写Java代码
来实现热门品类Top10分析。由于InteliJIDEA工具内置了很多优秀的插件,在智能代码
助手、代码自动提示、重构、CVS整合、代码分析等方面有着不错的表现,因此本项目将使用

InteliJIDEA作为Java开发工具。

3.1 
创建项目
3.
本项目使用的IJIDEA版本为2018.读者可通过IJIDEA官网下载并安装

nteli3, nteli
程序,关于InteliJIDEA的下载安装这里不做赘述(注意:安装InteliJIDEA之前需要
安
装JDK并在系统环境变量中配置JDK,本项目使用的JDK版本为1.


8)。
Maven便于维护和管理项目依赖,因此本项目将通过构建Maven现目实现相关需求。
接下来,详细讲解如何在InteliJIDEA中构建Maven项目SparkProject,具体步骤如下。

1.创建Maven项目
打开InteliJIDEA开发工具,进入InteliJIDEA欢迎界面,具体如图3-2所示。
在图3-2中单击Configure右侧的下拉箭头,依次选择ProjectDefaults→Project 

Structure命令,配置项目使用的JDK,如图3-3所示。
在图3-3中配置完JDK后,单击OK按钮返回InteliJIDEA欢迎界面。
单击图3-2中的CreateNewProject按钮创建新项目,在弹出的NewProject窗口左侧

选择Maven,即创建Maven项目,如图3-4所示。
在图3-4中,单击Next按钮,配置Maven项目的组织名(GroupId)和项目工程名
(ArtifactId),如图3-5所示。


76 
Spark项目实战


图3-
2 
InteliJIDEA 
欢迎界面


图3-
3 
配置项目使用的JDK 

在图3-5中,单击Next按钮,配置项目名称(Projectname)和项目本地的存放目录
(Projectlocation),如图3-6所示。

在图3-6中,单击Finish按钮,完成项目SparkProject的创建。项目SparkProject的初
始结构如图3-7所示。


第3章热门品类Top10 分析77 


图3-
4 
创建Maven项目


图3-
5 
配置组织名和工程名

2. 
导入依赖
本项目所需要的依赖包括JSON 、HBe和Spa在文件pom.l中添加这些依赖方
式,具体代码如文件3-1所示。
asrk, xm


Spark项78 目实战
图3-6 配置项目名称和本地存放目录
图3-7 项目SparkProject的初始结构
文件3-1 pom.xml 
1 <dependencyManagement> 
2 <dependencies> 
3 <dependency> 
4 <groupId>io.netty</groupId> 
5 <artifactId>netty-all</artifactId> 
6 <version>4.1.18.Final</version> 
7 </dependency> 
8 </dependencies> 
9 </dependencyManagement> 
10 <dependencies> 
11 <!--JSON 依赖--> 
12 <dependency> 
13 <groupId>com.alibaba</groupId>

第3章 热门品类Top10分析 79 
14 <artifactId>fastjson</artifactId> 
15 <version>1.2.62</version> 
16 </dependency> 
17 <!--HBase 依赖--> 
18 <dependency> 
19 <groupId>org.apache.hbase</groupId> 
20 <artifactId>hbase-client</artifactId> 
21 <version>1.2.1</version> 
22 </dependency> 
23 <dependency> 
24 <groupId>org.apache.hbase</groupId> 
25 <artifactId>hbase-common</artifactId> 
26 <version>1.2.1</version> 
27 </dependency> 
28 <!--Spark 依赖--> 
29 <dependency> 
30 <groupId>org.apache.spark</groupId> 
31 <artifactId>spark-core_2.11</artifactId> 
32 <version>2.3.2</version> 
33 <exclusions> 
34 <exclusion> 
35 <groupId>io.netty</groupId> 
36 <artifactId>netty</artifactId> 
37 </exclusion> 
38 </exclusions> 
39 </dependency> 
40 </dependencies> 
41 <build> 
42 <plugins> 
43 <plugin> 
44 <groupId>org.apache.maven.plugins</groupId> 
45 <artifactId>maven-compiler-plugin</artifactId> 
46 <configuration> 
47 <source>1.8</source> 
48 <target>1.8</target> 
49 </configuration> 
50 </plugin> 
51 <plugin> 
52 <artifactId>maven-assembly-plugin</artifactId> 
53 <configuration> 
54 <appendAssemblyId>false</appendAssemblyId> 
55 <descriptorRefs> 
56 <descriptorRef>jar-with-dependencies</descriptorRef> 
57 </descriptorRefs> 
58 <archive> 
59 <manifest> 
60 <!--此处指定main 方法入口的class --> 
61 <mainClass>cn.itcast.top10.CategoryTop10</mainClass> 
62 </manifest> 
63 </archive>

Spark项80 目实战 
64 </configuration> 
65 <executions> 
66 <execution> 
67 <id>make-assembly</id> 
68 <phase>package</phase> 
69 <goals> 
70 <goal>assembly</goal> 
71 </goals> 
72 </execution> 
73 </executions> 
74 </plugin> 
75 </plugins> 
76 </build> 
文件3-1中:第1~9行代码主要是对项目中Netty依赖进行多版本管理,避免本地运
行出现多个版本的Netty,导致程序出现NoSuchMethodError异常;第12~16行代码引入
JSON 依赖,用于解析JSON 数据;第18~27行代码引入HBase依赖,用于操作HBase数
据库;第29~39行代码引入Spark依赖,用于开发Spark数据分析程序;第43~50行代码
指定Maven编译的JDK版本,如果不指定,Maven3默认用JDK1.5,Maven2默认用JDK 
1.3;第51~74行代码配置程序打包方式并指定程序主类。
3.创建项目目录
在项目SparkProject中右击java目录,在弹出的快捷菜单中依次选择New→Package, 
从而新建Package包,具体如图3-8所示。
图3-8 新建Package包的步骤

第3章热门品类Top10分析81 

通过如图3-8所示的操作后,会弹出NewPackage对话框,在文本输入框Enternew 
pakgame中输入cn.tattp10设置Pakge名称,用于存放实现热门品类Top10分

caenics.oca
析的Java文件,如图3-9所示。


图3-
9 
设置Package名称

在图3-9中单击OK按钮完成Package包的创建。

4.创建程序主类
右击包cn.itattp10,在弹出的快捷菜单中依次选择New→Jva新建Jaa类,

cs.oaaCls 
v
具体如图3-10所示。


图3-10 
新建Java类

通过如图3-10所示的操作后,会弹出CreateNewClas 
对话框,在文本框Name中输
入CategoryTop10设置类名称,在类中实现热门品类Top10分析,具体如图3-11所示。

3.2 
创建Sprk连接并读取数据集
3.a
在类CategoryTop10中定义main()方法,该方法是Java程序执行的入口,在main()方
法中实现Spark程序,具体代码如文件3-2所示。


Spark项82 目实战
图3-11 设置Java类名称
文件3-2 CategoryTop10.java 
1 public class CategoryTop10{ 
2 public static void main(String[] arg){ 
3 //实现热门品类Top10 分析
4 } 
5 } 
在文件3-2的main()方法中,创建JavaSparkContext和SparkConf对象,JavaSparkContext 
对象用于实现Spark程序,SparkConf对象用于配置Spark程序相关参数,具体代码如下。 
1 SparkConf conf =new SparkConf(); 
2 //设置Application 名称为top10_category 
3 conf.setAppName("top10_category"); 
4 JavaSparkContext sc =new JavaSparkContext(conf); 
在文件3-2的main()方法中,调用JavaSparkContext对象的textFile()方法读取外部
文件,将文件中的数据加载到textFileRDD,具体代码如下。 
JavaRDD<String> textFileRDD =sc.textFile(arg[0]); 
上述代码中,通过变量arg[0]指定文件路径,目的是执行提交Spark程序到YARN 集
群运行的命令中,通过参数指定文件路径。
3.3.3 获取业务数据
在文件3-2的main()方法中,使用mapToPair()算子转换textFileRDD的每一行数据, 
用于获取每一行数据中的行为类型和品类ID 数据,将转换结果加载到transformRDD,具
体代码如下。 
1 JavaPairRDD<Tuple2<String,String>,Integer> transformRDD =textFileRDD 
2 .mapToPair( 
3 new PairFunction< 
4 String, 
5 Tuple2<String, String>, Integer>() { 
6 @Override 
7 public Tuple2<Tuple2<String, String>, Integer> call(String s) 
8 throws Exception {

第3章 热门品类Top10分析 83 
9 //将数据转换为JSON 对象
10 JSONObject json =JSONObject.parseObject(s); 
11 String category_id =json.getString("category_id"); 
12 String event_type =json.getString("event_type"); 
13 return new Tuple2<>( 
14 new Tuple2<>(category_id,event_type), 
15 new Integer(1)); 
16 } 
17 }); 
上述代码中,首先将textFileRDD中的每一行数据转换为JSON 对象;然后获取JSON 
对象中的category_id(品类ID)和event_type(行为类型);最后将category_id、event_type 
和值1添加到Tuple2对象中。
3.3.4 统计品类的行为类型
在文件3-2的main()方法中,使用reduceByKey()算子对transformRDD 进行聚合操
作,用于统计每个品类中商品被查看、加入购物车和购买的次数,将统计结果加载到
aggregationRDD,具体代码如下。 
1 JavaPairRDD<Tuple2<String, String>, Integer> aggregationRDD = 
2 transformRDD.reduceByKey( 
3 new Function2<Integer, Integer, Integer>() { 
4 @Override 
5 public Integer call(Integer integer1, Integer integer2) 
6 throws Exception { 
7 return integer1 +integer2; 
8 } 
9 }); 
3.3.5 过滤品类的行为类型
在文件3-2的main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据
中行为类型为加入购物车和购买的数据,只保留行为类型为查看的数据,然后使用
mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被查看次数和品类ID 数
据,最终将转换结果加载到getViewCategoryRDD,具体代码如下。 
1 JavaPairRDD<String,Integer> getViewCategoryRDD =aggregationRDD 
2 .filter(new Function<Tuple2<Tuple2<String, String>, Integer> 
3 , Boolean>() { 
4 @Override 
5 public Boolean call(Tuple2<Tuple2<String, String> 
6 , Integer> tuple2) throws Exception { 
7 //获取行为类型
8 String action =tuple2._1._2; 
9 return action.equals("view"); 
10 }

Spark项84 目实战 
11 }).mapToPair( 
12 new PairFunction<Tuple2<Tuple2<String, String> 
13 , Integer>, String, Integer>() { 
14 @Override 
15 public Tuple2<String, Integer> 
16 call(Tuple2<Tuple2<String, String>, Integer> tuple2) 
17 throws Exception { 
18 return new Tuple2<>(tuple2._1._1,tuple2._2); 
19 } 
20 }); 
上述代码中,第9行通过equals()方法判断获取的行为类型是否为view(查看)并将判
断结果作为返回值,若返回值为True,则进行后续转换操作。
在文件3-2的main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据
中行为类型为查看和购买的数据,只保留行为类型为加入购物车的数据,然后使用
mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被加入购物车的次数和品
类ID数据,最终将转换结果加载到getCartCategoryRDD,具体代码如下。 
1 JavaPairRDD<String,Integer> getCartCategoryRDD =aggregationRDD 
2 .filter(new Function<Tuple2<Tuple2<String, String>, Integer> 
3 , Boolean>() { 
4 @Override 
5 public Boolean call(Tuple2<Tuple2<String, String> 
6 , Integer> tuple2) throws Exception { 
7 String action =tuple2._1._2; 
8 return action.equals("cart"); 
9 } 
10 }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String> 
11 , Integer>, String, Integer>() { 
12 @Override 
13 public Tuple2<String, Integer> 
14 call(Tuple2<Tuple2<String, String>, Integer> tuple2) 
15 throws Exception { 
16 return new Tuple2<>(tuple2._1._1,tuple2._2); 
17 } 
18 }); 
上述代码中,第8行通过equals()方法判断获取的行为类型是否为cart(加入购物车) 
并将判断结果作为返回值,若返回值为True,则进行后续转换操作。
在文件3-2的main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据
中行为类型为查看和加入购物车的数据,只保留行为类型为购买的数据,然后使用
mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被购买次数和品类ID 数
据,最终将转换结果加载到getPurchaseCategoryRDD,具体代码如下。

第3章 热门品类Top10分析 85 
1 JavaPairRDD<String,Integer> getPurchaseCategoryRDD =aggregationRDD 
2 .filter(new Function<Tuple2<Tuple2<String, String>, Integer> 
3 , Boolean>() { 
4 @Override 
5 public Boolean call(Tuple2<Tuple2<String, String> 
6 , Integer> tuple2) throws Exception { 
7 String action =tuple2._1._2; 
8 return action.equals("purchase"); 
9 } 
10 }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String> 
11 , Integer>, String, Integer>() { 
12 @Override 
13 public Tuple2<String, Integer> 
14 call(Tuple2<Tuple2<String, String>, Integer> tuple2) 
15 throws Exception { 
16 return new Tuple2<>(tuple2._1._1,tuple2._2); 
17 } 
18 }); 
上述代码中,第8行通过equals()方法判断获取的行为类型是否为purchase(购买)并
将判断结果作为返回值,若返回值为True,则进行后续转换操作。
3.3.6 合并相同品类的行为类型
在文件3-2 的main()方法中,使用leftOuterJoin()(左外连接)算子合并
getViewCategoryRDD、getCartCategoryRDD 和getPurchaseCategoryRDD,用于合并同一
品类的查看次数、加入购物车次数和购买次数,将合并结果加载到joinCategoryRDD,具体
代码如下。 
1 JavaPairRDD<String,Tuple2<Integer, Optional<Integer>>> 
2 tmpJoinCategoryRDD =getViewCategoryRDD 
3 .leftOuterJoin(getCartCategoryRDD); 
4 JavaPairRDD<String, 
5 Tuple2<Tuple2<Integer, Optional<Integer>>, 
6 Optional<Integer>>> joinCategoryRDD = 
7 tmpJoinCategoryRDD.leftOuterJoin(getPurchaseCategoryRDD); 
上述代码中,首先通过leftOuterJoin()算子合并getViewCategoryRDD和getCartCategoryRDD,
将合并结果加载到tmpJoinCategoryRDD,然后通过leftOuterJoin()算子合并
tmpJoinCategoryRDD和getPurchaseCategoryRDD,将合并结果加载到joinCategoryRDD。
Optional类是一个包含有可选值的包装类,它既可以含有对象也可以为空,主要为了解
决空指针异常的问题,因为某些品类中的商品可能被查看但并未被购买或加入购物车。
3.3.7 根据品类的行为类型进行排序
在包cn.itcast.top10中创建文件CategorySortKey.java,用于实现自定义排序。在类
CategorySortKey中继承比较器接口Comparable 和序列化接口Serializable,并实现

Spark项86 目实战
Comparable接口的compareTo()方法,具体代码如文件3-3所示。
文件3-3 CategorySortKey.java 
1 import java.io.Serializable; 
2 public class CategorySortKey implements Comparable<CategorySortKey> 
3 ,Serializable{ 
4 //查看次数
5 private int viewCount; 
6 //加入购物车次数
7 private int cartCount; 
8 //购买次数
9 private int purchaseCount; 
10 //定义类的构造方法
11 public CategorySortKey( 
12 int viewcount, 
13 int cartCount, 
14 int purchaseCount) 
15 { 
16 this.viewCount =viewcount; 
17 this.cartCount =cartCount; 
18 this.purchaseCount =purchaseCount; 
19 } 
20 //定义属性的getter 和setter 方法
21 . 
22 @Override 
23 public int compareTo(CategorySortKey other) { 
24 if(viewCount -other.getViewCount() !=0) { 
25 return (int) (viewCount -other.getViewCount()); 
26 } else if(cartCount -other.getCartCount() !=0) { 
27 return (int) (cartCount -other.getCartCount()); 
28 } else if(purchaseCount -other.getPurchaseCount() !=0) { 
29 return (int) (purchaseCount -other.getPurchaseCount()); 
30 } 
31 return 0; 
32 } 
33 } 
在文件3-3中,第22~32行代码,重写接口Comparable的compareTo()方法,在方法
内部实现对象的比较,比较的规则为返回值等于0表示相等;返回值小于0表示小于;返回
值大于0表示大于。比较的优先级按照viewCount、cartCount和purchaseCount的顺序。
在文件3-2 的main()方法中,使用mapTopair()算子转换joinCategoryRDD,将
joinCategoryRDD中品类被查看次数、加入购物车次数和购买次数映射到自定义排序类
CategorySortKey,通过transCategoryRDD加载转换结果,具体代码如下。 
1 JavaPairRDD<CategorySortKey,String> transCategoryRDD =joinCategoryRDD 
2 .mapToPair(new PairFunction<Tuple2<String, 
3 Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>>,

第3章 热门品类Top10分析 87 
4 CategorySortKey,String>() { 
5 @Override 
6 public Tuple2<CategorySortKey,String> call(Tuple2<String, 
7 Tuple2<Tuple2<Integer, Optional<Integer>>, 
8 Optional<Integer>>> tuple2) throws Exception { 
9 String category_id =tuple2._1; 
10 int viewcount =tuple2._2._1._1; 
11 int cartcount =0; 
12 int purchasecount =0; 
13 //判断品类被加入购物车次数是否为空
14 if (tuple2._2._1._2.isPresent()){ 
15 cartcount =tuple2._2._1._2.get().intValue(); 
16 } 
17 //判断品类被购买次数是否为空
18 if (tuple2._2._2.isPresent()){ 
19 purchasecount =tuple2._2._2.get().intValue(); 
20 } 
21 /*将viewcount、cartcount 和purchasecount 映射到
22 类CategorySortKey 的构造方法中*/ 
23 CategorySortKey sortKey = 
24 new CategorySortKey(viewcount, cartcount, purchasecount); 
25 return new Tuple2<>(sortKey,category_id); 
26 } 
27 }); 
上述代码中的isPresent()方法用于判断Optional类型的数据是否为空,若值为空则通
过get()方法获取值,并通过intValue()方法指定获取的值为Int类型。
在文件3-2的main()方法中,通过sortByKey()算子对transCategoryRDD进行排序操
作,使transCategoryRDD中品类被查看次数、加入购物车次数和购买次数根据自定义排序
类CategorySortKey指定的排序规则进行排序,将排序结果加载到sortedCategoryRDD,具
体代码如下。 
JavaPairRDD<CategorySortKey,String> sortedCategoryRDD = 
transCategoryRDD.sortByKey(false); 
上述代码中,sortByKey()算子的参数为false,表示使用自定义排序类的比较方式进行
排序。在
文件3-2的main()方法中,使用take()算子获取sortedCategoryRDD前10个元素, 
即热门品类Top10分析结果,将分析结果加载到top10CategoryList,具体代码如下。 
List<Tuple2<CategorySortKey, String>> top10CategoryList = 
sortedCategoryRDD.take(10); 
上述代码中,take()算子的参数为10,表示获取sortedCategoryRDD前10个元素。
3.3.8 数据持久化
本项目使用HBase数据库作为数据持久化工具,HBase分布式数据库通过HDFS和

Spark项88 目实战
ZooKeeper实现数据的高可用和冗余,从而确保数据库和数据的安全性。接下来,分步骤讲
解如何将热门品类Top10分析结果持久化到HBase数据库中。
1.封装HBase工具类
为了避免后续环节重复编写数据库连接和数据库操作的相关代码,这里将HBase数据
库连接工具类和HBase数据库操作工具类进行封装,具体实现步骤如下。
(1)在项目SparkProject的java目录新建Package包cn.itcast.hbase,用于存放实现数
据持久化的Java文件。在包cn.itcast.hbase下创建文件HbaseConnect.java,用于实现封装
HBase数据库连接工具类,具体代码如文件3-4所示。
文件3-4 HbaseConnect.java 
1 import org.apache.hadoop.conf.Configuration; 
2 import org.apache.hadoop.hbase.HBaseConfiguration; 
3 import org.apache.hadoop.hbase.MasterNotRunningException; 
4 import org.apache.hadoop.hbase.ZooKeeperConnectionException; 
5 import org.apache.hadoop.hbase.client.Connection; 
6 import org.apache.hadoop.hbase.client.ConnectionFactory; 
7 import org.apache.hadoop.hbase.client.HBaseAdmin; 
8 import java.io.IOException; 
9 public class HbaseConnect { 
10 public static Configuration conf; 
11 public static Connection conn; 
12 public static HBaseAdmin hbaseAdmin; 
13 static { 
14 //创建HBase 配置信息
15 conf =HBaseConfiguration.create(); 
16 //配置ZooKeeper 集群地址
17 conf.set("hbase.zookeeper.quorum", "spark01,spark02,spark03"); 
18 //配置ZooKeeper 端口号
19 conf.set("hbase.zookeeper.property.clientPort", "2181"); 
20 try { 
21 //通过HBase 配置获取HBase 数据库连接对象
22 conn =ConnectionFactory.createConnection(conf); 
23 } catch (IOException e) { 
24 e.printStackTrace(); 
25 } 
26 } 
27 public static HBaseAdmin getHBaseAdmin() throws IOException{ 
28 try { 
29 //获取HBase 数据库操作对象
30 hbaseAdmin =(HBaseAdmin)(conn.getAdmin()); 
31 } catch (MasterNotRunningException e) { 
32 e.printStackTrace(); 
33 } catch (ZooKeeperConnectionException e) { 
34 e.printStackTrace(); 
35 } 
36 return hbaseAdmin;

第3章 热门品类Top10分析 89 
37 } 
38 public static Connection getConnection(){ 
39 return conn; 
40 } 
41 public static synchronized void closeConnection(){ 
42 if(conn!=null){ 
43 try { 
44 conn.close(); 
45 } catch (IOException e) { 
46 e.printStackTrace(); 
47 } 
48 } 
49 } 
50 } 
在文件3-3中,第38~40行代码创建返回值类型为Connection的方法getConnection(), 
用于获取HBase数据库连接;第41~49行代码创建方法closeConnection(),用于关闭
HBase数据库连接。
需要注意的是,若运行项目SparkProject的环境中未配置IP 映射,则需要在配置
Zookeeper集群地址时使用IP地址而不是主机名。
(2)在项目SparkProject的包cn.itcast.hbase下创建文件HbaseUtils.java,用于实现
封装HBase数据库操作工具类,具体代码如文件3-5所示。
文件3-5 HbaseUtils.java 
1 import org.apache.hadoop.hbase.HColumnDescriptor; 
2 import org.apache.hadoop.hbase.HTableDescriptor; 
3 import org.apache.hadoop.hbase.TableName; 
4 import org.apache.hadoop.hbase.client.HBaseAdmin; 
5 import org.apache.hadoop.hbase.client.Put; 
6 import org.apache.hadoop.hbase.client.Table; 
7 import org.apache.hadoop.hbase.util.Bytes; 
8 import java.io.IOException; 
9 public class HbaseUtils { 
10 public static void createTable(String tableName, 
11 String... columFamilys) 
12 throws IOException { 
13 //获取HBase 数据表操作对象
14 HBaseAdmin admin =HbaseConnect.getHBaseAdmin(); 
15 //判断表是否存在
16 if (admin.tableExists(tableName)){ 
17 //关闭表
18 admin.disableTable(tableName); 
19 //删除表
20 admin.deleteTable(tableName); 
21 } 
22 //HTableDescriptor 类包含了表的名字以及表的列族信息

Spark项90 目实战 
23 HTableDescriptor hd 
24 =new HTableDescriptor(TableName.valueOf(tableName)); 
25 for (String cf : columFamilys) { 
26 hd.addFamily(new HColumnDescriptor(cf)); 
27 } 
28 //通过createTable()方法创建HBase 数据表
29 admin.createTable(hd); 
30 admin.close(); 
31 } 
32 public static void putsToHBase(String tableName, 
33 String rowkey, 
34 String cf, 
35 String[] column, 
36 String[] value) 
37 throws Exception { 
38 //获取指定HBase 数据表的操作对象
39 Table table =HbaseConnect 
40 .getConnection() 
41 .getTable(TableName.valueOf(tableName)); 
42 //通过Put 对象存储插入数据表的内容
43 Put puts =new Put(rowkey.getBytes()); 
44 for (int i =0;i<column.length;i++){ 
45 puts.addColumn( 
46 Bytes.toBytes(cf), 
47 Bytes.toBytes(column[i]), 
48 Bytes.toBytes(value[i])); 
49 } 
50 //向指定数据表中插入数据
51 table.put(puts); 
52 table.close(); 
53 } 
54 } 
在文件3-5中,第10~31行代码定义方法createTable(),用于创建HBase数据表。方
法createTable()包含参数tableName和columFamilys,其中参数tableName表示数据表名
称,参数columFamilys表示列族;第32~52行代码定义方法putsToHBase(),用于向指定
HBase数据表中插入数据。方法putsToHBase()包含参数tableName、rowkey、cf、column 
和value,其中参数tableName表示数据表名称,参数rowkey表示行键,参数cf表示列族, 
参数column表示行,参数value表示值。
2.持久化热门品类Top10分析结果
在文件3-2的类CategoryTop10中添加方法top10ToHbase(),用于将热门品类Top10 
分析结果持久化到HBase数据库中,该方法包含参数top10CategoryList,表示热门品类
Top10分析结果数据,具体代码如下。

第3章 热门品类Top10分析 91 
1 public static void top10ToHbase(List<Tuple2<CategorySortKey, String>> 
2 top10CategoryList) throws Exception 
3 { 
4 //创建数据表top10 和列族top10_category 
5 HbaseUtils.createTable("top10","top10_category"); 
6 //创建数组column,用于存储数据表top10 的列名
7 String[] column = 
8 {"category_id","viewcount","cartcount","purchasecount"}; 
9 String viewcount =""; 
10 String cartcount =""; 
11 String purchasecount =""; 
12 String category_id =""; 
13 int count =0; 
14 //遍历集合top10CategoryList 
15 for (Tuple2<CategorySortKey, String> top10: top10CategoryList) { 
16 count++; 
17 //获取查看次数
18 viewcount =String.valueOf(top10._1.getViewCount()); 
19 //获取加入购物车次数
20 cartcount =String.valueOf(top10._1.getCartCount()); 
21 //获取购买次数
22 purchasecount =String.valueOf(top10._1.getPurchaseCount()); 
23 //获取品类ID 
24 category_id =top10._2; 
25 //创建数组value,用于存储数据表top10 的值
26 String[] value = 
27 {category_id,viewcount,cartcount,purchasecount}; 
28 HbaseUtils.putsToHBase("top10", 
29 "rowkey_top"+count, 
30 "top10_category", 
31 column, 
32 value); 
33 } 
34 } 
上述代码中,第28~32行,调用HBase数据库操作工具类的putToHBase()方法,用于
持久化热门品类Top10数据。putToHBase()方法包含5个参数:其中第1个参数为字符
串top10,表示数据表名称;第2个参数为字符串对象count和字符串rowkey_top,表示数
据表的行键;第3个参数为字符串top10_category,表示数据表的列族;第4个参数为数组
column,数组中的每一个元素表示数据表的列名;第5个参数为数组value,数组中的每一个
元素表示数据表的值。
在文件3-2的main()方法中,调用方法top10ToHbase()并传入参数top10CategoryList, 
用于在Spark程序中实现top10ToHbase()方法,将热门品类Top10 分析结果持久化到
HBase数据库中的数据表top10,具体代码如下。

Spark项92 目实战 
1 //通过try…catch 抛出异常
2 try { 
3 top10ToHbase(top10CategoryList); 
4 } catch (Exception e) { 
5 e.printStackTrace(); 
6 } 
7 //关闭HBase 数据库连接
8 HbaseConnect.closeConnection(); 
9 //关闭JavaSparkContext 连接
10 sc.close(); 
3.4 运行程序
热门品类Top10分析程序编写完成后,需要在IntelliJIDEA 中将程序封装成jar包,并
上传到集群环境中,通过spark-submit将程序提交到YARN 中运行,具体步骤如下。
1.封装jar包
在IntelliJIDEA 主界面单击右侧Maven选项卡打开Maven窗口,如图3-12和图3-13 
所示。
图3-12 Maven选项卡
在Maven窗口单击,展开Lifecycle目录,如图3-14所示。
双击Lifecycle目录中的package选项,IntelliJIDEA 会自动将程序封装成jar包,封装
完成后,若出现BUILDSUCCESS内容,则证明成功封装热门品类Top10分析程序为jar