学习目标 . 了解数据集分析,能够描述用户浏览网站页面的行为中包含的信息; . 熟悉实现思路分析,能够描述页面单跳转化率统计的实现思路; . 掌握实现网站转化率统计,能够编写用于实现网站转化率统计的Spark程序; . 掌握运行Spark程序,能够将Spark程序提交到YARN 集群运行。 网站转化率是一个广义的概念,表示访问网站的用户中执行期望目标行动用户与所 有用户的比例。目标行动可以包括浏览商品、购买商品或用户注册等。其中,页面单跳转 化率则是网站转化率的一种具体表现形式,用于衡量用户在访问一个页面后执行预期目 标动作的概率。通过对页面单跳转化率的统计,可以优化页面的布局和营销策略,提高用 户在网站上的深度浏览。本章将讲解如何对电商网站的用户行为数据进行分析,从而基 于页面单跳转化率统计网站转化率。 5.1 数据集分析 网站转化率统计使用的数据集为Scala程序模拟生成的用户行为数据。数据集中的 每一行数据都记录了用户浏览网站页面的行为。下面,以数据集中的一条用户行为数据 为例进行详细分析,具体内容如下。 {"action_time":"2023-11-06 10:41:13", "session_id":"dfadda377e2a4733a5c9ebc0d53be6e0","page_id":10,"user_id":24} 从上述内容可以看出,数据集中的每一条用户行为数据都以JSON 对象的形式存在。 该对象包含多个键值对,每个键值对代表着不同的信息。下面,通过解读这些键介绍用户 行为数据中各项信息的含义。 .action_time:表示用户访问网站的时间。 .session_id:表示用户每次访问网站时生成的唯一标识。 . page_id:表示用户访问网站页面的唯一标识。 . user_id:表示用户的唯一标识。 9 4 Spark项目实战(第2版) 5.实现思路分析 2 用户在浏览网站页面时,若从当前正在浏览的页面A跳转到另一页面B进行浏览, 则被视为用户完成了一次由页面A到页面B的单向跳转。例如,统计由页面A到页面B 的页面单跳转化率,计算公式如下。 页面单跳转化率=由页面A到页面B的单向跳转总次数/页面A的总访问次数 下面,通过图5-1详细描述本项目中网站转化率统计的实现思路。 图5- 1 网站转化率统计的实现思路 针对网站转化率统计的实现思路进行如下讲解。 .读取:读取模拟生成的用户行为数据。 .聚合:统计每个页面被访问的次数(page_count)。 .排序:提取用户访问网站的时间(action_time )、用户访问网站页面的唯一标识 (page_id)和用户的唯一标识(user_id), 并根据用户访问网站的时间进行升序排 序,以获取用户访问页面的先后顺序。 .分组:对排序结果进行分组处理,按访问的先后顺序排列每个用户访问的所有 页面。 .转换:将每个用户访问的所有页面中相邻页面的唯一标识转换为page_id_page_ id的形式。例如,如果相邻页面的唯一标识为5和6,则转换结果为5_6,表示由 唯一标识为5的页面到唯一标识为6的页面的单向跳转。 第5章 网站转化率统计 95 . 聚合:统计不同页面之间单向跳转的次数(page_conversion_count)。 . 计算:将页面被访问的次数和该页面到另一个页面单向跳转的次数代入计算页 面单跳转化率的公式中,计算页面单跳转化率。例如,如果唯一标识为5的页面 被用户访问了100次,单向跳转5_6的次数为50,那么页面单跳转化率为50/ 100=0.5,即50%。其意义是所有访问唯一标识为5的页面的用户中,有50%的 用户继续浏览了唯一标识为6的页面。 5.3 实现网站转化率统计 5.3.1 生成用户行为数据 在项目SparkProject的src/main/scala目录中,新建了一个名为cn.itcast.conversion 的包。在cn.itcast.conversion包中创建一个名为GenerateData的Scala单例对象,在该 单例对象中实现模拟生成用户行为数据,具体代码如文件5-1所示。 文件5-1 GenerateData.scala 1 import org.apache.hadoop.conf.Configuration 2 import org.apache.hadoop.fs.{FileSystem, Path} 3 import org.json.JSONObject 4 import scala.util.Random 5 import java.time.LocalDateTime 6 import java.time.format.DateTimeFormatter 7 object GenerateData{ 8 def generateData(): Unit ={ 9 //指定具有HDFS 操作权限的用户root 10 System.setProperty("HADOOP_USER_NAME","root") 11 val random =new Random() 12 val dateTimeFormatter = 13 DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss") 14 //指定模拟生成用户访问网站的起始时间为2024-03-07 00:00:00 15 val startDate =LocalDateTime.of(2024, 3, 7, 0, 0, 0) 16 val conf =new Configuration() 17 //配置HDFS 服务的地址 18 conf.set("fs.defaultFS","hdfs://192.168.88.161:9000") 19 val fs =FileSystem.get(conf) 20 //指定写入用户行为数据的文件路径/page_conversion/user_conversion.json 21 val outputPath =new Path("/page_conversion/user_conversion.json") 22 val outputStream =fs.create(outputPath) 23 for (_ <-1to 10000) { 24 val randomSeconds =random.nextInt(86400) 25 //模拟生成用户访问网站的时间 Spark项目实战(第96 2版) 26 val actionTime =startDate.plusSeconds(randomSeconds.toLong) 27 //模拟用户每次访问网站时生成的唯一标识 28 val sessionId =java.util.UUID.randomUUID().toString 29 //模拟生成用户访问网站页面的唯一标识 30 val pageId =1+random.nextInt(10) 31 //模拟生成用户的唯一标识 32 val userId =1+random.nextInt(100) 33 val visitData =new JSONObject() 34 visitData.put("action_time", dateTimeFormatter.format(actionTime)) 35 visitData.put("session_id", sessionId) 36 visitData.put("page_id", pageId) 37 visitData.put("user_id", userId) 38 //将模拟生成的用户行为数据写入HDFS 的指定文件中 39 outputStream.writeBytes(visitData.toString +"\n") 40 } 41 outputStream.close() 42 fs.close() 43 } 44 def main(args: Array[String]): Unit ={ 45 generateData() 46 } 47 } 上述代码中,第23~40行代码通过for循环模拟生成10000条用户行为数据,其中 第33~37行代码,用于将模拟生成的用户行为数据调整为JSON 对象的格式。 确保Hadoop集群正常启动后,运行文件5-1。当文件5-1运行完成后,查看HDFS 的目录/page_conversion中文件user_conversion.json的内容。在虚拟机Spark01执行如 下命令。 $h dfs dfs -cat /page_conversion/user_conversion.json 上述命令执行完成的效果如图5-2所示。 图5-2展示了文件user_conversion.json的部分数据,这些数据记录了用户浏览网站 页面的行为。因此说明,成功将模拟生成的用户行为数据写入了HDFS。 5.3.2 实现Spark程序 在项目SparkProject的包cn.itcast.conversion中新建一个名为PageConversion的 Scala单例对象,在该单例对象中实现网站转化率统计的Spark程序,具体实现过程如下。 (1)由于本项目需要通过SparkSQL实现网站转化率统计,所以在实现Spark程序 之前,需要在项目SparkProject中添加SparkSQL 依赖。在配置文件pom.xml的 <dependencies>标签中添加如下内容。 第5章 网站转化率统计 97 图5-2 查看文件user_conversion.json的内容 1 <dependency> 2 <groupId>org.apache.spark</groupId> 3 <artifactId>spark-sql_2.12</artifactId> 4 <version>3.3.0</version> 5 </dependency> 上述内容添加完成后,在IntelliJIDEA 的Maven窗口中确认SparkSQL依赖是否 存在于项目SparkProject中。 (2)在单例对象PageConversion中添加main()方法,用于定义Spark程序的实现逻 辑,具体代码如文件5-2所示。 文件5-2 PageConversion.scala 1 package cn.itcast.conversion 2 object PageConversion { 3 def main(args: Array[String]): Unit ={ 4 //实现逻辑 5 } 6 } (3)在Spark程序中创建SparkSession对象spark,用于配置并管理Spark程序的执 行。在单例对象PageConversion的main()方法中添加如下代码。 1 val spark =SparkSession.builder() 2 .appName("PageConversion") Spark项目实战(第98 2版) 3 .getOrCreate() 上述代码指定Spark程序的名称为PageConversion。 (4)在Spark 程序中,通过DataFrameAPI提供的json()方法从HDFS 中读取 JSON 格式的用户行为数据,并将读取的数据注册为临时视图conversion_table。在单例 对象PageConversion的main()方法中添加如下代码。 spark.read.json(args(0)).createOrReplaceTempView("conversion_table") 上述代码中,使用args(0)代替用户行为数据的具体路径,以便将Spark程序提交到 YARN 集群运行时,可以更加灵活地通过spark-submit命令的参数来指定用户行为数据 的具体路径。 (5)在Spark程序中,通过DataFrameAPI提供的sql()方法执行SQL语句,基于临 时视图conversion_table统计每个页面被访问的次数,并将SQL语句的执行结果存储到 DataFrame对象pageVisitStatsDF中。在单例对象PageConversion的main()方法中添 加如下代码。 1 val pageVisitStatsDF =spark.sql( 2 "select page_id," + 3 "count(*) as page_count " + 4 "from conversion_table " + 5 "group by page_id" 6 ) 上述代码中的SQL语句使用了groupby子句,按照字段page_id进行分组。对于每 个分组,使用了count()函数计算该分组中记录的数量,并将计算结果作为新的字段 page_count返回,该字段中记录了每个页面被访问的次数。 (6)在Spark程序中,通过DataFrameAPI提供的sql()方法执行SQL语句,基于临 时视图conversion_table按照用户访问网站的时间进行升序排序,并将SQL语句的执行 结果注册为临时视图conversion_sort_table。在单例对象PageConversion的main()方 法中添加如下代码。 1 spark.sql( 2 "select user_id," + 3 "action_time," + 4 "page_id " + 5 "from conversion_table " + 6 "order by user_id,action_time") 7 .createOrReplaceTempView("conversion_sort_table") 上述代码中的SQL语句使用了orderby子句,按照字段user_id和action_time进行 升序排序,获取用户访问页面的先后顺序。 第5章 网站转化率统计 99 (7)在Spark程序中,通过DataFrameAPI提供的sql()方法执行SQL语句,基于临 时视图conversion_sort_table合并每个用户访问的所有页面,并将SQL语句的执行结果 存储到RDD对象pageConversionRDD中。在单例对象PageConversion的main()方法 中添加如下代码。 1 val pageConversionRDD: RDD[Row] = 2 spark.sql( 3 "select user_id," + 4 "concat_ws(',',collect_list(page_id)) as page_list " + 5 "from conversion_sort_table " + 6 "group by user_id").rdd 上述代码中的SQL语句使用groupby子句,按照字段user_id进行分组。对于每个 分组,首先,使用collect_list()函数将字段page_id的值合并到一个列表。然后,使用 concat_ws()函数将列表中的每个值合并为一个逗号分隔的字符串,并将合并结果作为新 的字段page_list返回,该字段中记录了用户访问的所有页面的唯一标识。 (8)在Spark程序中,通过flatMap算子对RDD对象pageConversionRDD进行转换 操作,并将转换操作的结果存储到RDD 对象rowRDD 中。在单例对象PageConversion 的main()方法中添加如下代码。 1 val rowRDD: RDD[Row] =pageConversionRDD.flatMap { 2 row => 3 //创建集合list,用于存储页面唯一标识转换为单向跳转的形式 4 val list: ListBuffer[Row] =new ListBuffer[Row]() 5 val page: Array[String] =row.getString(1).split(",") 6 for (i <-0 until page.length -1) { 7 if (page(i) !=page(i +1)) { 8 val pageConversionStr: String =page(i) +"_" +page(i +1) 9 list +=RowFactory.create(pageConversionStr) 10 } 11 } 12 list.iterator 13 } 上述代码中,第5行代码用于获取记录用户访问的所有页面的唯一标识的字符串,并 通过逗号将其拆分为数组page。第6~11行代码用于遍历数组page,获取每个页面唯一 标识,如果两个相邻页面的唯一标识不相等,则将它们通过字符“_”合并为字符串,并添加 到集合list中。 (9)在Spark程序中创建StructType对象schema,用于指定数据结构。在单例对象 PageConversion的main()方法中添加如下代码。 1 val schema: StructType =DataTypes.createStructType( Spark项目实战(第1 00 2版) 2 Array( 3 DataTypes.createStructField( 4 "page_conversion", 5 DataTypes.StringType, 6 true 7 ) 8 ) 9 ) 上述代码指定的数据结构包含一个名为page_conversion的字段,该字段的数据类型 为String并且允许值为空。 (10)在Spark程序中,基于StructType对象schema中指定的数据结构,将RDD对 象rowRDD转换为DataFrame对象,并将其注册为临时视图page_conversion_table。在 单例对象PageConversion的main()方法中添加如下代码。 1 spark.createDataFrame(rowRDD, schema) 2 .createOrReplaceTempView("page_conversion_table") (11)在Spark程序中,通过DataFrameAPI提供的sql()方法执行SQL语句,基于 临时视图page_conversion_table统计不同页面之间单向跳转的次数,并将SQL语句的执 行结果注册为临时视图page_conversion_count_table。在单例对象PageConversion的 main()方法中添加如下代码。 1 spark.sql( 2 "select page_conversion," + 3 "count(*) as page_conversion_count " + 4 "from page_conversion_table " + 5 "group by page_conversion" 6 ).createOrReplaceTempView("page_conversion_count_table") 上述代码中的SQL语句使用了groupby子句,按照字段page_conversion进行分 组。对于每个分组,使用count()函数计算该分组中记录的数量,并将计算结果作为新的 字段page_conversion_count返回,该字段记录了不同页面之间单向跳转的次数。 (12)在Spark程序中,通过DataFrameAPI提供的sql()方法执行SQL语句,对临 时视图page_conversion_count_table中字段page_conversion的值进行转换,并将SQL 语句的执行结果存储到DataFrame 对象pageConversionCountDF 中。在单例对象 PageConversion的main()方法中添加如下代码。 1 val pageConversionCountDF =spark.sql( 2 "select page_conversion_count," + 3 "split(page_conversion,'_')[0] as start_page," + 第5章 网站转化率统计1 01 4 "split(page_conversion,'_')[1] as last_page " + 5 "from page_conversion_count_table" 6 ) 上述代码中的SQL语句使用split()函数,根据字符“_”将字段page_conversion的内 容拆分为两部分,并分别将这两部分作为新的字段start_page和last_page返回。其中, 字段start_page记录了不同页面之间单向跳转的起始页面的唯一标识;字段last_page记 录了不同页面之间单向跳转的结束页面的唯一标识。 (13)在Spark程序中,通过join算子对DataFrame对象pageConversionCountDF和 pageVisitStatsDF进行左外连接操作,并将操作结果注册为临时视图page_conversion_ join。在单例对象PageConversion的main()方法中添加如下代码。 1 pageConversionCountDF.join( 2 pageVisitStatsDF, 3 col("start_page").equalTo(col("page_id")), 4 "left" 5 ).createOrReplaceTempView("page_conversion_join") 上述代码中指定的连接条件为DataFrame对象pageConversionCountDF 中字段 start_page的值等于DataFrame对象pageVisitStatsDF 中字段page_id的值。通过对 DataFrame对象pageConversionCountDF和pageVisitStatsDF进行左外连接,可以在临 时视图page_conversion_join中将页面的总访问次数和该页面跳转到其他页面的单跳次 数相关联,便于后续通过这两个值计算页面单跳转化率。 (14)在Spark程序中,通过DataFrameAPI提供的sql()方法执行SQL语句,计算 页面单跳转化率,并将SQL语句的执行结果存储到DataFrame对象resultDF中。在单 例对象PageConversion的main()方法中添加如下代码。 1 val resultDF =spark.sql( 2 "select concat(page_id,'_',last_page) as conversion," + 3 "round(CAST(page_conversion_count AS DOUBLE)/CAST(page_count AS DOUBLE)," + 4 "5) as rage " + 5 "from page_conversion_join" 6 ) 上述代码中的SQL语句,首先使用concat()函数将字段page_id和last_page的值通 过字符“_”拼接为新的字符串,并将其作为新的字段conversion返回,该字段中的记录为 不同页面之间单向跳转的形式。然后,使用cast()函数将字段page_conversion_count和 page_count的值转换为Double类型,并对这两个字段的值进行除法运算,获取页面单跳 转化率。最后,使用round()函数对除法运算的结果四舍五入到小数点后五位,并将其作 为新的字段rage返回。 Spark项目实战(第1 02 2版) 5.3.3 数据持久化 通过记录历史事件、文化传统和社会变迁,有助于人们对历史的正确理解和认识,从 而在思想上得到启迪和教育。上一节内容实现的Spark程序仅仅获取了网站转化率的统 计结果。为了便于后续进行数据可视化,并确保分析结果的长期存储,需要进行数据持久 化操作。本项目使用HBase作为数据持久化工具。接下来,分步骤讲解如何将网站转化 率的统计结果存储到HBase的表中,具体操作步骤如下。 (1)在单例对象PageConversion中定义一个conversionToHBase()方法,该方法用 于向HBase的表conversion中插入网站转化率的统计结果,具体代码如下。 1 def conversionToHBase(dataframe: DataFrame): Unit ={ 2 //在HBase 中创建表conversion 并向表中添加列族page_conversion 3 HBaseUtils.createtable("conversion","page_conversion") 4 //创建数组column,用于指定列标识的名称 5 val column =Array("convert_page", "convert_rage") 6 dataframe.foreach { 7 row => 8 //获取不同页面之间的单向跳转 9 val conversion =row.getString(0) 10 //获取页面单跳转化率 11 val rage =row.getDouble(1).toString 12 //创建数组value,用于指定插入的数据 13 val value =Array(conversion, rage) 14 HBaseUtils.putsToHBase( 15 "conversion", 16 conversion +rage, 17 "page_conversion", 18 column, 19 value 20 ) 21 } 22 } 上述代码中,第14~20行代码用于向HBase中表conversion的列page_conversion: convert_page和page_conversion:convert_rage插入数据,数据的内容依次为不同页面之 间的单向跳转和页面单跳转化率。 (2)在单例对象PageConversion的main()方法中调用conversionToHBase()方法 并将resultDF作为参数传递,实现将网站转化率的统计结果插入HBase的表conversion 中,具体代码如下。 1 try { 2 PageConversion.conversionToHBase(resultDF)