学习目标 . 掌握热门品类Top10分析实现思路。 . 掌握如何创建Spark连接并读取数据集。 . 掌握利用Spark获取业务数据。 . 掌握利用Spark统计品类的行为类型。 . 掌握利用Spark过滤品类的行为类型。 . 掌握利用Spark合并相同品类的行为类型。 . 掌握利用Spark根据品类的行为类型进行排序。 . 掌握将数据持久化到HBase数据库。 . 熟悉通过SparkOnYARN 运行程序。 品类指商品所属分类,用户在访问电商网站时,通常会产生很多行为,如查看商品的信 息、将感兴趣的商品加入购物车和购买商品等,这些行为都将作为数据被网站存储。本章通 过对电商网站存储的用户行为数据进行分析,从而统计出排名前十的热门品类。 3.1 数据集分析 某电商网站2019年11月产生的用户行为数据存储在文件user_session.txt,该文件中 的每一行数据都表示一个用户行为,所有行为都与商品和用户有关。由于原始数据集较大 (13.7GB),对硬件配置要求较高,考虑到读者学习的便捷性,从原始数据集中抽取了500万 条数据(约1GB)进行分析。虽然数据比较多,但是数据内容格式基本类似,这里选取其中 一条数据进行分析,具体如下。 {"user_session":"0000007c- adbf- 4ed7- af17- d1fef9763d67","event_type":"view","category_ id":"2053013553090134275","user _id":"560165420","product _id":"8900305","address _name": "Maryland","event_time":"2019-11-18 09:16:19"} 上述数据包含很多字段,每个字段都代表特定的含义,具体介绍如下。 . user_session:用于标识用户行为的唯一值。 .event_type:表示用户行为的类型,包括view(查看)、cart(加入购物车)和purchase (购买)行为。 7 4 Spark项目实战 .categoryid:表示商品品类ID 。 .userid: 示用户ID 。表(_) .prouct_id:表示商品ID 。d(_) .addres_name:表示产生事件的区域。 .event_time:表示产生事件的具体时间 。 注:本书的配套资源会为读者提供数据集文件usrssin. x e_eott。 3.实现思路分析 2 用户在访问电商网站时,通常会针对商品产生很多行为事件,如查看、加入购物车和购 买。首先需要分别统计各个品类商品的查看次数、加入购物车次数以及购买次数。接下来, 将同一品类中商品的查看、加入购物车以及购买次数进行合并。然后,自定义排序规则、按 照各个品类中商品的查看、加入购物车和购买次数进行降序排序,获取排名前十的品类,就 是热门品类Top10 。排序时,优先按照各个品类商品的查看次数降序排列,如果查看次数相 同,则按照各个品类商品的加入购物车次数进行降序排列。如果查看次数和加入购车次数 都相同,那么按照各品类商品的购买次数进行降序排列。最后,将同一品类中商品的查看、 加入购物车和购买次数映射到自定义排序规则中进行排序处理。有关热门品类Top10的 分析过程如图3-1所示。 图3- 1 热门品类Top10的分析过程 针对图3-1中热门品类Top10的分析过程讲解如下。 .读取/转换:读取数据集中的行为类型(event_type)和品类ID(category_id)数据,为 第3章热门品类Top10分析75 了便于后续聚合处理时,将相同Key的Value值进行累加,计算每个品类中不同行 为出现的总次数,这里需要对输出结果的数据格式进行转换处理,将行为类型和品 类ID作为Key,值1作为Value。 .聚合:统计各个品类的查看、加入购物车和购买次数。 .过滤/转换:将聚合结果进行过滤处理,并分为3部分数据,第一部分数据包含各个 品类查看次数,第二部分数据包含各个品类加入购物车次数,第三部分包含各个品 类购买次数。对过滤后的3部分数据进行转换处理,去除数据中的行为类型字段。 此步目的是后续合并操作时,明确同一品类中不同行为类型所处位置。 .合并:将Key值相同的Value进行合并处理,目的是将相同品类的查看次数、加入 购物车次数和购买次数合并到一行。 t)、 .排序:对每个品类中查看次数(viewcoun加入购物车次数(cartcount)和购买次数 (purchasecount)进行排序处理,在排序过程会涉及3类值的排序,因此这里需要使 用Spark的二次排序,在排序操作时使用自定义排序的方式进行处理。 3.实现热门品类Tp10 3o 实现热门品类Top10分析的程序由Java编程语言实现。目前,Java的主流开发工具 有两种:Eclipse工具和InteliJIDEA工具。我们可以在这两种开发工具中编写Java代码 来实现热门品类Top10分析。由于InteliJIDEA工具内置了很多优秀的插件,在智能代码 助手、代码自动提示、重构、CVS整合、代码分析等方面有着不错的表现,因此本项目将使用 InteliJIDEA作为Java开发工具。 3.1 创建项目 3. 本项目使用的IJIDEA版本为2018.读者可通过IJIDEA官网下载并安装 nteli3, nteli 程序,关于InteliJIDEA的下载安装这里不做赘述(注意:安装InteliJIDEA之前需要 安 装JDK并在系统环境变量中配置JDK,本项目使用的JDK版本为1. 8)。 Maven便于维护和管理项目依赖,因此本项目将通过构建Maven现目实现相关需求。 接下来,详细讲解如何在InteliJIDEA中构建Maven项目SparkProject,具体步骤如下。 1.创建Maven项目 打开InteliJIDEA开发工具,进入InteliJIDEA欢迎界面,具体如图3-2所示。 在图3-2中单击Configure右侧的下拉箭头,依次选择ProjectDefaults→Project Structure命令,配置项目使用的JDK,如图3-3所示。 在图3-3中配置完JDK后,单击OK按钮返回InteliJIDEA欢迎界面。 单击图3-2中的CreateNewProject按钮创建新项目,在弹出的NewProject窗口左侧 选择Maven,即创建Maven项目,如图3-4所示。 在图3-4中,单击Next按钮,配置Maven项目的组织名(GroupId)和项目工程名 (ArtifactId),如图3-5所示。 76 Spark项目实战 图3- 2 InteliJIDEA 欢迎界面 图3- 3 配置项目使用的JDK 在图3-5中,单击Next按钮,配置项目名称(Projectname)和项目本地的存放目录 (Projectlocation),如图3-6所示。 在图3-6中,单击Finish按钮,完成项目SparkProject的创建。项目SparkProject的初 始结构如图3-7所示。 第3章热门品类Top10 分析77 图3- 4 创建Maven项目 图3- 5 配置组织名和工程名 2. 导入依赖 本项目所需要的依赖包括JSON 、HBe和Spa在文件pom.l中添加这些依赖方 式,具体代码如文件3-1所示。 asrk, xm Spark项78 目实战 图3-6 配置项目名称和本地存放目录 图3-7 项目SparkProject的初始结构 文件3-1 pom.xml 1 <dependencyManagement> 2 <dependencies> 3 <dependency> 4 <groupId>io.netty</groupId> 5 <artifactId>netty-all</artifactId> 6 <version>4.1.18.Final</version> 7 </dependency> 8 </dependencies> 9 </dependencyManagement> 10 <dependencies> 11 <!--JSON 依赖--> 12 <dependency> 13 <groupId>com.alibaba</groupId> 第3章 热门品类Top10分析 79 14 <artifactId>fastjson</artifactId> 15 <version>1.2.62</version> 16 </dependency> 17 <!--HBase 依赖--> 18 <dependency> 19 <groupId>org.apache.hbase</groupId> 20 <artifactId>hbase-client</artifactId> 21 <version>1.2.1</version> 22 </dependency> 23 <dependency> 24 <groupId>org.apache.hbase</groupId> 25 <artifactId>hbase-common</artifactId> 26 <version>1.2.1</version> 27 </dependency> 28 <!--Spark 依赖--> 29 <dependency> 30 <groupId>org.apache.spark</groupId> 31 <artifactId>spark-core_2.11</artifactId> 32 <version>2.3.2</version> 33 <exclusions> 34 <exclusion> 35 <groupId>io.netty</groupId> 36 <artifactId>netty</artifactId> 37 </exclusion> 38 </exclusions> 39 </dependency> 40 </dependencies> 41 <build> 42 <plugins> 43 <plugin> 44 <groupId>org.apache.maven.plugins</groupId> 45 <artifactId>maven-compiler-plugin</artifactId> 46 <configuration> 47 <source>1.8</source> 48 <target>1.8</target> 49 </configuration> 50 </plugin> 51 <plugin> 52 <artifactId>maven-assembly-plugin</artifactId> 53 <configuration> 54 <appendAssemblyId>false</appendAssemblyId> 55 <descriptorRefs> 56 <descriptorRef>jar-with-dependencies</descriptorRef> 57 </descriptorRefs> 58 <archive> 59 <manifest> 60 <!--此处指定main 方法入口的class --> 61 <mainClass>cn.itcast.top10.CategoryTop10</mainClass> 62 </manifest> 63 </archive> Spark项80 目实战 64 </configuration> 65 <executions> 66 <execution> 67 <id>make-assembly</id> 68 <phase>package</phase> 69 <goals> 70 <goal>assembly</goal> 71 </goals> 72 </execution> 73 </executions> 74 </plugin> 75 </plugins> 76 </build> 文件3-1中:第1~9行代码主要是对项目中Netty依赖进行多版本管理,避免本地运 行出现多个版本的Netty,导致程序出现NoSuchMethodError异常;第12~16行代码引入 JSON 依赖,用于解析JSON 数据;第18~27行代码引入HBase依赖,用于操作HBase数 据库;第29~39行代码引入Spark依赖,用于开发Spark数据分析程序;第43~50行代码 指定Maven编译的JDK版本,如果不指定,Maven3默认用JDK1.5,Maven2默认用JDK 1.3;第51~74行代码配置程序打包方式并指定程序主类。 3.创建项目目录 在项目SparkProject中右击java目录,在弹出的快捷菜单中依次选择New→Package, 从而新建Package包,具体如图3-8所示。 图3-8 新建Package包的步骤 第3章热门品类Top10分析81 通过如图3-8所示的操作后,会弹出NewPackage对话框,在文本输入框Enternew pakgame中输入cn.tattp10设置Pakge名称,用于存放实现热门品类Top10分 caenics.oca 析的Java文件,如图3-9所示。 图3- 9 设置Package名称 在图3-9中单击OK按钮完成Package包的创建。 4.创建程序主类 右击包cn.itattp10,在弹出的快捷菜单中依次选择New→Jva新建Jaa类, cs.oaaCls v 具体如图3-10所示。 图3-10 新建Java类 通过如图3-10所示的操作后,会弹出CreateNewClas 对话框,在文本框Name中输 入CategoryTop10设置类名称,在类中实现热门品类Top10分析,具体如图3-11所示。 3.2 创建Sprk连接并读取数据集 3.a 在类CategoryTop10中定义main()方法,该方法是Java程序执行的入口,在main()方 法中实现Spark程序,具体代码如文件3-2所示。 Spark项82 目实战 图3-11 设置Java类名称 文件3-2 CategoryTop10.java 1 public class CategoryTop10{ 2 public static void main(String[] arg){ 3 //实现热门品类Top10 分析 4 } 5 } 在文件3-2的main()方法中,创建JavaSparkContext和SparkConf对象,JavaSparkContext 对象用于实现Spark程序,SparkConf对象用于配置Spark程序相关参数,具体代码如下。 1 SparkConf conf =new SparkConf(); 2 //设置Application 名称为top10_category 3 conf.setAppName("top10_category"); 4 JavaSparkContext sc =new JavaSparkContext(conf); 在文件3-2的main()方法中,调用JavaSparkContext对象的textFile()方法读取外部 文件,将文件中的数据加载到textFileRDD,具体代码如下。 JavaRDD<String> textFileRDD =sc.textFile(arg[0]); 上述代码中,通过变量arg[0]指定文件路径,目的是执行提交Spark程序到YARN 集 群运行的命令中,通过参数指定文件路径。 3.3.3 获取业务数据 在文件3-2的main()方法中,使用mapToPair()算子转换textFileRDD的每一行数据, 用于获取每一行数据中的行为类型和品类ID 数据,将转换结果加载到transformRDD,具 体代码如下。 1 JavaPairRDD<Tuple2<String,String>,Integer> transformRDD =textFileRDD 2 .mapToPair( 3 new PairFunction< 4 String, 5 Tuple2<String, String>, Integer>() { 6 @Override 7 public Tuple2<Tuple2<String, String>, Integer> call(String s) 8 throws Exception { 第3章 热门品类Top10分析 83 9 //将数据转换为JSON 对象 10 JSONObject json =JSONObject.parseObject(s); 11 String category_id =json.getString("category_id"); 12 String event_type =json.getString("event_type"); 13 return new Tuple2<>( 14 new Tuple2<>(category_id,event_type), 15 new Integer(1)); 16 } 17 }); 上述代码中,首先将textFileRDD中的每一行数据转换为JSON 对象;然后获取JSON 对象中的category_id(品类ID)和event_type(行为类型);最后将category_id、event_type 和值1添加到Tuple2对象中。 3.3.4 统计品类的行为类型 在文件3-2的main()方法中,使用reduceByKey()算子对transformRDD 进行聚合操 作,用于统计每个品类中商品被查看、加入购物车和购买的次数,将统计结果加载到 aggregationRDD,具体代码如下。 1 JavaPairRDD<Tuple2<String, String>, Integer> aggregationRDD = 2 transformRDD.reduceByKey( 3 new Function2<Integer, Integer, Integer>() { 4 @Override 5 public Integer call(Integer integer1, Integer integer2) 6 throws Exception { 7 return integer1 +integer2; 8 } 9 }); 3.3.5 过滤品类的行为类型 在文件3-2的main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据 中行为类型为加入购物车和购买的数据,只保留行为类型为查看的数据,然后使用 mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被查看次数和品类ID 数 据,最终将转换结果加载到getViewCategoryRDD,具体代码如下。 1 JavaPairRDD<String,Integer> getViewCategoryRDD =aggregationRDD 2 .filter(new Function<Tuple2<Tuple2<String, String>, Integer> 3 , Boolean>() { 4 @Override 5 public Boolean call(Tuple2<Tuple2<String, String> 6 , Integer> tuple2) throws Exception { 7 //获取行为类型 8 String action =tuple2._1._2; 9 return action.equals("view"); 10 } Spark项84 目实战 11 }).mapToPair( 12 new PairFunction<Tuple2<Tuple2<String, String> 13 , Integer>, String, Integer>() { 14 @Override 15 public Tuple2<String, Integer> 16 call(Tuple2<Tuple2<String, String>, Integer> tuple2) 17 throws Exception { 18 return new Tuple2<>(tuple2._1._1,tuple2._2); 19 } 20 }); 上述代码中,第9行通过equals()方法判断获取的行为类型是否为view(查看)并将判 断结果作为返回值,若返回值为True,则进行后续转换操作。 在文件3-2的main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据 中行为类型为查看和购买的数据,只保留行为类型为加入购物车的数据,然后使用 mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被加入购物车的次数和品 类ID数据,最终将转换结果加载到getCartCategoryRDD,具体代码如下。 1 JavaPairRDD<String,Integer> getCartCategoryRDD =aggregationRDD 2 .filter(new Function<Tuple2<Tuple2<String, String>, Integer> 3 , Boolean>() { 4 @Override 5 public Boolean call(Tuple2<Tuple2<String, String> 6 , Integer> tuple2) throws Exception { 7 String action =tuple2._1._2; 8 return action.equals("cart"); 9 } 10 }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String> 11 , Integer>, String, Integer>() { 12 @Override 13 public Tuple2<String, Integer> 14 call(Tuple2<Tuple2<String, String>, Integer> tuple2) 15 throws Exception { 16 return new Tuple2<>(tuple2._1._1,tuple2._2); 17 } 18 }); 上述代码中,第8行通过equals()方法判断获取的行为类型是否为cart(加入购物车) 并将判断结果作为返回值,若返回值为True,则进行后续转换操作。 在文件3-2的main()方法中,首先使用filter()算子过滤aggregationRDD每一行数据 中行为类型为查看和加入购物车的数据,只保留行为类型为购买的数据,然后使用 mapToPair()算子对过滤结果进行转换,获取每一行数据中品类被购买次数和品类ID 数 据,最终将转换结果加载到getPurchaseCategoryRDD,具体代码如下。 第3章 热门品类Top10分析 85 1 JavaPairRDD<String,Integer> getPurchaseCategoryRDD =aggregationRDD 2 .filter(new Function<Tuple2<Tuple2<String, String>, Integer> 3 , Boolean>() { 4 @Override 5 public Boolean call(Tuple2<Tuple2<String, String> 6 , Integer> tuple2) throws Exception { 7 String action =tuple2._1._2; 8 return action.equals("purchase"); 9 } 10 }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String> 11 , Integer>, String, Integer>() { 12 @Override 13 public Tuple2<String, Integer> 14 call(Tuple2<Tuple2<String, String>, Integer> tuple2) 15 throws Exception { 16 return new Tuple2<>(tuple2._1._1,tuple2._2); 17 } 18 }); 上述代码中,第8行通过equals()方法判断获取的行为类型是否为purchase(购买)并 将判断结果作为返回值,若返回值为True,则进行后续转换操作。 3.3.6 合并相同品类的行为类型 在文件3-2 的main()方法中,使用leftOuterJoin()(左外连接)算子合并 getViewCategoryRDD、getCartCategoryRDD 和getPurchaseCategoryRDD,用于合并同一 品类的查看次数、加入购物车次数和购买次数,将合并结果加载到joinCategoryRDD,具体 代码如下。 1 JavaPairRDD<String,Tuple2<Integer, Optional<Integer>>> 2 tmpJoinCategoryRDD =getViewCategoryRDD 3 .leftOuterJoin(getCartCategoryRDD); 4 JavaPairRDD<String, 5 Tuple2<Tuple2<Integer, Optional<Integer>>, 6 Optional<Integer>>> joinCategoryRDD = 7 tmpJoinCategoryRDD.leftOuterJoin(getPurchaseCategoryRDD); 上述代码中,首先通过leftOuterJoin()算子合并getViewCategoryRDD和getCartCategoryRDD, 将合并结果加载到tmpJoinCategoryRDD,然后通过leftOuterJoin()算子合并 tmpJoinCategoryRDD和getPurchaseCategoryRDD,将合并结果加载到joinCategoryRDD。 Optional类是一个包含有可选值的包装类,它既可以含有对象也可以为空,主要为了解 决空指针异常的问题,因为某些品类中的商品可能被查看但并未被购买或加入购物车。 3.3.7 根据品类的行为类型进行排序 在包cn.itcast.top10中创建文件CategorySortKey.java,用于实现自定义排序。在类 CategorySortKey中继承比较器接口Comparable 和序列化接口Serializable,并实现 Spark项86 目实战 Comparable接口的compareTo()方法,具体代码如文件3-3所示。 文件3-3 CategorySortKey.java 1 import java.io.Serializable; 2 public class CategorySortKey implements Comparable<CategorySortKey> 3 ,Serializable{ 4 //查看次数 5 private int viewCount; 6 //加入购物车次数 7 private int cartCount; 8 //购买次数 9 private int purchaseCount; 10 //定义类的构造方法 11 public CategorySortKey( 12 int viewcount, 13 int cartCount, 14 int purchaseCount) 15 { 16 this.viewCount =viewcount; 17 this.cartCount =cartCount; 18 this.purchaseCount =purchaseCount; 19 } 20 //定义属性的getter 和setter 方法 21 . 22 @Override 23 public int compareTo(CategorySortKey other) { 24 if(viewCount -other.getViewCount() !=0) { 25 return (int) (viewCount -other.getViewCount()); 26 } else if(cartCount -other.getCartCount() !=0) { 27 return (int) (cartCount -other.getCartCount()); 28 } else if(purchaseCount -other.getPurchaseCount() !=0) { 29 return (int) (purchaseCount -other.getPurchaseCount()); 30 } 31 return 0; 32 } 33 } 在文件3-3中,第22~32行代码,重写接口Comparable的compareTo()方法,在方法 内部实现对象的比较,比较的规则为返回值等于0表示相等;返回值小于0表示小于;返回 值大于0表示大于。比较的优先级按照viewCount、cartCount和purchaseCount的顺序。 在文件3-2 的main()方法中,使用mapTopair()算子转换joinCategoryRDD,将 joinCategoryRDD中品类被查看次数、加入购物车次数和购买次数映射到自定义排序类 CategorySortKey,通过transCategoryRDD加载转换结果,具体代码如下。 1 JavaPairRDD<CategorySortKey,String> transCategoryRDD =joinCategoryRDD 2 .mapToPair(new PairFunction<Tuple2<String, 3 Tuple2<Tuple2<Integer, Optional<Integer>>,Optional<Integer>>>, 第3章 热门品类Top10分析 87 4 CategorySortKey,String>() { 5 @Override 6 public Tuple2<CategorySortKey,String> call(Tuple2<String, 7 Tuple2<Tuple2<Integer, Optional<Integer>>, 8 Optional<Integer>>> tuple2) throws Exception { 9 String category_id =tuple2._1; 10 int viewcount =tuple2._2._1._1; 11 int cartcount =0; 12 int purchasecount =0; 13 //判断品类被加入购物车次数是否为空 14 if (tuple2._2._1._2.isPresent()){ 15 cartcount =tuple2._2._1._2.get().intValue(); 16 } 17 //判断品类被购买次数是否为空 18 if (tuple2._2._2.isPresent()){ 19 purchasecount =tuple2._2._2.get().intValue(); 20 } 21 /*将viewcount、cartcount 和purchasecount 映射到 22 类CategorySortKey 的构造方法中*/ 23 CategorySortKey sortKey = 24 new CategorySortKey(viewcount, cartcount, purchasecount); 25 return new Tuple2<>(sortKey,category_id); 26 } 27 }); 上述代码中的isPresent()方法用于判断Optional类型的数据是否为空,若值为空则通 过get()方法获取值,并通过intValue()方法指定获取的值为Int类型。 在文件3-2的main()方法中,通过sortByKey()算子对transCategoryRDD进行排序操 作,使transCategoryRDD中品类被查看次数、加入购物车次数和购买次数根据自定义排序 类CategorySortKey指定的排序规则进行排序,将排序结果加载到sortedCategoryRDD,具 体代码如下。 JavaPairRDD<CategorySortKey,String> sortedCategoryRDD = transCategoryRDD.sortByKey(false); 上述代码中,sortByKey()算子的参数为false,表示使用自定义排序类的比较方式进行 排序。在 文件3-2的main()方法中,使用take()算子获取sortedCategoryRDD前10个元素, 即热门品类Top10分析结果,将分析结果加载到top10CategoryList,具体代码如下。 List<Tuple2<CategorySortKey, String>> top10CategoryList = sortedCategoryRDD.take(10); 上述代码中,take()算子的参数为10,表示获取sortedCategoryRDD前10个元素。 3.3.8 数据持久化 本项目使用HBase数据库作为数据持久化工具,HBase分布式数据库通过HDFS和 Spark项88 目实战 ZooKeeper实现数据的高可用和冗余,从而确保数据库和数据的安全性。接下来,分步骤讲 解如何将热门品类Top10分析结果持久化到HBase数据库中。 1.封装HBase工具类 为了避免后续环节重复编写数据库连接和数据库操作的相关代码,这里将HBase数据 库连接工具类和HBase数据库操作工具类进行封装,具体实现步骤如下。 (1)在项目SparkProject的java目录新建Package包cn.itcast.hbase,用于存放实现数 据持久化的Java文件。在包cn.itcast.hbase下创建文件HbaseConnect.java,用于实现封装 HBase数据库连接工具类,具体代码如文件3-4所示。 文件3-4 HbaseConnect.java 1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.hbase.HBaseConfiguration; 3 import org.apache.hadoop.hbase.MasterNotRunningException; 4 import org.apache.hadoop.hbase.ZooKeeperConnectionException; 5 import org.apache.hadoop.hbase.client.Connection; 6 import org.apache.hadoop.hbase.client.ConnectionFactory; 7 import org.apache.hadoop.hbase.client.HBaseAdmin; 8 import java.io.IOException; 9 public class HbaseConnect { 10 public static Configuration conf; 11 public static Connection conn; 12 public static HBaseAdmin hbaseAdmin; 13 static { 14 //创建HBase 配置信息 15 conf =HBaseConfiguration.create(); 16 //配置ZooKeeper 集群地址 17 conf.set("hbase.zookeeper.quorum", "spark01,spark02,spark03"); 18 //配置ZooKeeper 端口号 19 conf.set("hbase.zookeeper.property.clientPort", "2181"); 20 try { 21 //通过HBase 配置获取HBase 数据库连接对象 22 conn =ConnectionFactory.createConnection(conf); 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 } 27 public static HBaseAdmin getHBaseAdmin() throws IOException{ 28 try { 29 //获取HBase 数据库操作对象 30 hbaseAdmin =(HBaseAdmin)(conn.getAdmin()); 31 } catch (MasterNotRunningException e) { 32 e.printStackTrace(); 33 } catch (ZooKeeperConnectionException e) { 34 e.printStackTrace(); 35 } 36 return hbaseAdmin; 第3章 热门品类Top10分析 89 37 } 38 public static Connection getConnection(){ 39 return conn; 40 } 41 public static synchronized void closeConnection(){ 42 if(conn!=null){ 43 try { 44 conn.close(); 45 } catch (IOException e) { 46 e.printStackTrace(); 47 } 48 } 49 } 50 } 在文件3-3中,第38~40行代码创建返回值类型为Connection的方法getConnection(), 用于获取HBase数据库连接;第41~49行代码创建方法closeConnection(),用于关闭 HBase数据库连接。 需要注意的是,若运行项目SparkProject的环境中未配置IP 映射,则需要在配置 Zookeeper集群地址时使用IP地址而不是主机名。 (2)在项目SparkProject的包cn.itcast.hbase下创建文件HbaseUtils.java,用于实现 封装HBase数据库操作工具类,具体代码如文件3-5所示。 文件3-5 HbaseUtils.java 1 import org.apache.hadoop.hbase.HColumnDescriptor; 2 import org.apache.hadoop.hbase.HTableDescriptor; 3 import org.apache.hadoop.hbase.TableName; 4 import org.apache.hadoop.hbase.client.HBaseAdmin; 5 import org.apache.hadoop.hbase.client.Put; 6 import org.apache.hadoop.hbase.client.Table; 7 import org.apache.hadoop.hbase.util.Bytes; 8 import java.io.IOException; 9 public class HbaseUtils { 10 public static void createTable(String tableName, 11 String... columFamilys) 12 throws IOException { 13 //获取HBase 数据表操作对象 14 HBaseAdmin admin =HbaseConnect.getHBaseAdmin(); 15 //判断表是否存在 16 if (admin.tableExists(tableName)){ 17 //关闭表 18 admin.disableTable(tableName); 19 //删除表 20 admin.deleteTable(tableName); 21 } 22 //HTableDescriptor 类包含了表的名字以及表的列族信息 Spark项90 目实战 23 HTableDescriptor hd 24 =new HTableDescriptor(TableName.valueOf(tableName)); 25 for (String cf : columFamilys) { 26 hd.addFamily(new HColumnDescriptor(cf)); 27 } 28 //通过createTable()方法创建HBase 数据表 29 admin.createTable(hd); 30 admin.close(); 31 } 32 public static void putsToHBase(String tableName, 33 String rowkey, 34 String cf, 35 String[] column, 36 String[] value) 37 throws Exception { 38 //获取指定HBase 数据表的操作对象 39 Table table =HbaseConnect 40 .getConnection() 41 .getTable(TableName.valueOf(tableName)); 42 //通过Put 对象存储插入数据表的内容 43 Put puts =new Put(rowkey.getBytes()); 44 for (int i =0;i<column.length;i++){ 45 puts.addColumn( 46 Bytes.toBytes(cf), 47 Bytes.toBytes(column[i]), 48 Bytes.toBytes(value[i])); 49 } 50 //向指定数据表中插入数据 51 table.put(puts); 52 table.close(); 53 } 54 } 在文件3-5中,第10~31行代码定义方法createTable(),用于创建HBase数据表。方 法createTable()包含参数tableName和columFamilys,其中参数tableName表示数据表名 称,参数columFamilys表示列族;第32~52行代码定义方法putsToHBase(),用于向指定 HBase数据表中插入数据。方法putsToHBase()包含参数tableName、rowkey、cf、column 和value,其中参数tableName表示数据表名称,参数rowkey表示行键,参数cf表示列族, 参数column表示行,参数value表示值。 2.持久化热门品类Top10分析结果 在文件3-2的类CategoryTop10中添加方法top10ToHbase(),用于将热门品类Top10 分析结果持久化到HBase数据库中,该方法包含参数top10CategoryList,表示热门品类 Top10分析结果数据,具体代码如下。 第3章 热门品类Top10分析 91 1 public static void top10ToHbase(List<Tuple2<CategorySortKey, String>> 2 top10CategoryList) throws Exception 3 { 4 //创建数据表top10 和列族top10_category 5 HbaseUtils.createTable("top10","top10_category"); 6 //创建数组column,用于存储数据表top10 的列名 7 String[] column = 8 {"category_id","viewcount","cartcount","purchasecount"}; 9 String viewcount =""; 10 String cartcount =""; 11 String purchasecount =""; 12 String category_id =""; 13 int count =0; 14 //遍历集合top10CategoryList 15 for (Tuple2<CategorySortKey, String> top10: top10CategoryList) { 16 count++; 17 //获取查看次数 18 viewcount =String.valueOf(top10._1.getViewCount()); 19 //获取加入购物车次数 20 cartcount =String.valueOf(top10._1.getCartCount()); 21 //获取购买次数 22 purchasecount =String.valueOf(top10._1.getPurchaseCount()); 23 //获取品类ID 24 category_id =top10._2; 25 //创建数组value,用于存储数据表top10 的值 26 String[] value = 27 {category_id,viewcount,cartcount,purchasecount}; 28 HbaseUtils.putsToHBase("top10", 29 "rowkey_top"+count, 30 "top10_category", 31 column, 32 value); 33 } 34 } 上述代码中,第28~32行,调用HBase数据库操作工具类的putToHBase()方法,用于 持久化热门品类Top10数据。putToHBase()方法包含5个参数:其中第1个参数为字符 串top10,表示数据表名称;第2个参数为字符串对象count和字符串rowkey_top,表示数 据表的行键;第3个参数为字符串top10_category,表示数据表的列族;第4个参数为数组 column,数组中的每一个元素表示数据表的列名;第5个参数为数组value,数组中的每一个 元素表示数据表的值。 在文件3-2的main()方法中,调用方法top10ToHbase()并传入参数top10CategoryList, 用于在Spark程序中实现top10ToHbase()方法,将热门品类Top10 分析结果持久化到 HBase数据库中的数据表top10,具体代码如下。 Spark项92 目实战 1 //通过try…catch 抛出异常 2 try { 3 top10ToHbase(top10CategoryList); 4 } catch (Exception e) { 5 e.printStackTrace(); 6 } 7 //关闭HBase 数据库连接 8 HbaseConnect.closeConnection(); 9 //关闭JavaSparkContext 连接 10 sc.close(); 3.4 运行程序 热门品类Top10分析程序编写完成后,需要在IntelliJIDEA 中将程序封装成jar包,并 上传到集群环境中,通过spark-submit将程序提交到YARN 中运行,具体步骤如下。 1.封装jar包 在IntelliJIDEA 主界面单击右侧Maven选项卡打开Maven窗口,如图3-12和图3-13 所示。 图3-12 Maven选项卡 在Maven窗口单击,展开Lifecycle目录,如图3-14所示。 双击Lifecycle目录中的package选项,IntelliJIDEA 会自动将程序封装成jar包,封装 完成后,若出现BUILDSUCCESS内容,则证明成功封装热门品类Top10分析程序为jar