学习目标 .掌握流量分析,能够根据不同日期和时间维度分析网站用户活跃度。 .掌握商品分析,能够分析网站中的畅销品和滞销品。 .掌握设备分析,能够分析用户在不同时间段的设备偏好。 .掌握推荐系统,能够基于协同过滤为用户推荐可能感兴趣的商品。 .掌握地域分析,能够实时统计每个城市的销售情况。 数据分析是一种从数据中获取有价值信息的方法,它可以帮助我们理解数据背后的 规律。随着信息技术的飞速进步,数据的规模和复杂度不断增加,大数据时代已经到来。 在这个时代,数据分析在各个领域都发挥着重要的作用。无论是商业决策,还是科学研 究,数据分析都提供了新的视角和解决方案。本章详细介绍如何使用Spark进行用户行 为数据分析。 5.流量分析 1 流量分析是一种利用网站访问数据来评估和优化网站运营效果的方法,它对于提升 网站的设计、内容和营销策略有着重要的作用。流量分析包含多种指标,如页面浏览量 (PV )、跳出率、访问时长等,它们可以从不同的角度反映用户在网站上的行为和偏好。本 项目重点分析流量分析中的页面浏览量指标。 页面浏览量是指用户在网站上访问页面的总次数,每当用户打开一个网站页面,页面 浏览量就会增加一次,即使用户对同一页面进行了多次访问,也不会影响页面浏览量的计 算。页面浏览量作为网站运营的重要参考数据,能够直观地反映出网站用户的活跃程度 和网站内容的吸引力。 本项目将从不同的日期和时间维度对页面浏览量进行分析,包括月度、季度等,以全 面了解网站用户活跃度的变化趋势。接下来演示如何利用Spark,对2023 年的历史用户 行为数据进行页面浏览量分析,具体操作步骤如下。 1. 创建Python文件 在项目spark_project中创建名为analyze的目录,在该目录中创建名为visit_counts 的Python文件。 Spark项目实训(Python1 14 版) 2.实现Spark程序 在visit_counts.py文件中实现页面浏览量分析的Spark程序,具体实现过程如下。 (1)导入用于创建和配置Spark程序的类SparkSession,以及页面浏览量分析所用 到的函数col、lit、count、from_unixtime和unix_timestamp。在visit_counts.py文件中添 加如下代码。 1 from pyspark.sql import SparkSession 2 from pyspark.sql.functions import col, lit, count, \ 3 from_unixtime, unix_timestamp (2)创建SparkSession对象的同时启用Hive支持,以便Spark程序与Hive进行交 互。在visit_counts.py文件中添加如下代码。 1 spark = SparkSession.builder \ 2 .appName("visit_count") \ 3 .config("hive.metastore.uris", "thrift://spark01:9083") \ 4 .enableHiveSupport() \ 5 .getOrCreate() (3)获取表user_behavior_detail、dim_date和dim_time的数据。在visit_counts.py 文件中添加如下代码。 1 user_behavior_detail = spark.read \ 2 .table("user_behavior_db.user_behavior_detail") 3 dim_date = spark.read.table("user_behavior_db.dim_date") 4 dim_time = spark.read.table("user_behavior_db.dim_time") 上述代码中,第1、2行代码生成了一个DataFrame对象user_behavior_detail,该对 象包含了表user_behavior_detail的数据。第3行代码生成了一个DataFrame对象dim_ date,该对象包含了表dim_date的数据。第4行代码生成了一个DataFrame对象dim_ time,该对象包含了表dim_time的数据。 (4)分析每个月的页面浏览量。在visit_counts.py文件中添加如下代码。 1 month_info = user_behavior_detail.groupBy("monthinfo") \ 2 .agg(count("*").alias("visit_count")) \ 3 .select( 4 col("monthinfo").alias("month_info"), 5 lit("-1").alias("day_info"), 6 lit("-1").alias("quarter_info"), 7 lit("-1").alias("am_pm_info"), 8 lit("-1").alias("week_info"), 9 lit("0").alias("group_type"), 10 "visit_count" 11 ) 第5章 数据分析1 15 上述代码,首先,使用groupBy算子按照字段monthinfo对表user_behavior_detail 中的数据进行分组。然后,使用agg算子对每组数据进行聚合操作,统计每组数据的行数 (即页面访问量),并将结果保存到字段visit_count中。最后,使用select算子选择并创建 所需的字段,生成DataFrame对象month_info。 关于select算子选择并创建字段的介绍如下。 ① 选择字段monthinfo并重命名为month_info,用于获取月份的值。 ② 创建值为-1的常量字段day_info,用于表示分析结果与日期无关。 ③ 创建值为-1的常量字段quarter_info,用于表示分析结果与季度无关。 ④ 创建值为-1的常量字段am_pm_info,用于表示分析结果与上、下午无关。 ⑤ 创建值为-1的常量字段week_info,用于表示分析结果与星期无关。 ⑥ 创建值为0的常量字段group_type,用于表示分析结果与月份有关。 ⑦ 选择字段visit_count,用于获取分析结果。 (5)分析每天的页面浏览量。在visit_counts.py文件中添加如下代码。 1 day_info = user_behavior_detail.groupBy("action_date_key") \ 2 .agg(count("*").alias("visit_count")) \ 3 .select( 4 lit("-1").alias("month_info"), 5 from_unixtime( 6 unix_timestamp("action_date_key", "yyyyMMdd"), 7 "yyyy-MM-dd") 8 .alias("day_info"), 9 lit("-1").alias("quarter_info"), 10 lit("-1").alias("am_pm_info"), 11 lit("-1").alias("week_info"), 12 lit("1").alias("group_type"), 13 "visit_count" 14 ) 上述代码,首先,使用groupBy算子按照字段action_date_key对表user_behavior_ detail中的数据进行分组。然后,使用agg算子对每组数据进行聚合操作,统计每组数据 的行数(即页面访问量),并将结果保存到字段visit_count中。最后,使用select算子选择 并创建所需的字段,生成DataFrame对象day_info。 关于select算子选择并创建字段的介绍如下。 ① 创建值为-1的常量字段month_info,用于表示分析结果与月份无关。 ② 选择字段action_date_key,将字段的值格式化为yyyy-MM-dd格式的日期字符串 之后,保存到字段day_info,用于获取日期的值。 ③ 创建值为-1的常量字段quarter_info,用于表示分析结果与季度无关。 ④ 创建值为-1的常量字段am_pm_info,用于表示分析结果与上、下午无关。 ⑤ 创建值为-1的常量字段week_info,用于表示分析结果与星期无关。 Spark项目实训(Python1 16 版) ⑥ 创建值为1的常量字段group_type,用于表示分析结果与日期有关。 ⑦ 选择字段visit_count,用于获取分析结果。 (6)分析每个季度的页面浏览量。在visit_counts.py文件中添加如下代码。 1 dim_date_join = user_behavior_detail.join( 2 dim_date, 3 user_behavior_detail.action_date_key == dim_date.date_key 4 ).cache() 5 quarter_info = dim_date_join.groupBy("quarter") \ 6 .agg(count("*").alias("visit_count")) \ 7 .select( 8 lit("-1").alias("month_info"), 9 lit("-1").alias("day_info"), 10 col("quarter").alias("quarter_info"), 11 lit("-1").alias("am_pm_info"), 12 lit("-1").alias("week_info"), 13 lit("2").alias("group_type"), 14 "visit_count" 15 ) 上述代码中,第1~4行代码使用join算子对DataFrame对象user_behavior_detail 和dim_date进行内连接,指定关联条件为user_behavior_detail中字段action_date_key 的值等于dim_date中字段date_key的值。将内连接的结果保存到DataFrame对象dim_ date_join并进行缓存,以便后续重复使用。 第5~15行代码,首先,使用groupBy算子按照字段quarter对dim_date_join中的数 据进行分组。然后,使用agg算子对每组数据进行聚合操作,统计每组数据的行数(即页 面访问量),并将结果保存到字段visit_count中。最后,使用select算子选择并创建所需 的字段,生成DataFrame对象quarter_info。 关于select算子选择并创建字段的介绍如下。 ① 创建值为-1的常量字段month_info,用于表示分析结果与月份无关。 ② 创建值为-1的常量字段day_info,用于表示分析结果与日期无关。 ③ 选择字段quarter并重命名为quarter_info,用于获取季度的值。 ④ 创建值为-1的常量字段am_pm_info,用于表示分析结果与上、下午无关。 ⑤ 创建值为-1的常量字段week_info,用于表示分析结果与星期无关。 ⑥ 创建值为2的常量字段group_type,用于表示分析结果与季度有关。 ⑦ 选择字段visit_count,用于获取分析结果。 (7)分析不同星期的页面浏览量。在visit_counts.py文件中添加如下代码。 1 week_info = dim_date_join.groupBy("day_of_week") \ 2 .agg(count("*").alias("visit_count")) \ 3 .select( 第5章 数据分析1 17 4 lit("-1").alias("month_info"), 5 lit("-1").alias("day_info"), 6 lit("-1").alias("quarter_info"), 7 lit("-1").alias("am_pm_info"), 8 col("day_of_week").alias("week_info"), 9 lit("4").alias("group_type"), 10 "visit_count" 11 ) 上述代码,首先,使用groupBy算子按照字段day_of_week对dim_date_join中的数 据进行分组。然后,使用agg算子对每组数据进行聚合操作,统计每组数据的行数(即页 面访问量),并将结果保存到字段visit_count中。最后,使用select算子选择并创建所需 的字段,生成DataFrame对象week_info。 关于select算子选择并创建字段的介绍如下。 ① 创建值为-1的常量字段month_info,用于表示分析结果与月份无关。 ② 创建值为-1的常量字段day_info,用于表示分析结果与日期无关。 ③ 创建值为-1的常量字段quarter_info,用于表示分析结果与季度无关。 ④ 创建值为-1的常量字段am_pm_info,用于表示分析结果与上、下午无关。 ⑤ 选择字段day_of_week并重命名为week_info,用于获取星期的值。 ⑥ 创建值为4的常量字段group_type,用于表示分析结果与星期有关。 ⑦ 选择字段visit_count,用于获取分析结果。 (8)分析上午和下午的页面浏览量。在visit_counts.py文件中添加如下代码。 1 dim_time_join = user_behavior_detail.join( 2 dim_time, 3 user_behavior_detail.action_time_key == dim_time.time_key 4 ) 5 am_pm_info = dim_time_join.groupBy("am_pm") \ 6 .agg(count("*").alias("visit_count")) \ 7 .select( 8 lit("-1").alias("month_info"), 9 lit("-1").alias("day_info"), 10 lit("-1").alias("quarter_info"), 11 col("am_pm").alias("am_pm_info"), 12 lit("-1").alias("week_info"), 13 lit("3").alias("group_type"), 14 "visit_count" 15 ) 上述代码中,第1~4行代码使用join算子对DataFrame对象user_behavior_detail 和dim_time进行内连接,指定关联条件为user_behavior_detail中字段action_time_key 的值等于dim_time中字段time_key的值。将内连接的结果保存到DataFrame对象 Spark项目实训(Python1 18 版) dim_time_join。 第5~15行代码,首先,使用groupBy算子按照字段am_pm 对dim_time_join中的 数据进行分组。然后,使用agg算子对每组数据进行聚合操作,统计每组数据的行数(即 页面访问量),并将结果保存到字段visit_count中。最后,使用select算子选择并创建所 需的字段,生成DataFrame对象am_pm_info。 关于select算子选择并创建字段的介绍如下。 ① 创建值为-1的常量字段month_info,用于表示分析结果与月份无关。 ② 创建值为-1的常量字段day_info,用于表示分析结果与日期无关。 ③ 创建值为-1的常量字段quarter_info,用于表示分析结果与季度无关。 ④ 选择字段am_pm 并重命名为am_pm_info,用于获取上午或下午的值。 ⑤ 创建值为-1的常量字段week_info,用于表示分析结果与星期无关。 ⑥ 创建值为3的常量字段group_type,用于表示分析结果与上午和下午有关。 ⑦ 选择字段visit_count,用于获取分析结果。 (9)使用union算子合并DataFrame对象month_info、day_info、quarter_info、am_ pm_info和week_info。在visit_counts.py文件中添加如下代码。 1 result = month_info.union(day_info).union(quarter_info) \ 2 .union(am_pm_info).union(week_info) 上述代码生成了DataFrame对象result,该对象中包含了不同日期和时间维度的页 面访问量分析结果。 (10)将DataFrame对象result中的数据以覆盖模式写入表ads_visit_counts_2023。 在visit_counts.py文件中添加如下代码。 1 #指定表ads_visit_counts_2023 在HDFS 存储数据的目录 2 table_location = '/user_behavior/ads/ads_visit_counts_2023' 3 result.write.format("hive").mode("overwrite") \ 4 .option('path', table_location) \ 5 .saveAsTable("user_behavior_db.ads_visit_counts_2023") 3.运行Spark程序 为了在YARN 集群上运行Spark程序,需要将visit_counts.py文件上传到虚拟机 Spark02的/export/servers目录中。确保MetaStore服务、HDFS集群和YARN 集群处 于启动状态下,将visit_counts.py文件中实现的Spark程序提交到YARN 集群运行。在 虚拟机Spark02执行如下命令。 spark-submit \ --master yarn \ --deploy-mode cluster \ /export/servers/visit_counts.py 第5章 数据分析1 19 上述命令执行完成后可以通过访问YARN WebUI查看Spark程序的运行状态,若 其状态为FINISHED并且最终状态为SUCCEEDED表示运行成功。 需要说明的是,为了优化资源利用,在运行流量分析的Spark程序时,可以选择仅启 动集群环境中的HDFS集群、YARN 集群和MetaStore服务。 4.查看表ads_visit_counts_2023的数据 由于表ads_visit_counts_2023存储了不同日期和时间维度的流量分析结果,为避免 全表查询产生大量查询结果,将根据月份维度查询流量分析的结果。在Hive命令行界 面执行如下命令。 hive> SELECT * FROM user_behavior_db.ads_visit_counts_2023 > WHERE group_type = 0; 上述命令执行完成的效果如图5-1所示。 图5-1 查看表ads_visit_counts_2023中的数据 图5-1展示了表ads_visit_counts_2023中关于月份维度的流量分析结果,这些数据 反映了每个月用户访问网站的总次数。例如,2023年1月份用户访问网站的总次数为 1657。需要说明的是,实际分析结果与读者采集的历史用户行为数据有关。 5.2 商品分析 商品分析是一种利用商品数据来了解商品销售情况、市场趋势和用户行为的方法。 它对于优化商品、预测市场趋势和制定营销策略至关重要。例如,通过分析各商品的销 量,可以识别畅销品和滞销品,为商品优化提供依据;通过分析历史销售数据,可以预测未 来的销售趋势,为商品的进货、陈列、促销等决策提供依据。 Spark项目实训(Python1 20 版) 本项目分析各商品的销量,将销量最高的10件商品定义为畅销品,销量最低的10件 商品定义为滞销品,以全面了解网站中的畅销品和滞销品。接下来讲解如何利用Spark 对2023年的历史用户行为数据进行商品分析,具体操作步骤如下。 1.创建Python文件 在项目spark_project的analyze目录中创建名为product_sale_counts的Python 文件。 2.实现Spark程序 在product_sale_counts.py文件中实现分析各商品销量的Spark程序,具体实现过程 如下。 (1)导入用于创建和配置Spark程序的类SparkSession。在product_sale_counts.py 文件中添加如下代码。 from pyspark.sql import SparkSession (2)创建SparkSession对象的同时启用Hive支持,以便Spark程序与Hive进行交 互。在product_sale_counts.py文件中添加如下代码。 1 spark = SparkSession.builder \ 2 .appName("sale_count") \ 3 .config("hive.metastore.uris", "thrift://spark01:9083") \ 4 .enableHiveSupport() \ 5 .getOrCreate() (3)从历史用户行为数据中获取被购买过的商品。在product_sale_counts.py文件 中添加如下代码。 1 product_sales = spark.sql(""" 2 SELECT product_id 3 FROM user_behavior_db.user_behavior_detail 4 WHERE behavior_type = 'purchase' 5 """) 上述代码通过执行SQL语句,从表user_behavior_detail中获取字段behavior_type 值为purchase 的数据,并从这些数据中提取字段product_id 的值,将其存放在 DataFrame对象product_sales中。 (4)为了后续使用SQL语句查询DataFrame对象product_sales中的数据,这里将 其注册为临时表product_sales。在product_sale_counts.py文件中添加如下代码。 product_sales.createOrReplaceTempView("product_sales") (5)统计每件商品被购买的次数。在product_sale_counts.py文件中添加如下代码。 第5章 数据分析1 21 1 product_sale_counts = spark.sql(""" 2 SELECT product_id, COUNT(*) as sale_count 3 FROM product_sales 4 GROUP BY product_id 5 """) 上述代码通过执行SQL语句,对临时表product_sales中的数据按照字段product_id 进行分组,并对每组数据进行聚合操作,统计每组数据的行数(即每个商品被购买的次 数),将结果保存到字段sale_count中,最终生成一个DataFrame对象product_sale_ counts。 (6)为了后续使用SQL语句查询DataFrame对象product_sale_counts中的数据,这 里将其注册为临时表product_sale_counts。在product_sale_counts.py文件中添加如下 代码。 product_sale_counts.createOrReplaceTempView("product_sale_counts") (7)获取销量排名前10的商品。在product_sale_counts.py文件中添加如下代码。 1 top_10_products = spark.sql(""" 2 SELECT product_id,'0' AS sale_type,sale_count 3 FROM product_sale_counts 4 ORDER BY sale_count DESC 5 LIMIT 10 6 """) 上述代码通过执行SQL语句,对临时表product_sale_counts中的数据按照字段sale_ count进行降序排序,并获取排序结果的前10条数据,即销量最高的10件商品。最终生 成一个DataFrame对象top_10_products。 (8)获取销量排名后10的商品。在product_sale_counts.py文件中添加如下代码。 1 bottom_10_products = spark.sql(""" 2 SELECT product_id,'1' AS sale_type,sale_count 3 FROM product_sale_counts 4 ORDER BY sale_count ASC 5 LIMIT 10 6 """) 上述代码通过执行SQL语句,对临时表product_sale_counts中的数据按照字段sale_ count进行升序排序,并获取排序结果的前10条数据,即销量最低的10件商品。最终生 成一个DataFrame对象bottom_10_products。 (9)使用union算子合并DataFrame对象top_10_products和bottom_10_products。 在visit_counts.py文件中添加如下代码。 Spark项目实训(Python1 22 版) result = top_10_products.union(bottom_10_products)) 上述代码生成了DataFrame对象result,该对象中包含了网站中畅销品和滞销品的 分析结果。 (10)将DataFrame对象result中的数据以覆盖模式写入表ads_sale_counts_2023。 在visit_counts.py文件中添加如下代码。 1 #指定表ads_sale_counts_2023 在HDFS 存储数据的目录 2 table_location = '/user_behavior/ads/ads_sale_counts_2023' 3 result.write.format("hive").mode("overwrite") \ 4 .option('path', table_location) \ 5 .saveAsTable("user_behavior_db.ads_sale_counts_2023") 3.运行Spark程序 为了在YARN 集群上运行Spark程序,需要将product_sale_counts.py文件上传到 虚拟机Spark02的/export/servers目录中。确保MetaStore服务、HDFS集群和YARN 集群处于启动状态下,将product_sale_counts.py 文件中实现的Spark 程序提交到 YARN 集群运行。在虚拟机Spark02执行如下命令。 spark-submit \ --master yarn \ --deploy-mode cluster \ /export/servers/product_sale_counts.py 上述命令执行完成后可以通过访问YARN WebUI查看Spark程序的运行状态,若 其状态为FINISHED并且最终状态为SUCCEEDED表示运行成功。 需要说明的是,为了优化资源利用,在运行商品分析的Spark程序时,可以选择仅启 动集群环境中的HDFS集群、YARN 集群和MetaStore服务。 4.查看表ads_sale_counts_2023的数据 通过查询表ads_sale_counts_2023的全部数据,获取电商网站中畅销品和滞销品的 信息。在Hive命令行界面执行如下命令。 hive> SELECT * FROM user_behavior_db.ads_sale_counts_2023; 上述命令执行完成的效果如图5-2所示。 在图5-2中,表ads_sale_counts_2023中的每行数据依次记录了商品的唯一标识、销 售类型的标识(0表示畅销品,1表示滞销品)和商品的销量。例如,第一行数据显示唯一 标识为269的商品为畅销品,销量为37。需要说明的是,实际分析结果与读者采集的历 史用户行为数据有关。