学习目标 . 掌握网站转化率统计实现思路。 . 了解如何生成用户浏览网页数据。 . 掌握如何创建Spark连接并读取数据集。 . 掌握利用SparkSQL统计每个页面访问次数。 . 掌握利用SparkSQL获取每个用户浏览网页的顺序。 . 掌握利用SparkSQL合并同一用户浏览的网页。 . 掌握利用SparkSQL统计每个单跳的次数。 . 掌握利用SparkSQL计算页面单跳转化率。 . 掌握将数据持久化到HBase数据库。 . 熟悉通过SparkOnYARN 运行程序。 网站转化率(conversionrate)是指用户进行相应目标行动的访问次数与总访问次数的 比率。这里所指的相应目标行动可以是用户登录、注册、浏览、购买等一系列用户行为,因此 网站转化率是一个广义的概念。页面单跳转化率是网站转化率的一种统计形式,通过统计 页面单跳转化率,来优化页面布局及营销策略,使访问网站的用户可以更深层次地浏览网 站。本章对用户浏览网页数据进行分析,从而统计出页面单跳转化率。 5.1 数据集分析 本需求通过编写Java程序模拟生成用户浏览网页数据作为数据集,数据集中的每一行 数据代表一个用户的浏览行为,所有浏览行为都与页面和用户有关,该数据集中包含了 10000条用户浏览页面的数据,虽然数据比较多,但是数据内容格式基本类似,这里选取其 中一条数据进行分析,具体如下。 { "actionTime":"2020-07-22 06:34:02", "sessionid":"98ac879b5a0a4a4eb117dffd84da1ff4", "pageid":3, "userid":8 } 110 Spark项目实战 上述数据包含4个字段,每个字段都代表特定的含义,具体介绍如下。 .actionTime:用户访问页面的时间。 .sesionid:用于标识用户行为的唯一值。 .pageid:用户浏览网页的ID 。 .userid:用户ID 。 5.实现思路分析 2 当用户浏览网页时,通过当前浏览页面(A)跳转到另一个页面(B),此用户行为被称为 一次A→B的单跳。如计算A→B的页面单跳转化率,则计算公式如下: A→B页面单跳转化率=A→B的单跳总数/A总访问次数 通过上式可以看出,计算页面单跳转化率需要两部分数据,分别是A→B的单跳总数和A 总访问次数,其中A总访问次数可以通过聚合操作获取,A→B的单跳总数实现思路如下。 (1)根据用户ID和访问时间对数据集进行排序操作,获取每个用户浏览网页的顺序。 (2)根据用户ID对排序后的数据进行分组操作,将同一用户浏览的网页进行合并。 (3)对分组后的数据进行转换操作,将同一用户浏览的网页按照浏览顺序转换为单跳 图5- 1 页面单跳转化率统计实现过程 第5章 网站转化率统计1 11 形式。 (4)对转换后的数据进行聚合操作统计每个单跳的总数,其中包括A→B的单跳总数。 此时,我们可以将A 总访问次数和A→B的单跳总数这两部分数据带入计算公式中, 得出A→B的页面单跳转化率,页面单跳转化率统计实现过程如图5-1所示。 5.3 实现网站转化率统计 5.3.1 生成用户浏览网页数据 实现网站转化率统计程序之前,需要模拟生成用户浏览网页数据,这里主要通过编写 Java应用程序的方式,实现用户浏览网页数据的生成,具体实现步骤如下。 (1)在项目SparkProject的java目录下新建Package包cn.itcast.conversion,用于存放 实现网站转化率统计的Java文件。在包cn.itcast.conversion中创建文件GenerateData. java,用于模拟生成用户浏览网页数据,具体代码如文件5-1所示。 文件5-1 GenerateData.java 1 import com.alibaba.fastjson.JSONObject; 2 import java.io.FileWriter; 3 import java.io.IOException; 4 import java.text.SimpleDateFormat; 5 import java.util.Date; 6 import java.util.Random; 7 import java.util.UUID; 8 public class GenerateData { 9 public static void main(String[] arg) throws IOException { 10 //创建Random 对象,用于生成随机数 11 Random random =new Random(); 12 //创建JSONObject 对象,用于存储用户浏览网页数据 13 JSONObject jsonObject =new JSONObject(); 14 //格式化日期的格式为yyyy-MM-dd 15 SimpleDateFormat DATE_FORMAT =new SimpleDateFormat("yyyy-MM-dd"); 16 //获取当前日期,进行格式化处理获取基础日期date 17 String date =DATE_FORMAT.format(new Date()); 18 //指定用户浏览网页数据输出路径及文件名称 19 String outputFile ="D:\sparkdata\user_conversion.json"; 20 //为基础日期date 随机添加小时 21 String baseActionTime = 22 date +" " +fulfuill(String.valueOf(random.nextInt(24))); 23 //创建FileWriter 输出流对象 24 FileWriter fw =new FileWriter(outputFile); 25 //通过for 循环生成10000 条用户浏览网页数据 26 for (int i =0;i<10000;i++){ 27 //通过UUID 类的方法randomUUID()随机生成sessionid 28 String sessionid = 29 UUID.randomUUID().toString().replace("-", ""); Spark项1 12 目实战 30 //通过Random 对象的方法nextInt()随机生成pageid 31 long pageid =random.nextInt(10)+1; 32 //通过Random 对象的方法nextInt()随机生成userid 33 long userid =random.nextInt(100)+1; 34 //为添加小时的基础日期随机添加分和秒,随机生成actionTime 35 String actionTime =baseActionTime +":" 36 +fulfuill(String.valueOf(random.nextInt(60))) 37 +":" +fulfuill(String.valueOf(random.nextInt(60))); 38 //将sessionid、pageid、userid 和actionTime 添加到JSONObject 对象 39 jsonObject.put("sessionid",sessionid); 40 jsonObject.put("pageid",pageid); 41 jsonObject.put("userid",userid); 42 jsonObject.put("actionTime",actionTime); 43 //将JSONObject 对象转换成String 字符串类型并追加放入输出流对象fw 44 fw.append(jsonObject.toString()+"\n"); 45 } 46 //将输出流对象中的字符串数据写入指定文件 47 fw.flush(); 48 //关闭输出流 49 fw.close(); 50 } 51 //fulfuill()方法用于填充字符串,若字符串长度为1,则在字符串首部添加0 52 public static String fulfuill(String str) { 53 if(str.length() ==2) { 54 return str; 55 } else { 56 return "0" +str; 57 } 58 } 59 } 在文件5-1中,首先模拟生成用户浏览网页数据,用户浏览网页数据包含sessionid(用 户Session)、pageid(页面ID)、userid(用户ID)和actionTime(时间)。然后,将数据存储到 JSONObject对象中。最后,通过FileWriter对象将JSONObject对象转换成String字符串 类型,并追加写入目录D:\\sparkdata下的文件user_conversion.json中。 (2)右击文件GenerateData.java,在弹出的快捷菜单中选择Run.GenerateData.main() 运行程序,生成用户浏览网页数据。 (3)程序运行完成后在D:\\sparkdata目录中会生成JSON 文件user_conversion. json,该文件包含用户浏览网页数据。 5.3.2 修改pom.xml文件 由于实现网站转化率统计是通过Spark SQL 程序实现的,所以需要在项目 SparkProject中添加SparkSQL依赖。在配置文件pom.xml的<dependency>标签中添 加如下内容。 第5章 网站转化率统计1 13 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency> 在文件pom.xml中添加依赖后,通过按Ctrl+S 组合键保存文件pom.xml,此时 IntelliJIDEA 会自动下载文件pom.xml中添加的SparkSQL依赖。 5.3.3 创建Spark连接并读取数据集 在项目SparkProject的包cn.itcast.conversion中创建文件PageConversion.java,用于 实现网站转化率统计,具体代码如文件5-2所示。 文件5-2 PageConversion.java 1 public class PageConversion { 2 public static void main(String[] arg){ 3 //实现Spark SQL 程序 4 } 5 } 在文件5-2中,第2~4行代码创建main()方法,该方法是Java程序执行的入口,在 main()方法中实现SparkSQL程序。 在文件5-2的main()方法中创建SparkSession对象,用于实现SparkSQL程序,具体 代码如下。 1 SparkSession spark =SparkSession 2 .builder() 3 //设置Application 名称为page_conversion 4 .appName("page_conversion") 5 .getOrCreate(); 在文件5-2的main()方法中,调用SparkSession对象的read().json()方法读取外部 JSON 文件,将JSON 文件中的数据加载到userConversionDS,具体代码如下。 Dataset<Row> userConversionDS =spark.read().json(arg[0]); 上述代码中,通过变量arg[0]指定JSON 文件路径,目的是执行提交SparkSQL程序 到YARN 集群运行的命令中,通过参数指定JSON 文件路径。 在文件5-2的main()方法中,调用DataSet的createOrReplaceTempView()方法,将 userConversionDS创建为全局临时视图conversion_table,具体代码如下。 userConversionDS.createOrReplaceTempView("conversion_table"); 上述代码中,创建全局临时视图的目的是之后可以通过SQL 语句操作 Spark项1 14 目实战 userConversionDS中的数据。 5.3.4 统计每个页面访问次数 在文件5-2的main()方法中,调用SparkSession的sql()方法统计每个页面访问次数, 将统计结果加载到pageIdPvDS,具体代码如下。 1 Dataset<Row> pageIdPvDS =spark 2 .sql("select pageid,count(*) as pageid_count " + 3 "from conversion_table " + 4 "group by pageid"); 上述代码中的SQL语句,使用groupby子句和count()函数相结合的方式,对全局临 时视图conversion_table的字段pageid进行分组聚合操作,从而统计每个页面访问次数。 5.3.5 获取每个用户浏览网页的顺序 在文件5-2的main()方法中,调用SparkSession的sql()方法对每个用户浏览网页的 顺序进行排序,将排序结果加载到useridGroupSortDS,具体代码如下。 1 Dataset<Row> useridGroupSortDS =spark 2 .sql("select userid,actionTime,pageid " + 3 "from conversion_table " + 4 "order by userid,actionTime"); 上述代码中的SQL语句,使用orderby子句对全局临时视图conversion_table的字段 userid和actionTime进行升序排序,从而根据用户访问时间对每个用户浏览网页的顺序进 行排序。 在文件5-2的main()方法中,调用DataSet的createOrReplaceTempView()方法,将 useridGroupSortDS创建为全局临时视图conversion_group_sort_table,具体代码如下。 useridGroupSortDS.createOrReplaceTempView("conversion_group_sort_table"); 5.3.6 合并同一用户浏览的网页 在文件5-2的main()方法中,调用SparkSession的sql()方法对同一用户浏览的网页 进行合并,将合并结果加载到pageConversionRDD,具体代码如下。 1 JavaRDD<Row> pageConversionRDD =spark.sql("select userid," + 2 "concat_ws(',',collect_list(pageid)) as column2s " + 3 "from conversion_group_sort_table " + 4 "group by userid").toJavaRDD(); 上述代码中的SQL语句,首先使用groupby子句根据字段userid进行分组,然后使用 函数collect_list()将同一组内的pageid合并到一个集合中,最后使用函数concat_ws()将 集合中的每个pageid通过分隔符“,”拼接成字符串存储在字段column2s中。 第5章 网站转化率统计1 15 为了后续使用flatMap()算子对合并后的数据进行扁平化处理,这里通过SparkSession 的toJavaRDD()方法将DataSet转换为JavaRDD。 5.3.7 统计每个单跳的次数 在文件5-2的main()方法中,使用flatMap()算子对pageConversionRDD 进行扁平化 处理,根据用户浏览网页的顺序将相邻网页拼接为单跳,将处理结果加载到rowRDD,具体 代码如下。 1 JavaRDD<Row> rowRDD =pageConversionRDD 2 .flatMap(new FlatMapFunction<Row, Row>() { 3 @Override 4 public Iterator<Row> call(Row row) throws Exception { 5 List<Row> list =new ArrayList<>(); 6 //将用户浏览网页数据拆分为数组 7 String[] page =row.get(1).toString().split(","); 8 String pageConversionStr =""; 9 //遍历数组 10 for (int i =0;i<page.length-1;i++){ 11 //判断相邻网页是否相同 12 if (!page[i].equals(page[i+1])){ 13 //将相邻网页通过字符“_”进行拼接 14 pageConversionStr =page[i]+"_"+page[i+1]; 15 //将字符串转换为Row 对象添加到集合list 16 list.add(RowFactory.create(pageConversionStr)); 17 } 18 } 19 return list.iterator(); 20 } 21 }); 上述代码中,第7行通过row.get(1)获取Row 对象中的用户浏览网页数据,这是因为 pageConversionRDD中每一行数据包含userid和column2s,其中column2s表示用户浏览 网页数据,所以row.get(1)表示获取字段column2s,也就是获取用户浏览网页数据。 在文件5-2 的main()方法中,调用SparkSession 的createDataFrame()方法和 registerTempTable()方法,将存储单跳数据的rowRDD 注册为临时表page_conversion_ table,具体代码如下。 1 //指定临时表的字段信息 2 StructType schema =DataTypes 3 .createStructType( 4 //创建临时表字段 5 new StructField[]{ 6 DataTypes.createStructField( 7 //指定字段名为page_conversion 8 "page_conversion", 9 //指定字段类型为String Spark项1 16 目实战 10 DataTypes.StringType, 11 //指定字段值可以为空 12 true)}); 13 spark.createDataFrame(rowRDD, schema) 14 .registerTempTable("page_conversion_table"); 上述代码中,第13~14行首先通过SparkSession的createDataFrame()方法,根据单跳 数据和临时表的字段信息创建DataFrame,然后通过SparkSession的registerTempTable() 方法将DataFrame注册为临时表page_conversion_table。 在文件5-2的main()方法中,使用SparkSession的sql()方法统计每个单跳的次数,根 据统计结果创建全局临时视图page_conversion_count_table,具体代码如下。 1 spark.sql( 2 "select page_conversion," + 3 "count(*) as page_conversion_count " + 4 "from page_conversion_table " + 5 "group by page_conversion") 6 .createOrReplaceTempView("page_conversion_count_table"); 上述代码中的SQL语句,使用groupby子句和count()函数相结合的方式,对临时表 page_conversion的字段page_conversion进行分组聚合操作,从而统计每个单跳的次数。 5.3.8 计算页面单跳转化率 通过页面单跳转化率的计算公式得知,若要计算单跳A→B的页面单跳转化率,首先需 要获取页面A 的访问次数,然后需要获取单跳A→B的次数,最终将这两部分数据代入页 面单跳转化率的计算公式中计算单跳A→B的页面单跳转化率。 在计算每个单跳的页面单跳转化率之前,需要将每个页面与每个单跳一一对应,也就是 说如果计算单跳A→B的页面单跳转化率,那么代入页面单跳转化率计算公式中的一定是 页面A 的访问次数,而不能是页面B或C的访问次数。 在文件5-2的main()方法中,使用SparkSession的sql()方法拆分单跳为起始页面和 结束页面,将拆分后的数据加载到pageConversionCountDS,具体代码如下。 1 Dataset<Row> pageConversionCountDS =spark 2 .sql("select page_conversion_count," + 3 "split(page_conversion,'_')[0] as start_page," + 4 "split(page_conversion,'_')[1] as last_page " + 5 "from page_conversion_count_table"); 上述代码中的SQL语句,使用split()函数将单跳字段(page_conversion)中的数据按照 分隔符“_”拆分为起始页面字段(start_page)和结束页面字段(last_page)。 在文件5-2的main()方法中,使用join()算子对pageIdPvDS(存储每个页面访问的次 数)和pageConversionCountDS(存储每个单跳拆分后的数据)进行连接,根据连接结果创建 全局临时视图page_conversion_join,具体代码如下。 第5章 网站转化率统计1 17 1 pageConversionCountDS 2 .join( 3 pageIdPvDS, 4 new Column("start_page").equalTo(new Column("pageid")), 5 "left") 6 .createOrReplaceTempView("page_conversion_join"); 上述代码中,通过pageConversionCountDS的起始字段(start_page)和pageIdPvDS的 页面ID(pageid)进行左外连接(left)。 在文件5-2的main()方法中,使用SparkSession的sql()计算页面单跳转化率,将计算 结果加载到resultDS,具体代码如下。 1 Dataset<Row> resultDS =spark 2 .sql("select " + 3 "concat(pageid,'_',last_page) as conversion," + 4 "round(" + 5 "CAST(page_conversion_count AS DOUBLE)/CAST(pageid_count AS DOUBLE)" + 6 ",3) as rage " + 7 "from page_conversion_join"); 上述代码中的SQL语句,首先使用函数concat()将每一行数据中的页面ID(pageid)和 结束页面(last_page)通过字符“_”合并为单跳,存储在字段conversion中;接着使用函数 CAST()将字段page_conversion_count(单跳次数)和pageid_count(页面访问次数)的数据 类型转为DOUBLE类型;然后通过运算符“/”计算页面单跳转化率;最后使用函数round() 保留计算结果的3位小数,并存储在字段rage中。 5.3.9 数据持久化 为了保证各网站转化率统计结果的持久性,便于查看以及应用到数据可视化中,这里需 要进行数据持久化操作,将网站转化率统计结果存储到HBase数据库中,具体实现步骤 如下。在 文件5-2的PageConversion类中添加方法conversionToHBase(),用于将页面单跳 转化率统计结果持久化到HBase数据库中,该方法包含参数dataset,表示需要向方法中传 递页面单跳转化率统计结果数据,具体代码如下。 1 public static void conversionToHBase(Dataset<Row> dataset) 2 throws IOException { 3 //创建数据表conversion 和列族page_conversion 4 HbaseUtils.createTable("conversion","page_conversion"); 5 //创建数组column,用于指定数据表的列名convert_page 和convert_rage 6 String[] column ={"convert_page","convert_rage"}; 7 //遍历页面单跳转化率统计结果数据dataset 8 dataset.foreach(new ForeachFunction<Row>() { 9 @Override 10 public void call(Row row) throws Exception { 11 //获取单跳数据 Spark项1 18 目实战 12 String conversion =row.get(0).toString(); 13 //获取页面单跳转化率数据 14 String rage =row.get(1).toString(); 15 //创建数组value,用于指定数据表的值conversion 和rage 16 String[] value ={conversion,rage}; 17 HbaseUtils.putsToHBase("conversion", 18 conversion+rage, 19 "page_conversion", 20 column, 21 value); 22 } 23 }); 24 } 上述代码中,第17~21行调用HBase数据库操作工具类的putsToHBase()方法,用于 持久化页面单跳转化率统计结果数据。putsToHBase()方法包含5个参数:其中第1个参 数为字符串conversion,表示数据表名称;第2个参数为字符串对象conversion+rage,表示 数据表的行键;第3个参数为字符串page_conversion,表示数据表的列族;第4个参数为数 组column,数组中的每一个元素表示数据表的列名;第5个参数为数组value,数组中的每 一个元素表示数据表的值。 在文件5-2的main()方法中,调用conversionToHBase()方法并传入参数resultDS,用 于在SparkSQL程序中实现conversionToHBase()方法,将页面单跳转化率统计结果数据 持久化到HBase数据库中的数据表conversion,具体代码如下。 1 //通过try…catch 抛出异常 2 try { 3 conversionToHBase(resultDS); 4 } catch (IOException e) { 5 e.printStackTrace(); 6 } 7 //关闭HBase 连接 8 HbaseConnect.closeConnection(); 9 //关闭SparkSession 连接 10 spark.close(); 5.4 运行程序 页面单跳转化率统计程序编写完成后,需要在IntelliJIDEA 中将程序封装成jar包,并 上传到集群环境中,通过spark-submit将程序提交到YARN 中运行,具体步骤如下。 1.封装jar包 由于在封装各区域热门商品Top3分析程序jar包时,将程序主类指向了cn.itcast. top3.AreaProductTop3,因此这里需要将程序主类修改为cn.itcast.conversion. PageConversion。对于封装jar包的操作可参照3.4节,这里不再赘述。将封装完成的jar