学习目标
. 了解用户行为数据,能够说出电商网站中用户行为数据的含义。
. 了解模拟生成用户行为数据,能够实现模拟生成用户行为数据的Python程序。
. 掌握配置采集方案,能够根据需求灵活配置Flume的采集方案。
. 熟悉采集用户行为数据,能够根据采集方案启动Flume采集数据。
数据采集是指通过各种技术手段从不同数据源获取数据的过程。其目标是获得完
整、准确、及时的数据,以支持后续的数据分析和决策。本项目的核心需求是分析电商网
站中的用户行为数据,这些数据通过网站的埋点获取,并以日志的形式存储在服务器上。
本章详细介绍如何通过数据采集来获取用户行为数据。
3.1 用户行为数据概述
用户行为数据是指用户在电商网站上的各种交互记录,包括用户的行为信息及环境
信息。收集这些数据的主要目的是优化产品,并为各项分析统计指标提供数据支持。本
项目采集的用户行为数据主要包括页面信息、设备信息和行为信息。下面以一条用户行
为数据为例进行说明,具体内容如下。 
{ 
"page_info": { 
"page_id": 287, 
"page_url": "https://www.example.com/page_287", 
"product_id": 287, 
"category": "Grocery" 
}, 
"behavior_info": { 
"user_id": 6421, 
"behavior_type": "purchase", 
"action_time": "2023-02-15 05:32:31", 
"location": "湖北, 宜昌" 
},

Spark项目实训(Python76 版) 
"device_info": { 
"operating_system": "Android", 
"access_method": "browser", 
"browser_type": "Opera", 
"app_version": null 
} 
}
从上述内容可以看出,本项目所采集的用户行为数据以对象结构的JSON 格式存储。
其中,键page_info的值以对象结构存储页面信息,键behavior_info的值以对象结构存储
行为信息,键device_info的值以对象结构存储设备信息。有关这些信息的详细说明如
表3-1所示。
表3-1 用户行为数据的详细说明
类 别键描 述
页面信息
page_id 表示用户所访问页面的唯一标识
page_url 表示用户所访问页面的URL地址
product_id 表示商品的唯一标识
category 表示商品所属的品类
行为信息
user_id 表示用户的唯一标识
behavior_type 表示用户的行为类型,其值包括click(点击)、cart(加入购物车)和
purchase(购买) 
action_time 表示用户触发行为的时间
location 表示用户触发行为的地理位置
设备信息
operating_system 表示用户使用的操作系统
access_method 表示用户的访问方式,其值包括app和browser(浏览器) 
browser_type 表示浏览器类型,当用户的访问方式为app时,浏览器类型的值
为null 
app_version 表示App的版本号,当用户的访问方式为browser时,App的版本
号为null 
3.2 模拟生成用户行为数据
本项目通过编写Python程序模拟生成用户行为数据。由于离线分析和实时分析所
用的用户行为数据分别来自历史数据和实时数据,它们在用户触发行为的时间上会有差
异,所以需要编写两个Python程序,以便生成两种不同类型的用户行为数据。本节讲解
如何通过编写Python程序模拟生成用户行为数据。

第3章数据采集77 

3.1 
生成历史用户行为数据
2.
在本项目中,需要生成一年的历史用户行为数据,时间范围是2023 年1月1日至
2023 年12 月31 日。接下来演示如何使用集成开发工具PyCharm 编写Python程序,实
现生成历史用户行为数据的功能,具体操作步骤如下。

1. 
创建项目
在PyCharm 中基于自定义环境创建名为spark_project的项目,并指定项目使用本
地安装的Pythn3.13 版本的Pyton解释器,如图31所示。

o9.h


图3-
1 
创建项目

在图3-1中,单击Create按钮创建项目spark_project。

2. 
创建目录
在项目spark_project中创建名为data的目录,用于存放生成用户行为数据的
Python文件,如图3-2所示。

3. 
创建Python文件
在项目spark_project的data目录中创建名为
generate_user_data_history的Python文件,用于实现
生成历史用户行为数据的Python程序。

4. 
实现Python程序
enerateuserdatahistory.图3-
2 
创建目录
在g___py 文件中,添加用



Spark项目实训(Python78 版) 
于生成历史用户行为数据的相关模块和代码,具体操作步骤如下。
(1)在generate_user_data_history.py文件中导入json、random、datetime和time模
块,具体代码如文件3-1所示。
文件3-1 generate_user_data_history.py 
1 #用于处理JSON 格式的数据
2 import json 
3 #用于生成随机数
4 import random 
5 #用于处理日期和时间
6 import datetime 
7 #用于提供与时间相关的函数
8 import time 
(2)在文件3-1中添加名为random_date的函数,用于生成一个在指定时间范围内
随机的时间作为用户触发行为的时间,具体代码如下。 
1 def random_date(start, end): 
2 return start + datetime.timedelta( 
3 seconds=random.randint(0, int((end - start).total_seconds())), 
4 ) 
上述代码中,函数random_date()接收两个参数start和end,分别用于指定时间范围
的起始和结束时间。
(3)在文件3-1中添加名为random_location的函数,用于生成用户触发行为的地理
位置,具体代码如下。 
1 def random_location(): 
2 locations = { 
3 "北京": ["北京"], 
4 "上海": ["上海"], 
5 "广东": ["广州", "深圳", "东莞", "珠海"], 
6 "浙江": ["杭州", "宁波", "温州", "嘉兴", "湖州"], 
7 "江苏": ["南京", "苏州", "无锡", "常州", "扬州"], 
8 "四川": ["成都", "绵阳", "德阳", "南充", "宜宾"], 
9 "湖北": ["武汉", "黄石", "宜昌", "襄阳", "荆州"], 
10 "山东": ["济南", "青岛", "烟台", "潍坊", "淄博"], 
11 "河南": ["郑州", "洛阳", "开封", "新乡", "安阳"], 
12 "河北": ["石家庄", "唐山", "邯郸", "张家口"], 
13 "湖南": ["长沙", "株洲", "湘潭", "衡阳", "岳阳"] 
14 } 
15 province = random.choice(list(locations.keys())) 
16 city = random.choice(locations[province])

第3章 数据采集 79 
17 return f"{province}, {city}" 
上述代码中,第2~14行代码定义了一个字典locations,其中包含省份及其对应城市
的信息。每个省份作为键,对应一个包含该省份城市的列表。第15行代码用于随机选择
一个省份。第16行代码则基于已选择的省份,随机选择该省份的一个城市。
(4)在文件3-1中添加名为generate_page_info的函数,用于生成页面信息,具体代
码如下。 
1 def generate_page_info(): 
2 product_categories = { 
3 range(1, 31): "Electronics", 
4 range(31, 61): "Clothing", 
5 range(61, 91): "Books", 
6 range(91, 121): "Home", 
7 range(121, 151): "Toys", 
8 range(151, 181): "Sports", 
9 range(181, 211): "Beauty", 
10 range(211, 241): "Health", 
11 range(241, 271): "Automotive", 
12 range(271, 301): "Grocery" 
13 } 
14 product_id = random.randint(1, 300) 
15 category = next( 
16 ( 
17 cat for range_, 
18 cat in product_categories.items() if product_id in range_ 
19 ) 
20 ) 
21 page_info = { 
22 "page_id": product_id, 
23 "page_url": f"https://www.example.com/page_{product_id}", 
24 "product_id": product_id, 
25 "category": category 
26 } 
27 return page_info 
上述代码中,第2~13行代码使用字典product_categories来定义不同范围商品对应
的品类。第14行代码生成了一个在指定范围内的随机整数,作为商品的唯一标识,同时
也作为页面的唯一标识,以确保每个页面只对应一个商品。第15~20行代码用于根据生
成的商品的唯一标识确定其所属的品类。第21~26行代码定义了一个包含页面信息的
字典page_info,该字典的第一个键值对表示用户所访问页面的唯一标识,第二个键值对
表示用户所访问页面的URL地址,第三个键值对表示商品的唯一标识,第四个键值对表

Spark项目实训(Python80 版) 
示商品所属品类。
(5)在文件3-1中添加名为generate_device_info的函数,用于生成设备信息,具体代
码如下。 
1 def generate_device_info(): 
2 access_method = random.choice(["browser", "app"]) 
3 device_info = { 
4 "operating_system": random.choice( 
5 ["Windows", "macOS", "Android", "iOS"] 
6 ), 
7 "access_method": access_method 
8 } 
9 if access_method == "browser": 
10 device_info["browser_type"] = random.choice( 
11 ["Chrome", "Firefox", "Safari", "Edge", "Opera"] 
12 ) 
13 device_info["app_version"] = None 
14 elif access_method == "app": 
15 device_info["browser_type"] = None 
16 device_info["app_version"] = (f"{random.randint(8, 10)}." 
17 f"{random.randint(0, 9)}." 
18 f"{random.randint(0, 9)}") 
19 return device_info 
上述代码中,第2行代码用于随机选择一个用户的访问方式。第3~8行代码定义了
一个包含设备信息的字典device_info,该字典的第一个键值对表示用户使用的操作系统, 
第二个键值对表示用户的访问方式。
第9~18行代码用于通过判断用户的访问方式,向字典device_info中添加两个键值
对,它们的键分别为browser_type和app_version,表示浏览器类型和App版本号。当用
户的访问方式为browser时,键app_version的值为None,表示没有App版本号的信息。
当用户的访问方式为app时,键browser_type的值为None,表示没有浏览器类型的
信息。
(6)在文件3-1中添加名为generate_behavior_info的函数,用于生成行为信息,具体
代码如下。 
1 def generate_behavior_info(): 
2 start_date = datetime.datetime(2023, 1, 1) 
3 end_date = datetime.datetime(2023, 12, 31,23,59,59) 
4 behavior_info = { 
5 "user_id": random.randint(1, 10000), 
6 "behavior_type": random.choice(["click", "cart", "purchase"]), 
7 "action_time": str(random_date(start_date, end_date)),

第3章 数据采集 81 
8 "location": random_location() 
9 } 
10 return behavior_info 
上述代码中,第2行代码用于指定时间范围的起始时间为2023-01-0100:00:00。第
3行代码用于指定时间范围的结束时间为2023-12-3123:59:59。第4~9行代码定义了
一个包含行为信息的字典behavior_info,该字典的第一个键值对表示用户的唯一标识,第
二个键值对表示用户的行为类型,第三个键值对表示用户触发行为的时间,第四个键值对
表示用户触发行为的地理位置。
(7)在文件3-1中添加名为generate_user_behavior的函数,用于整合页面信息、行
为信息和设备信息,从而生成完整的用户行为数据,具体代码如下。 
1 def generate_user_behavior(): 
2 page_info = generate_page_info() 
3 behavior_info = generate_behavior_info() 
4 device_info = generate_device_info() 
5 user_behavior = { 
6 "page_info": page_info, 
7 "behavior_info": behavior_info, 
8 "device_info": device_info 
9 } 
10 return user_behavior 
上述代码中,第2~4行代码分别用于获取页面信息、行为信息和设备信息。第5~9 
行代码定义了一个包含用户行为数据的字典user_behavior,该字典的第一个键值对表示
页面信息,第二个键值对表示行为信息,第三个键值对表示设备信息。
(8)在文件3-1中添加名为output_user_behaviors的函数,用于将生成的用户行为
数据写入日志文件,具体代码如下。 
1 def output_user_behaviors(interval, output_file): 
2 try: 
3 with open(output_file, 'a', encoding='utf-8') as f: 
4 while True: 
5 #生成用户行为数据
6 user_behavior = generate_user_behavior() 
7 #将用户行为数据转换为JSON 格式
8 user_behavior_json = json.dumps(user_behavior,ensure_ascii=False) 
9 f.write(user_behavior_json + "\n") 
10 f.flush() 
11 time.sleep(interval) 
12 except KeyboardInterrupt: 
13 print("Data generation stopped.")

Spark项目实训(Python82 版) 
上述代码中,函数output_user_behaviors()接收两个参数interval和output_file,分
别用于指定生成每条用户行为数据的时间间隔(秒),以及日志文件所在目录和名称。
(9)在文件3-1中调用函数output_user_behaviors(),指定生成每条用户行为数据的
时间间隔为0.5秒,日志文件所在目录为/export/data/log/2023,日志文件名称为user_ 
behaviors.log,具体代码如下。 
output_user_behaviors(0.5,"/export/data/log/2023/user_behaviors.log") 
需要说明的是,由于本项目将使用虚拟机Spark03运行Python程序,因此上述代码
指定的目录为Linux操作系统的格式。
5.创建目录
在虚拟机Spark03中创建用于存储日志文件user_behaviors.log的目录/export/ 
data/log/2023,具体命令如下。 
mkdir -p /export/data/log/2023 
3.2.2 生成实时用户行为数据
在本项目中,生成实时和历史用户行为数据的Python程序基本一致,不同之处在
于,实时用户行为数据使用系统当前时间来生成用户触发行为的时间。因此,可以参考
generate_user_data_history.py文件中的代码,来编写生成实时用户行为数据的Python 
程序,操作步骤如下。
1.创建Python文件
在项目spark_project的data目录中创建名为generate_user_data_real的Python文
件,用于实现生成实时用户行为数据的Python程序。
2.实现Python程序
在generate_user_data_real.py文件中,添加用于生成实时用户行为数据的相关模块
和代码,具体操作步骤如下。
(1)将generate_user_data_history.py文件中的内容复制到generate_user_data_ 
real.py文件中。
(2)在generate_user_data_real.py文件中,将导入datetime模块的代码替换为如下
代码。 
from datetime import datetime 
(3)在generate_user_data_real.py文件中导入os模块用于与操作系统交互,具体代
码如下。 
import os 
(4)在generate_user_data_real.py文件中,删除名为random_date的函数。

第3章 数据采集 83 
(5)在generate_user_data_real.py文件中,修改名为generate_behavior_info的函
数,该函数修改完成的内容如下。 
1 def generate_behavior_info(): 
2 current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') 
3 behavior_info = { 
4 "user_id": random.randint(1, 10000), 
5 "behavior_type": random.choice(["click", "cart", "purchase"]), 
6 "action_time": current_time, 
7 "location": random_location() 
8 } 
9 return behavior_info 
上述代码中,第2 行代码用于获取当前系统时间,并将其转换为YYYY-mm-dd 
HH:mm:ss(年-月-日时:分:秒)格式的字符串。
(6)在generate_user_data_real.py文件中,将调用函数output_user_behaviors()的
代码修改为如下内容。 
1 #根据当前系统时间获取年份
2 current_year = datetime.now().year 
3 #指定日志文件所在目录
4 directory_template = "/export/data/log/{year}" 
5 #定义日志文件的名称
6 file_name = "user_behaviors.log" 
7 #将指定目录中的占位符替换为获取的年份
8 directory = directory_template.format(year=current_year) 
9 #判断目录是否存在,若不存在,则创建该目录
10 os.makedirs(directory, exist_ok=True) 
11 #将目录和日志文件的名称合并成一个完整的文件路径
12 output_file = os.path.join(directory, file_name) 
13 output_user_behaviors(0.5,output_file) 
上述代码通过调用函数output_user_behaviors()将用户行为数据写入指定目录的日
志文件user_behaviors.log。其中,/export/data/log目录的子目录会根据当前系统时间
中的年份自动生成。
3.3 配置采集方案
本项目需要在虚拟机Spark03上启动两个FlumeAgent,分别负责采集历史和实时
用户行为数据。因此,需要为这两个FlumeAgent配置不同的采集方案,以适应不同的
数据采集需求,具体实现过程如下。
1.配置采集历史用户行为数据的方案
采集历史用户行为数据的FlumeAgent在启动时,会对数据进行JSON 格式校验,

Spark项目实训(Python84 版) 
以确保后续数据分析和存储过程中使用的数据符合JSON 格式要求。校验通过后, 
FlumeAgent将根据用户行为触发时间的日期,将历史用户行为数据发送到HDFS的不
同目录中。
采集历史用户行为数据的FlumeAgent主要包括Source、Channel和Sink三个组
件。其中,Source 组件的类型为TaildirSource,负责监控和读取日志文件user_ 
behaviors.log中的历史用户行为数据。Channel组件的类型为FileChannel,负责将
Flume中的事件持久化到磁盘上。Sink组件的类型为HDFSSink,负责将历史用户行为
数据输出到HDFS的指定目录。
接下来演示如何在虚拟机Spark03中配置采集历史用户行为数据的方案,操作步骤
如下。
(1)在虚拟机Spark03中创建/export/data/flume_conf目录,用于存放采集方案的
配置文件,具体命令如下。 
mkdir /export/data/flume_conf 
(2)在虚拟机Spark03的/export/data/flume_conf目录中,使用vi编辑器编辑配置
文件flume-logs-history.conf,在该文件中添加采集方案,具体内容如文件3-2所示。
文件3-2 flume-logs-history.conf 
1 #定义Source 组件的标识r1 
2 a1.sources = r1 
3 #定义Channel 组件的标识c1 
4 a1.channels = c1 
5 #定义Sink 组件的标识k1 
6 a1.sinks = k1 
7 #定义Source 组件的类型为Taildir Source 
8 a1.sources.r1.type = TAILDIR 
9 #定义用于记录被监控文件当前读取位置的文件taildir_position_history.json 
10 a1.sources.r1.positionFile = /export/data/flume/taildir_position_ 
history.json 
11 #定义文件组的标识为f1 
12 a1.sources.r1.filegroups = f1 
13 #定义文件组f1 中被监控文件的位置,即日志文件user_behaviors.log 所在目录
14 a1.sources.r1.filegroups.f1 = /export/data/log/2023/user_behaviors.log 
15 #定义Source 组件中拦截器的标识i1 
16 a1.sources.r1.interceptors = i1 
17 #在标识为i1 的拦截器中添加一个自定义拦截器,用于校验数据是否为JSON 格式并将用
#户触发行为的时间转换为时间戳格式之后添加到事件的header 中
18 a1.sources.r1.interceptors.i1.type = cn.itcast.flume 
.JsonAndTimestampInterceptor$Builder 
19 #定义Channel 组件的类型为File Channel 
20 a1.channels.c1.type = file

第3章 数据采集 85 
21 #定义File Channel 存储元数据的目录
22 a1.channels.c1.checkpointDir = /export/data/flume/checkpoint 
23 #定义File Channel 存储事件的目录
24 a1.channels.c1.dataDirs = /export/data/flume/data 
25 #定义Sink 组件的类型为HDFS Sink 
26 a1.sinks.k1.type = hdfs 
27 #定义HDFS Sink 将数据输出到指定目录的文件中,其中%Y-%m-%d 表示根据日期创建目
#录,如2023-11-02 
28 a1.sinks.k1.hdfs.path = /origin_data/log/user_behaviors/%Y-%m-%d 
29 #定义文件的前缀为log 
30 a1.sinks.k1.hdfs.filePrefix = log 
31 #定义滚动新文件的时间间隔为10 秒
32 a1.sinks.k1.hdfs.rollInterval = 10 
33 #定义滚动新文件的大小为0,表示不根据文件大小滚动新文件
34 a1.sinks.k1.hdfs.rollSize = 0 
35 #定义滚动新文件的事件数为0,表示不根据事件数滚动新文件
36 a1.sinks.k1.hdfs.rollCount = 0 
37 #定义文件的类型为压缩文件,以减少存储空间和提高传输效率
38 a1.sinks.k1.hdfs.fileType = CompressedStream 
39 #定义压缩文件的压缩编解码器为GZIP 
40 a1.sinks.k1.hdfs.codeC = gzip 
41 #将Source 组件与Channel 组件关联
42 a1.sources.r1.channels = c1 
43 #将Sink 组件与Channel 组件关联
44 a1.sinks.k1.channel = c1 
在文件3-2中,指定FlumeAgent的标识为a1,其中第28行代码依据的日期来源于
每条用户行为数据中用户触发行为的时间。第18行代码添加的自定义拦截器需要通过
编写Java程序来实现,其具体实现过程本书不作重点讲解。在本书的配套资源中提供了
自定义拦截器的jar文件FlumeInterceptor.jar,供读者直接使用。第40行代码使用压缩
编解码器GZIP是因为其具有较高的压缩率,可以最大程度减少存储空间。
在文件3-2中添加采集方案后,保存并退出编辑。
2.配置采集实时用户行为数据的方案
采集实时用户行为数据的FlumeAgent在启动时,会对数据进行JSON 格式校验, 
以确保后续数据分析和存储过程中使用的数据符合JSON 格式要求。校验通过后, 
FlumeAgent将实时用户行为数据发送到Kafka。
采集实时用户行为数据的FlumeAgent主要包括Source和Channel两个组件。其
中,Source组件的类型为TaildirSource,负责监控和读取日志文件user_behaviors.log中
的用户行为数据。Channel组件的类型为KafkaChannel,用于将用户行为数据传输到
Kafka。
在虚拟机Spark03的/export/data/flume_conf目录中,使用vi编辑器编辑配置文件

Spark项目实训(Python86 版) 
flume-logs-real.conf,在该文件中添加采集实时用户行为数据的方案,具体内容如文件3-3 
所示。
文件3-3 flume-logs-real.conf 
1 #定义Source 组件的标识r1 
2 a2.sources = r1 
3 #定义Channel 组件的标识c1 
4 a2.channels = c1 
5 #定义Source 组件的类型为Taildir Source 
6 a2.sources.r1.type = TAILDIR 
7 #定义用于记录被监控文件当前读取位置的文件taildir_position_real.json 
8 a2.sources.r1.positionFile = /export/data/flume/taildir_position_real 
.json 
9 #定义文件组的标识为f1 
10 a2.sources.r1.filegroups = f1 
11 #定义文件组f1 中被监控文件的位置,即日志文件user_behaviors.log 所在目录
12 a2.sources.r1.filegroups.f1 = /export/data/log/2024/user_behaviors.log 
13 #定义Source 组件中拦截器的标识i1 
14 a2.sources.r1.interceptors = i1 
15 #在标识为i1 的拦截器中添加一个自定义拦截器,用于校验数据是否为JSON 格式
16 a2.sources.r1.interceptors.i1.type = cn.itcast.flume 
.JsonValidationInterceptor$Builder 
17 #定义Channel 组件的类型为Kafka Channel 
18 a2.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel 
19 #定义Kafka 集群的地址
20 a2.channels.c1.kafka.bootstrap.servers = spark01:9092,spark02:9092,spark03: 
9092 
21 #定义Kakfa 的主题user_behavior_topic 
22 a2.channels.c1.kafka.topic = user_behavior_topic 
23 #关闭Flume 对数据的事件解析功能
24 a2.channels.c1.parseAsFlumeEvent = false 
25 #将Channel 组件与Source 组件关联
26 a2.sources.r1.channels = c1 
在文件3-3中指定FlumeAgent的标识为a2。第16行代码添加的自定义拦截器需
要通过编写Java程序来实现,其具体实现过程本书不作重点讲解。在本书的配套资源中
提供了自定义拦截器的jar文件FlumeInterceptor.jar,供读者直接使用。
在文件3-3中添加采集方案后,保存并退出编辑。需要注意的是,文件3-3中日志文
件user_behaviors.log所在目录需要根据generate_user_data_real.py文件运行时实际生
成的目录进行修改。
3.添加自定义拦截器
参考第2章上传JDK安装包的方式,将jar文件FlumeInterceptor.jar.jar上传到虚
拟机Spark03的/export/servers/flume-1.10.1/lib目录中,从而在Flume中添加自定义

第3章 数据采集 87 
拦截器。
3.4 采集用户行为数据
本节讲解如何使用3.3节配置的采集方案,分别启动负责采集历史和实时用户行为
数据FlumeAgent,以完成本项目中采集用户行为数据的功能,具体内容如下。
1.启动采集历史用户行为数据的FlumeAgent 
在虚拟机Spark03中启动FlumeAgent的操作步骤如下。
(1)启动HDFS集群。确保虚拟机Spark01、Spark02和Spark03中HDFS集群的相
关进程正常启动。需要说明的是,为了优化资源利用,在采集历史用户行为数据时,可以
选择仅启动集群环境中的HDFS集群。
(2)参考第2 章上传JDK 安装包的方式,将Python 文件generate_user_data_ 
history.py上传到虚拟机Spark03的/export/servers目录中。
(3)在虚拟机Spark03中启动FlumeAgent,从/export/data/log/2023目录中的日
志文件user_behaviors.log里采集历史用户行为数据,具体命令如下。 
flume-ng agent --name a1 --conf conf/ --conf-file \ 
/export/data/flume_conf/flume-logs-history.conf \ 
-Dflume.root.logger=INFO,console 
上述命令中,参数--name指定的参数值a1为FlumeAgent的标识,该标识需要与配
置文件flume-logs-history.conf中FlumeAgent的标识一致。
上述命令执行完成后,FlumeAgent会占用Tabby中虚拟机Spark03的操作窗口, 
因此用户无法进行其他操作。读者可以在Tabby中右击虚拟机Spark03的操作窗口,在
弹出的菜单中选择“克隆”选项,通过克隆的方式创建一个虚拟机Spark03的新操作窗口, 
如图3-3所示。
图3-3 创建一个虚拟机Spark03的新操作窗口
(4)在Tabby中创建一个虚拟机Spark03的新操作窗口,用于执行Python文件

Spark项目实训(Python88 版) 
generate_user_data_history.py,向/export/data/log/2023 目录的日志文件user_ 
behaviors.log中写入历史用户行为数据,具体命令如下。 
python /export/servers/generate_user_data_history.py 
上述命令执行完成后,Python程序会占用Tabby中虚拟机Spark03的操作窗口,因
此用户无法进行其他操作。
(5)在HDFS的/origin_data/log/user_behaviors目录中,检查采集的历史用户行为
数据是否根据用户触发行为时间中的日期,正确分配到HDFS的不同目录中。在虚拟机
Spark01执行如下命令。 
hdfs dfs -ls /origin_data/log/user_behaviors 
上述命令执行完成的效果如图3-4所示。
图3-4 查看/origin_data/log/user_behaviors目录中的内容
图3-4展示了/origin_data/log/user_behaviors目录的部分内容。该目录下有许多按
照日期命名的子目录,每个子目录中包含了对应日期的历史用户行为数据。
(6)查看/origin_data/log/user_behaviors目录中任意子目录包含的文件。这里以
/origin_data/log/user_behaviors/2023-04-29目录为例,在虚拟机Spark01执行如下命令。 
hdfs dfs -ls /origin_data/log/user_behaviors/2023-04-29 
上述命令执行完成的效果如图3-5所示。
图3-5 查看/origin_data/log/user_behaviors/2023-04-29目录的内容

第3章 数据采集 89 
从图3-5可以看出,/origin_data/log/user_behaviors/2023-04-29目录下有3个压缩
文件(.gz),这些压缩文件存储了2023年4月29日的用户行为数据。需要说明的是,读
者在实际操作时,图3-5显示的文件名会与此不同。
(7)查看/origin_data/log/user_behaviors/2023-04-29目录中任意压缩文件包含的
内容。这里以log.1720173246809.gz文件为例,在虚拟机Spark01执行如下命令。 
hdfs dfs -text \ 
/origin_data/log/user_behaviors/2023-04-29/log.1720173246809.gz 
上述命令执行完成的效果如图3-6所示。
图3-6 查看log.1720173246809.gz文件的内容
从图3-6可以看出,log.1720173246809.gz文件中包含一条用户行为数据,该数据中
用户触发行为的时间为2023年4月29日。因此说明,采集的用户行为数据根据用户触
发行为时间中的日期,正确分配到HDFS的不同目录中。
小提示:虚拟机Spark03中FlumeAgent和Python程序的运行时长决定了生成历
史用户行为数据的数量。建议读者在虚拟机Spark03中运行FlumeAgent和Python程
序较长时间,以生成更多的历史用户行为数据,从而使后续的数据分析结果更加丰富。
2.启动采集实时用户行为数据的FlumeAgent 
在虚拟机Spark03中启动FlumeAgent的操作步骤如下。
(1)启动HDFS集群、ZooKeeper集群和Kafka集群,确保虚拟机Spark01、Spark02 
和Spark03中这些集群的相关进程正常启动。需要说明的是,为了优化资源利用,在采集
实时用户行为数据时,可以不启动集群环境中的YARN 集群、Doris集群和Hive的相关
服务。
(2)参考第2章上传JDK安装包的方式,将Python文件generate_user_data_real.py 
上传到虚拟机Spark03的/export/servers目录中。
(3)在Kafka中创建主题user_behavior_topic。在虚拟机Spark01执行如下命令。 
kafka-topics.sh --create --topic user_behavior_topic \ 
--partitions 3 --replication-factor 2 \ 
--bootstrap-server spark01:9092,spark02:9092,spark03:9092 
通过上述命令在Kafka中创建的主题user_behavior_topic包含3个分区和2个副
本,以提升处理效率和数据的容错性。上述命令创建的主题需要与配置文件flume-logs

Spark项目实训(Python90 版) 
real.conf中指定的Kafka主题一致。
上述命令执行完成后,若出现“Createdtopicuser_behavior_topic”的提示信息,说明
在Kafka中成功创建主题user_behavior_topic。
(4)在Tabby中创建一个虚拟机Spark03的新操作窗口,用于执行Python文件
generate_user_data_real.py,向/export/data/log/2024目录的日志文件user_behaviors 
.log中写入实时用户行为数据,具体命令如下。 
python /export/servers/generate_user_data_real.py 
(5)在Tabby中创建一个虚拟机Spark03的新操作窗口,用于启动FlumeAgent, 
从/export/data/log/2024目录中的日志文件user_behaviors.log里采集实时用户行为数
据,具体命令如下。 
flume-ng agent --name a2 --conf conf/ --conf-file \ 
/export/data/flume_conf/flume-logs-real.conf \ 
-Dflume.root.logger=INFO,console 
上述命令中,参数--name指定的参数值a2为FlumeAgent的标识,该标识需要与配
置文件flume-logs-real.conf中FlumeAgent的标识一致。
在虚拟机Spark01中启动一个Kafka消费者,该消费者订阅主题user_behavior_ 
topic,用于验证Flume是否将采集的用户行为数据写入Kafka的主题user_behavior_ 
topic中,具体命令如下。 
kafka-console-consumer.sh --topic user_behavior_topic \ 
--group user_behavior_test \ 
--bootstrap-server spark01:9092,spark02:9092,spark03:9092 
上述命令执行完成后的效果如图3-7所示。
图3-7 Kafka消费者
从图3-7可以看出,Kafka消费者输出了生成的用户行为数据,说明Flume成功将采
集的用户行为数据写入Kafka的主题user_behavior_topic中。
小提示:在确认Flume 成功将采集的用户行为数据写入Kafka 的主题user_ 
behavior_topic后,读者可以暂时关闭生成实时用户行为数据的Python程序,以及负责采
集实时用户行为数据的FlumeAgent。待后续进行实时分析时,再重新启动它们。

第3章 数据采集 91 
在关闭FlumeAgent、Python程序或者Kafka消费者时,可以在相应的操作窗口中, 
通过组合键Ctrl+ C实现。
脚下留心:调整FlumeAgent可使用JVM 的最大内存
当FlumeAgent启动后,出现OutOfMemoryError的错误信息时,通常是由于采集
的数据量较大,导致FlumeAgent可使用JVM 的内存不够用所导致。读者可以通过修
改配置文件flume-env.sh,调整FlumeAgent启动和运行时可使用JVM 的最大内存,具
体操作步骤如下。
(1)通过复制模板文件flume-env.sh.template创建配置文件flume-env.sh。在
Flume安装目录的/conf目录中执行如下命令。 
cp flume-env.sh.template flume-env.sh 
(2)使用vi编辑器编辑配置文件flume-env.sh,在文件的末尾添加如下内容。 
#根据实际情况填写JDK 安装目录
export JAVA_HOME=/export/servers/jdk1.8.0_401/ 
export JAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote" 
上述内容指定FlumeAgent启动和运行时可使用JVM 的最大内存分别为1GB 
(-Xms1024m)和2GB(-Xmx2048m)。配置文件flume-env.sh的内容修改完成后,保存并
退出编辑。
3.5 本章小结
本章主要讲解了数据采集的相关内容。首先,介绍了用户行为数据的概念。然后,介
绍了模拟生成用户行为数据。最后,分别介绍了采集方案的配置以及如何采集用户行为
数据。通过本章的学习,读者应可以掌握项目中数据采集的实现,为后续实施数据分析提
供数据支撑。