学习目标
. 掌握网站转化率统计实现思路。
. 了解如何生成用户浏览网页数据。
. 掌握如何创建Spark连接并读取数据集。
. 掌握利用SparkSQL统计每个页面访问次数。
. 掌握利用SparkSQL获取每个用户浏览网页的顺序。
. 掌握利用SparkSQL合并同一用户浏览的网页。
. 掌握利用SparkSQL统计每个单跳的次数。
. 掌握利用SparkSQL计算页面单跳转化率。
. 掌握将数据持久化到HBase数据库。
. 熟悉通过SparkOnYARN 运行程序。
网站转化率(conversionrate)是指用户进行相应目标行动的访问次数与总访问次数的
比率。这里所指的相应目标行动可以是用户登录、注册、浏览、购买等一系列用户行为,因此
网站转化率是一个广义的概念。页面单跳转化率是网站转化率的一种统计形式,通过统计
页面单跳转化率,来优化页面布局及营销策略,使访问网站的用户可以更深层次地浏览网
站。本章对用户浏览网页数据进行分析,从而统计出页面单跳转化率。
5.1 数据集分析
本需求通过编写Java程序模拟生成用户浏览网页数据作为数据集,数据集中的每一行
数据代表一个用户的浏览行为,所有浏览行为都与页面和用户有关,该数据集中包含了
10000条用户浏览页面的数据,虽然数据比较多,但是数据内容格式基本类似,这里选取其
中一条数据进行分析,具体如下。 
{ 
"actionTime":"2020-07-22 06:34:02", 
"sessionid":"98ac879b5a0a4a4eb117dffd84da1ff4", 
"pageid":3, 
"userid":8 
}

110 
Spark项目实战

上述数据包含4个字段,每个字段都代表特定的含义,具体介绍如下。

.actionTime:用户访问页面的时间。
.sesionid:用于标识用户行为的唯一值。
.pageid:用户浏览网页的ID 。
.userid:用户ID 。
5.实现思路分析
2 

当用户浏览网页时,通过当前浏览页面(A)跳转到另一个页面(B),此用户行为被称为
一次A→B的单跳。如计算A→B的页面单跳转化率,则计算公式如下: 
A→B页面单跳转化率=A→B的单跳总数/A总访问次数
通过上式可以看出,计算页面单跳转化率需要两部分数据,分别是A→B的单跳总数和A 
总访问次数,其中A总访问次数可以通过聚合操作获取,A→B的单跳总数实现思路如下。

(1)根据用户ID和访问时间对数据集进行排序操作,获取每个用户浏览网页的顺序。
(2)根据用户ID对排序后的数据进行分组操作,将同一用户浏览的网页进行合并。
(3)对分组后的数据进行转换操作,将同一用户浏览的网页按照浏览顺序转换为单跳
图5-
1 
页面单跳转化率统计实现过程


第5章 网站转化率统计1 11 
形式。
(4)对转换后的数据进行聚合操作统计每个单跳的总数,其中包括A→B的单跳总数。
此时,我们可以将A 总访问次数和A→B的单跳总数这两部分数据带入计算公式中, 
得出A→B的页面单跳转化率,页面单跳转化率统计实现过程如图5-1所示。
5.3 实现网站转化率统计
5.3.1 生成用户浏览网页数据 
实现网站转化率统计程序之前,需要模拟生成用户浏览网页数据,这里主要通过编写
Java应用程序的方式,实现用户浏览网页数据的生成,具体实现步骤如下。
(1)在项目SparkProject的java目录下新建Package包cn.itcast.conversion,用于存放
实现网站转化率统计的Java文件。在包cn.itcast.conversion中创建文件GenerateData. 
java,用于模拟生成用户浏览网页数据,具体代码如文件5-1所示。
文件5-1 GenerateData.java 
1 import com.alibaba.fastjson.JSONObject; 
2 import java.io.FileWriter; 
3 import java.io.IOException; 
4 import java.text.SimpleDateFormat; 
5 import java.util.Date; 
6 import java.util.Random; 
7 import java.util.UUID; 
8 public class GenerateData { 
9 public static void main(String[] arg) throws IOException { 
10 //创建Random 对象,用于生成随机数
11 Random random =new Random(); 
12 //创建JSONObject 对象,用于存储用户浏览网页数据
13 JSONObject jsonObject =new JSONObject(); 
14 //格式化日期的格式为yyyy-MM-dd 
15 SimpleDateFormat DATE_FORMAT =new SimpleDateFormat("yyyy-MM-dd"); 
16 //获取当前日期,进行格式化处理获取基础日期date 
17 String date =DATE_FORMAT.format(new Date()); 
18 //指定用户浏览网页数据输出路径及文件名称
19 String outputFile ="D:\sparkdata\user_conversion.json"; 
20 //为基础日期date 随机添加小时
21 String baseActionTime = 
22 date +" " +fulfuill(String.valueOf(random.nextInt(24))); 
23 //创建FileWriter 输出流对象
24 FileWriter fw =new FileWriter(outputFile); 
25 //通过for 循环生成10000 条用户浏览网页数据
26 for (int i =0;i<10000;i++){ 
27 //通过UUID 类的方法randomUUID()随机生成sessionid 
28 String sessionid = 
29 UUID.randomUUID().toString().replace("-", "");

Spark项1 12 目实战 
30 //通过Random 对象的方法nextInt()随机生成pageid 
31 long pageid =random.nextInt(10)+1; 
32 //通过Random 对象的方法nextInt()随机生成userid 
33 long userid =random.nextInt(100)+1; 
34 //为添加小时的基础日期随机添加分和秒,随机生成actionTime 
35 String actionTime =baseActionTime +":" 
36 +fulfuill(String.valueOf(random.nextInt(60))) 
37 +":" +fulfuill(String.valueOf(random.nextInt(60))); 
38 //将sessionid、pageid、userid 和actionTime 添加到JSONObject 对象
39 jsonObject.put("sessionid",sessionid); 
40 jsonObject.put("pageid",pageid); 
41 jsonObject.put("userid",userid); 
42 jsonObject.put("actionTime",actionTime); 
43 //将JSONObject 对象转换成String 字符串类型并追加放入输出流对象fw 
44 fw.append(jsonObject.toString()+"\n"); 
45 } 
46 //将输出流对象中的字符串数据写入指定文件
47 fw.flush(); 
48 //关闭输出流
49 fw.close(); 
50 } 
51 //fulfuill()方法用于填充字符串,若字符串长度为1,则在字符串首部添加0 
52 public static String fulfuill(String str) { 
53 if(str.length() ==2) { 
54 return str; 
55 } else { 
56 return "0" +str; 
57 } 
58 } 
59 } 
在文件5-1中,首先模拟生成用户浏览网页数据,用户浏览网页数据包含sessionid(用
户Session)、pageid(页面ID)、userid(用户ID)和actionTime(时间)。然后,将数据存储到
JSONObject对象中。最后,通过FileWriter对象将JSONObject对象转换成String字符串
类型,并追加写入目录D:\\sparkdata下的文件user_conversion.json中。
(2)右击文件GenerateData.java,在弹出的快捷菜单中选择Run.GenerateData.main() 
运行程序,生成用户浏览网页数据。
(3)程序运行完成后在D:\\sparkdata目录中会生成JSON 文件user_conversion. 
json,该文件包含用户浏览网页数据。
5.3.2 修改pom.xml文件
由于实现网站转化率统计是通过Spark SQL 程序实现的,所以需要在项目
SparkProject中添加SparkSQL依赖。在配置文件pom.xml的<dependency>标签中添
加如下内容。

第5章 网站转化率统计1 13 
<dependency> 
<groupId>org.apache.spark</groupId> 
<artifactId>spark-sql_2.11</artifactId> 
<version>2.3.2</version> 
</dependency> 
在文件pom.xml中添加依赖后,通过按Ctrl+S 组合键保存文件pom.xml,此时
IntelliJIDEA 会自动下载文件pom.xml中添加的SparkSQL依赖。
5.3.3 创建Spark连接并读取数据集
在项目SparkProject的包cn.itcast.conversion中创建文件PageConversion.java,用于
实现网站转化率统计,具体代码如文件5-2所示。
文件5-2 PageConversion.java 
1 public class PageConversion { 
2 public static void main(String[] arg){ 
3 //实现Spark SQL 程序
4 } 
5 } 
在文件5-2中,第2~4行代码创建main()方法,该方法是Java程序执行的入口,在
main()方法中实现SparkSQL程序。
在文件5-2的main()方法中创建SparkSession对象,用于实现SparkSQL程序,具体
代码如下。 
1 SparkSession spark =SparkSession 
2 .builder() 
3 //设置Application 名称为page_conversion 
4 .appName("page_conversion") 
5 .getOrCreate(); 
在文件5-2的main()方法中,调用SparkSession对象的read().json()方法读取外部
JSON 文件,将JSON 文件中的数据加载到userConversionDS,具体代码如下。 
Dataset<Row> userConversionDS =spark.read().json(arg[0]); 
上述代码中,通过变量arg[0]指定JSON 文件路径,目的是执行提交SparkSQL程序
到YARN 集群运行的命令中,通过参数指定JSON 文件路径。
在文件5-2的main()方法中,调用DataSet的createOrReplaceTempView()方法,将
userConversionDS创建为全局临时视图conversion_table,具体代码如下。 
userConversionDS.createOrReplaceTempView("conversion_table"); 
上述代码中,创建全局临时视图的目的是之后可以通过SQL 语句操作

Spark项1 14 目实战
userConversionDS中的数据。
5.3.4 统计每个页面访问次数
在文件5-2的main()方法中,调用SparkSession的sql()方法统计每个页面访问次数, 
将统计结果加载到pageIdPvDS,具体代码如下。 
1 Dataset<Row> pageIdPvDS =spark 
2 .sql("select pageid,count(*) as pageid_count " + 
3 "from conversion_table " + 
4 "group by pageid"); 
上述代码中的SQL语句,使用groupby子句和count()函数相结合的方式,对全局临
时视图conversion_table的字段pageid进行分组聚合操作,从而统计每个页面访问次数。
5.3.5 获取每个用户浏览网页的顺序
在文件5-2的main()方法中,调用SparkSession的sql()方法对每个用户浏览网页的
顺序进行排序,将排序结果加载到useridGroupSortDS,具体代码如下。 
1 Dataset<Row> useridGroupSortDS =spark 
2 .sql("select userid,actionTime,pageid " + 
3 "from conversion_table " + 
4 "order by userid,actionTime"); 
上述代码中的SQL语句,使用orderby子句对全局临时视图conversion_table的字段
userid和actionTime进行升序排序,从而根据用户访问时间对每个用户浏览网页的顺序进
行排序。
在文件5-2的main()方法中,调用DataSet的createOrReplaceTempView()方法,将
useridGroupSortDS创建为全局临时视图conversion_group_sort_table,具体代码如下。 
useridGroupSortDS.createOrReplaceTempView("conversion_group_sort_table"); 
5.3.6 合并同一用户浏览的网页
在文件5-2的main()方法中,调用SparkSession的sql()方法对同一用户浏览的网页
进行合并,将合并结果加载到pageConversionRDD,具体代码如下。 
1 JavaRDD<Row> pageConversionRDD =spark.sql("select userid," + 
2 "concat_ws(',',collect_list(pageid)) as column2s " + 
3 "from conversion_group_sort_table " + 
4 "group by userid").toJavaRDD(); 
上述代码中的SQL语句,首先使用groupby子句根据字段userid进行分组,然后使用
函数collect_list()将同一组内的pageid合并到一个集合中,最后使用函数concat_ws()将
集合中的每个pageid通过分隔符“,”拼接成字符串存储在字段column2s中。

第5章 网站转化率统计1 15 
为了后续使用flatMap()算子对合并后的数据进行扁平化处理,这里通过SparkSession 
的toJavaRDD()方法将DataSet转换为JavaRDD。
5.3.7 统计每个单跳的次数
在文件5-2的main()方法中,使用flatMap()算子对pageConversionRDD 进行扁平化
处理,根据用户浏览网页的顺序将相邻网页拼接为单跳,将处理结果加载到rowRDD,具体
代码如下。 
1 JavaRDD<Row> rowRDD =pageConversionRDD 
2 .flatMap(new FlatMapFunction<Row, Row>() { 
3 @Override 
4 public Iterator<Row> call(Row row) throws Exception { 
5 List<Row> list =new ArrayList<>(); 
6 //将用户浏览网页数据拆分为数组
7 String[] page =row.get(1).toString().split(","); 
8 String pageConversionStr =""; 
9 //遍历数组
10 for (int i =0;i<page.length-1;i++){ 
11 //判断相邻网页是否相同
12 if (!page[i].equals(page[i+1])){ 
13 //将相邻网页通过字符“_”进行拼接
14 pageConversionStr =page[i]+"_"+page[i+1]; 
15 //将字符串转换为Row 对象添加到集合list 
16 list.add(RowFactory.create(pageConversionStr)); 
17 } 
18 } 
19 return list.iterator(); 
20 } 
21 }); 
上述代码中,第7行通过row.get(1)获取Row 对象中的用户浏览网页数据,这是因为
pageConversionRDD中每一行数据包含userid和column2s,其中column2s表示用户浏览
网页数据,所以row.get(1)表示获取字段column2s,也就是获取用户浏览网页数据。
在文件5-2 的main()方法中,调用SparkSession 的createDataFrame()方法和
registerTempTable()方法,将存储单跳数据的rowRDD 注册为临时表page_conversion_ 
table,具体代码如下。 
1 //指定临时表的字段信息
2 StructType schema =DataTypes 
3 .createStructType( 
4 //创建临时表字段
5 new StructField[]{ 
6 DataTypes.createStructField( 
7 //指定字段名为page_conversion 
8 "page_conversion", 
9 //指定字段类型为String

Spark项1 16 目实战 
10 DataTypes.StringType, 
11 //指定字段值可以为空
12 true)}); 
13 spark.createDataFrame(rowRDD, schema) 
14 .registerTempTable("page_conversion_table"); 
上述代码中,第13~14行首先通过SparkSession的createDataFrame()方法,根据单跳
数据和临时表的字段信息创建DataFrame,然后通过SparkSession的registerTempTable() 
方法将DataFrame注册为临时表page_conversion_table。
在文件5-2的main()方法中,使用SparkSession的sql()方法统计每个单跳的次数,根
据统计结果创建全局临时视图page_conversion_count_table,具体代码如下。 
1 spark.sql( 
2 "select page_conversion," + 
3 "count(*) as page_conversion_count " + 
4 "from page_conversion_table " + 
5 "group by page_conversion") 
6 .createOrReplaceTempView("page_conversion_count_table"); 
上述代码中的SQL语句,使用groupby子句和count()函数相结合的方式,对临时表
page_conversion的字段page_conversion进行分组聚合操作,从而统计每个单跳的次数。
5.3.8 计算页面单跳转化率
通过页面单跳转化率的计算公式得知,若要计算单跳A→B的页面单跳转化率,首先需
要获取页面A 的访问次数,然后需要获取单跳A→B的次数,最终将这两部分数据代入页
面单跳转化率的计算公式中计算单跳A→B的页面单跳转化率。
在计算每个单跳的页面单跳转化率之前,需要将每个页面与每个单跳一一对应,也就是
说如果计算单跳A→B的页面单跳转化率,那么代入页面单跳转化率计算公式中的一定是
页面A 的访问次数,而不能是页面B或C的访问次数。
在文件5-2的main()方法中,使用SparkSession的sql()方法拆分单跳为起始页面和
结束页面,将拆分后的数据加载到pageConversionCountDS,具体代码如下。 
1 Dataset<Row> pageConversionCountDS =spark 
2 .sql("select page_conversion_count," + 
3 "split(page_conversion,'_')[0] as start_page," + 
4 "split(page_conversion,'_')[1] as last_page " + 
5 "from page_conversion_count_table"); 
上述代码中的SQL语句,使用split()函数将单跳字段(page_conversion)中的数据按照
分隔符“_”拆分为起始页面字段(start_page)和结束页面字段(last_page)。
在文件5-2的main()方法中,使用join()算子对pageIdPvDS(存储每个页面访问的次
数)和pageConversionCountDS(存储每个单跳拆分后的数据)进行连接,根据连接结果创建
全局临时视图page_conversion_join,具体代码如下。

第5章 网站转化率统计1 17 
1 pageConversionCountDS 
2 .join( 
3 pageIdPvDS, 
4 new Column("start_page").equalTo(new Column("pageid")), 
5 "left") 
6 .createOrReplaceTempView("page_conversion_join"); 
上述代码中,通过pageConversionCountDS的起始字段(start_page)和pageIdPvDS的
页面ID(pageid)进行左外连接(left)。
在文件5-2的main()方法中,使用SparkSession的sql()计算页面单跳转化率,将计算
结果加载到resultDS,具体代码如下。 
1 Dataset<Row> resultDS =spark 
2 .sql("select " + 
3 "concat(pageid,'_',last_page) as conversion," + 
4 "round(" + 
5 "CAST(page_conversion_count AS DOUBLE)/CAST(pageid_count AS DOUBLE)" + 
6 ",3) as rage " + 
7 "from page_conversion_join"); 
上述代码中的SQL语句,首先使用函数concat()将每一行数据中的页面ID(pageid)和
结束页面(last_page)通过字符“_”合并为单跳,存储在字段conversion中;接着使用函数
CAST()将字段page_conversion_count(单跳次数)和pageid_count(页面访问次数)的数据
类型转为DOUBLE类型;然后通过运算符“/”计算页面单跳转化率;最后使用函数round() 
保留计算结果的3位小数,并存储在字段rage中。
5.3.9 数据持久化
为了保证各网站转化率统计结果的持久性,便于查看以及应用到数据可视化中,这里需
要进行数据持久化操作,将网站转化率统计结果存储到HBase数据库中,具体实现步骤
如下。在
文件5-2的PageConversion类中添加方法conversionToHBase(),用于将页面单跳
转化率统计结果持久化到HBase数据库中,该方法包含参数dataset,表示需要向方法中传
递页面单跳转化率统计结果数据,具体代码如下。 
1 public static void conversionToHBase(Dataset<Row> dataset) 
2 throws IOException { 
3 //创建数据表conversion 和列族page_conversion 
4 HbaseUtils.createTable("conversion","page_conversion"); 
5 //创建数组column,用于指定数据表的列名convert_page 和convert_rage 
6 String[] column ={"convert_page","convert_rage"}; 
7 //遍历页面单跳转化率统计结果数据dataset 
8 dataset.foreach(new ForeachFunction<Row>() { 
9 @Override 
10 public void call(Row row) throws Exception { 
11 //获取单跳数据

Spark项1 18 目实战 
12 String conversion =row.get(0).toString(); 
13 //获取页面单跳转化率数据
14 String rage =row.get(1).toString(); 
15 //创建数组value,用于指定数据表的值conversion 和rage 
16 String[] value ={conversion,rage}; 
17 HbaseUtils.putsToHBase("conversion", 
18 conversion+rage, 
19 "page_conversion", 
20 column, 
21 value); 
22 } 
23 }); 
24 } 
上述代码中,第17~21行调用HBase数据库操作工具类的putsToHBase()方法,用于
持久化页面单跳转化率统计结果数据。putsToHBase()方法包含5个参数:其中第1个参
数为字符串conversion,表示数据表名称;第2个参数为字符串对象conversion+rage,表示
数据表的行键;第3个参数为字符串page_conversion,表示数据表的列族;第4个参数为数
组column,数组中的每一个元素表示数据表的列名;第5个参数为数组value,数组中的每
一个元素表示数据表的值。
在文件5-2的main()方法中,调用conversionToHBase()方法并传入参数resultDS,用
于在SparkSQL程序中实现conversionToHBase()方法,将页面单跳转化率统计结果数据
持久化到HBase数据库中的数据表conversion,具体代码如下。 
1 //通过try…catch 抛出异常
2 try { 
3 conversionToHBase(resultDS); 
4 } catch (IOException e) { 
5 e.printStackTrace(); 
6 } 
7 //关闭HBase 连接
8 HbaseConnect.closeConnection(); 
9 //关闭SparkSession 连接
10 spark.close(); 
5.4 运行程序
页面单跳转化率统计程序编写完成后,需要在IntelliJIDEA 中将程序封装成jar包,并
上传到集群环境中,通过spark-submit将程序提交到YARN 中运行,具体步骤如下。
1.封装jar包
由于在封装各区域热门商品Top3分析程序jar包时,将程序主类指向了cn.itcast. 
top3.AreaProductTop3,因此这里需要将程序主类修改为cn.itcast.conversion. 
PageConversion。对于封装jar包的操作可参照3.4节,这里不再赘述。将封装完成的jar