大数据ETL 实现 
学习计划: 
. 了解Spark的基本原理和性质
. 掌握Spark的安装装置和ETL实现
. 了解Spark的交互模式
. 掌握Hadoop平台的搭建
. 了解Hive的安装方法和基本性质
. 了解Sqoop的基本应用
在大数据时代,人们需要处理和使用的数据越来越多。对于企业来说,数据已经成为
企业的生存基础,能否利用好自己的数据对企业的发展至关重要。数据库技术为企业分
析海量数据提供了有效方案,而在数据仓库的构建过程中,ETL往往是整个过程中最耗
时和复杂的阶段。日益增长的数据处理量对ETL技术提出了更高的性能要求,也带来了
更大的挑战。
5.1 Spark 的分布式ETL 实现
为了应对海量数据的ETL处理需求,用分布式并行技术实现ETL很有必要。尽管
当前基于MapReduce(Hadoop的一个子项目)范型实现的分布式ETL方案能够实现海
量数据的高效处理,但是由于MapReduce编程模型的限制,即MapReduce只有两种处理
方式,以及多步的处理过程中存在的高I/O(输入/输出)开销,使其在ETL的转换过程中
存在一些性能问题,在处理效率和处理速度方面还有许多优化空间。针对大数据的“海
量”特征,以及基于MapReduce范型实现的分布式ETL方案的局限性,结合数据仓库理
论知识和分布式处理技术,基于Spark对分布式并行ETL技术进行研究,近年来又提出
了一种分布式ETL的设计方案,重点研究数据转换过程中转换处理的并行实现,并根据
不同的转换处理类型给出了适用的解决方法。针对前期非聚集操作,如基本的数据清洗、
数据格式标准化操作,提出了基于分区的并行管道处理算法,以分区为单位处理数据,从
而提高数据转换效率;对于聚集操作,如事实表的数值数据的聚合操作,采用了分区预聚
合方法,以减小数据传输频率。结果显示,提出的方法能够明显加速大数据量的转换处
理,进而提高分布式ETL的性能和处理效率。本章对基于Spark的数据处理流程进行了
5

性能优化讲解。详细分析了Spark在处理中的常见数据倾斜问题,根据不同场景下的数
据倾斜情况,分别给出了对应的并行优化策略。最后,通过开发一个实际的决策支持系
统,阐述了基于Spark的分布式ETL 的设计与应用情况,包括与传统ETL 开发方案的比
较分析,分析结果证明了基于Spark的分布式ETL 方案的有效性和高可扩展性。

5.1.1 
Spark概述
ApacheSpark是专为大规模数据处理而设计的快速、通用的计算引擎。Spark是加
州大学伯克利分校的AMP 实验室开源的类Hadoop与MapReduce的通用并行框架, 
Spak拥有Hadoeue的优点, eue的是,o

ropMapRdc但不同于MapRdcJb的中间输出结果
可以保存在内存中,从而不再需要读写HDFS,因此,Spark能更好地适用于数据挖掘与
机器学习等需要迭代的MapReduce的算法。

Spark是一种与Hadoop相似的开源集群计算环境,但是两者之间还存在一些不同, 
这些不同使Spark在某些工作负载方面表现得更加优越。换句话说,Spark启用了内存
分布数据集,除了能够提供交互式查询外,还可以优化迭代工作负载。

Spark是使用Scala语言实现的,它将Scala用作其应用程序框架。与Hadoop不同, 
Spark和Scala能够紧密集成,Scala可以像操作本地集合对象一样轻松地操作分布式数
据集。

尽管创建Spark是为了支持分布式数据集上的迭代作业,但是实际上它是对Hadoop
的补充,可以在Hadoop文件系统中并行运行。通过名为Mesos的第三方集群框架可以
支持此行为。

5.1.2 
Spark数据模型———RDD 
弹性分布数据集(ResilientDistributedDataset,RDD)是Spark的最基本抽象,是对
分布式内存的抽象使用,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。
RDD 是Spark最核心的部分,它表示已被分区,不可变的并能够被并行操作的数据集合, 
不同的数据集格式对应不同的RDD 实现。RDD 必须是可序列化的。RDD 可以cache到
内存中,每次对RDD 数据集操作之后的结果,都可以存放到内存中,下一个操作可以直
接从内存中输入,省去了MapReduce的大量磁盘I/O操作。这对于迭代运算比较常见的
机器学习算法和交互式数据挖掘来说,效率提升比较大。

可以将RDD 理解为一个大的集合,将所有数据都加载到内存中,方便多次重用。第
一,它是分布式的,可以分布在多台机器上进行计算;第二,它是弹性的,在计算处理过程
中,机器的内存不够时,它会和硬盘进行数据交换,虽然从某种程度上会降低性能,但是可
以确保计算得以继续进行。

RDD 是分布式只读且已分区的集合对象。这些集合是弹性的,如果一部分数据集丢
失,则可以对它们进行重建,具有自动容错、位置感知调度和可伸缩性,而容错性是最难实
现的。大多数分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。对于
大规模数据分析系统,数据检查点操作成本很高,主要原因是大规模数据在服务器之间传
输带来的各方面问题,相比记录数据的更新,RDD 也只支持粗粒度的转换,也就是记录如

140 


1 41 
何从其他RDD转换而来(即Lineage),以便恢复丢失的分区。
RDD的特性为:数据存储结构不可变;支持跨集群的分布式数据操作;可对数据记
录按key分区;提供了粗粒度的转换操作;数据存储在内存中,保证了低延迟性。
5.1.3 Spark的安装配置
Spark安装可以分为三步:安装JDK和Spark、配置Spark和启动Spark。
1. 安装JDK 和Spark 
在安装JDK和Spark时,需要部署虚拟机、下载Spark和JDK安装包,详见如下步骤。
(1)机器部署:准备一台以Linux为操作系统的服务器,安装好JDK1.7。
(2)如图5-1所示,下载Spark安装包。
图5-1 下载Spark安装包
(3)上传解压安装包。上传spark-1.5.2-bin-hadoop2.6.tgz安装包到Linux上。
(4)解压安装包到指定位置。打开Linux系统命令行,输入“tar-zxvfspark-1.5.2- 
bin-hadoop2.6.tgz-C/home/hadoop/app”。
2. 配置Spark 
配置Spark需要修改“spark-env.sh.template”“spark-env.sh”和“slaves.template”文
件,详细步骤如下。
(1)进入Spark安装目录。 
cd /home/hadoop/app/spark-1.5.2-bin-hadoop2.6 
进入conf目录重命名并修改“spark-env.sh.template”文件。 
cd conf/ 
mv spark-env.sh.template spark-env.sh 
(2)在“vispark-env.sh”配置文件中添加如下配置。 
export JAVA_HOME=/home/hadoop/app/jdk1.7.0_65 
export SPARK_MASTER_IP=weekend110 
export SPARK_MASTER_PORT=7077

1 42 
(3)保存并退出。
(4)重命名并修改“slaves.template”文件。 
mv slaves.template slaves 
(5)在“vislaves”文件中添加子节点所在的位置(Worker节点)weekend110。
(6)保存并退出。
(7)Spark集群配置完毕,目前是1个Master和1个Worker,在weekend110上启动
Spark集群。 
/home/hadoop/app/spark-1.5.2-bin-hadoop2.6/sbin/start-all.sh 
(8)启动后执行“jps”命令,主节点上有Master进程,其他子节点上有Worker进程, 
如图5-2所示,登录Spark管理界面查看集群状态(主节点)。
图5-2 登录Spark 
至此,单节点Spark集群安装完毕,但是Master节点存在单点故障,要解决此问题, 
需借助ZooKeeper,并且至少启动两个Master节点来实现高度可行,配置方式比较简单, 
如下所示。
Spark集群规划:node1、node2是Master;node3、node4、node5是Worker。
(1)在node1节点上修改slaves配置文件内容,指定Worker节点。
(2)在node1上执行sbin/start-all.sh脚本,然后在node2上执行sbin/start-master. 
sh,启动第二个Master。
3. 执行第一个Spark 程序 
spark- submit - - class org.apache.spark.examples.SparkPi - - master spark:// 
weekend110:7077 - - executor - memory 500m - - total - executor - cores 2/home/ 
hadoop/app/park-1.6.2-bin-hadoop2.6/lib/spark- examples- 1.6.2- hadoop2.6.0. 
jar 10 
该算法是利用蒙特卡罗算法求PI,参数说明如下。
(1)“--masterspark://weekend200:7077”指定了Master的地址。

(2)“指定了每个Wor可用内存为1GB 。
-
executor-memory1G” rke
(3)“
”


-
total-executor-cores2指定了整个集群使用的CUP核数为2。
spark-shel 
是Spark自带的交互式Shel 
程序,方便用户进行交互式编程,用户可以
在该命令行下用Scala编写Spark程序。

5.4 
分布式ETL总体架构
1.
第2章介绍了基于Hadoop平台的ETL实现过程。在基于MapReduce模型实现的
ETL中,在转换阶段的多步数据处理过程中存在一些性能限制。而在Spark的多步数据
处理过程中,可以将中间数据缓存到内存中,从而多次重用已缓存数据,直到全部数据处
理完后保存到目标结果中。因而使用Spark进行ETL,能极大减少使用Hadoop平台进
行ETL过程中的磁盘I/O次数,提高整体过程的处理效率和处理性能。图5-3所示是基
于Spark的ETL过程中的数据流向图。


图5-3 基于Spark的ETL数据流向图

从图5-3中可以看出,原始数据经过分离(spilt)成若干部分,交予Spark进行处理, 
最终将处理后的数据整合(mere)成结果数据。相比于Hadoop平台,基于Spark的
ETL过程将主要的数据转换处理
g“封装”在Spark中,基于Spark的内存处理单元RDD 
完成复杂和耗时的转换处理,利用RDD的可缓存性,尽量减少转换过程中的I/O开销, 
从而提高ETL的整体性能。

基于Spark的分布式ETL架构如图5-4所示。首先从数据源抽取出需要的细粒度
原始数据,暂存到HDFS中,抽取的数据源可以是关系数据库、文本文件等,抽取实现则
可以根据数据源格式选择可用的Sqoop工具或者使用HDFS的上传功能;之后进入正式
的转换阶段,此阶段基于Spark集群,通过从HDFS中读取各数据块创建RDD来完成一
系列的转换处理,其中转换处理分为前期的格式化处理和之后的整合处理,包括数据清
洗、数据过滤、数据转换、数据聚合等转换操作;最后将转换完成的结果数据加载到目标数
据仓库中,加载实现可以直接通过JDBC中间件将数据写到目标(DW)中,或者先将数据
保存为指定格式(CSV 、Parquet)的文件,再进行后续加载。

本章主要介绍基于Spark的并行ETL的过程,重点关注数据转换过程中的并行实
现。从图5-4所示的架构中也可以看出,数据转换过程是ETL中相对复杂和耗时的阶
段,它需要对抽取到的源数据进行一系列连续的转换操作,以得到符合要求的目标
数据。

143 


144
图5-4 基于Spark的分布式ETL架构
5.1.5分布式转换引擎的实现
用户通过分布式转换引擎对数据源进行操作,它是基于Spark框架的分布式ETL工
具的中枢,其功能是从ETL脚本中读取任务流程相关节点信息,再对脚本解析后,根据相
关信息调用不同的流程节点执行。
分布式转换引擎包括流程解析模块、流程执行模块、Spark流程节点和用户扩展模
块。流程解析模块读取命令行传递的ETL脚本,并对ETL脚本进行解析,根据任务执行
序列和流程节点依赖关系调用相应的Spark流程节点或者用户扩展模块,动态编译加载
用户自定义的函数。用户扩展模块则读取用户自定义的Java源代码,将Java源代码进
行动态编译,或者对JavaClass文件进行加载运行。
采用解析脚本的方式实现分布式转换引擎,主要是从扩展性和灵活性两方面考虑,增加
分布式ETL工具的适应面。同时还在分布式转换引擎中添加用户扩展模块,通过使用向用
户提供的Java源代码进行动态编译加载的方式,给用户自定义转换节点功能的空间。
分布式转换引擎从ETL脚本中读取任务流程节点类型信息,根据流程节点类型的不
同,分为普通流程节点、Java源代码、外部类三种情况。当类型为普通流程节点时,直接
调用分布式ETL工具提供的相应Spark流程节点。当类型为Java源代码时,会根据源
代码创建JavaFileManagerImpl 对象,然后通过JavaSourceCompiler 类中的
JavaCompileTask对象调用JavaCompilerClassLoader对象,最终执行JavaCompiler的
getTask函数进行动态编译。当类型为class时,会根据ETL脚本参数中的类名执行
JavaCompilerClassLoader类中的class.forname()函数进行加载,但是必须将外部类文件
放置到指定目录,或者打成Jar包放置到classpath中。具体执行流程如图5-5所示。
1.流程解析模块
流程解析模块的功能为解析ETL脚本的内容,读取命令行传递的ETL脚本,并对
ETL脚本进行解析,生成任务执行序列和流程节点依赖关系。本书规定ETL脚本的格
式如表5-1所示。

145
图5-5 分布式转换引擎工作流程
表5-1ETL脚本格式
流程解析模块就是要对这种格式的ETL 进行解析,使基于Spark的分布式ETL 工
具能灵活地执行多种用途的ETL 任务。在5种类型的流程节点中,extract、transform 、
d分别代表ETL 流程中的抽取、转换、加载3种类型节点。值得注意的是,在ETL 流
t和end两种类型的节点,在整个ETL 流程中,start节点主要负责初始
根据脚本提供的参数创建SparkContext。end节点表示ETL 脚本结束, 
d节点后,分布式转换引擎才会将Spark任务提交到集群中执行。另外,在
ETL 流程中有且仅有一个start节点和一个end节点。
内容解析
流程节点序号当前流程节点的序号(开始节点序号为0,结束节点序号为1) 
前驱节点序号为其前驱节点的序号,若前驱节点为多个,则以逗号分隔
流程节点名称当前流程节点的名称
流程节点类型有5种类型:start、extract、transform 、load和end 
流程节点参数流程节点的参数设置
loa
程中增加了star
化Spark集群, 
只有添加en


2. 
Spark 
流程节点
常规的ETL 工具将流程节点分为抽取、转换、加载3种类型,但是Spark程序在执行
真正工作代码前,必须设置参数对集群进行初始化创建SparkContext。另外需要注意的
是,每个Spark程序有且只有一个SparkContext,不同SparkContext之间无法共享
RDD 。虽然可以将集群参数初始化放到抽取节点中进行,但是由于ETL 流程可能存在
多个抽取节点,所以在基于Spark框架的分布式ETL 工具增加了开始节点,负责对
Spark集群进行初始化创建SparkContext。同时在ETL 流程最后增加结束节点,负责将
Spark程序提交到Spark集群中运行。

对于常规的抽取、转换、加载节点,可以使用不同的Spark处理模型实现。例如,对于
常规批处理任务,可以使用Spark实现;对于实时性业务,可以使用SparkStreaming实
现;对于海量数据查询业务,可以用SparkSQL 实现等。需要注意的是,在Spark框架中, 
各个模块的Context类型并不相同,如普通Spark程序为SparkContext,SparkStreaming
程序为StreamingContext类型,SparkSQL 程序为SQLContext类型等,并不能随意转
换。不过由于SQLContext等类型都是基本SparkContext类型的子类,可以通过
SparkContext转换。所以,在实现中,常规流程节点会根据需要对开始节点中生成的
SparkContext进行转换,并在流程节点结束时再转换为SparkContext传给下一个流程
节点。

(1)基于Spark的抽取节点。
在Spark框架中,RDD 容错机制是基于Lineage的。Lineage是由不同RDD 的转换
关系构成的计算链,可以把这个计算链认为是RDD 之间演化的“血统”。在由于部分
Spark集群节点故障导致RDD 计算结果丢失时,只需要根据Lineage重新计算该RDD 
即可,整个Spark程序并不会失败。

这就会带来一个问题,如果数据源数量巨大,则全量抽取过程可能会持续时间较长, 
若中间有集群节点出现故障导致RDD 结果丢失,就需要重新执行整个抽取操作。所以, 
为了减少Spark集群中间RDD 计算结果丢失后重新抽取的开销,将每个RDD 的抽取数
量限制到一定大小,比如每个RDD 大小设置为20000 条数据,这样当某一个RDD 出错
后,重新计算的开销仅仅为抽取这个20000 条数据的开销,而不是重新抽取源数据库中
的所有数据,会大大减少重新抽取的开销,当数据量较大时将会带来性能的提升。

所以,基于Spark的全量抽取节点在对源数据库进行抽取时,首先获取表中的数据总
条数、主键值的最大值和最小值,之后根据单个RDD 限制数量的大小,计算出将要划分
的RDD 数,创建多个连接并行抽取源数据库中的数据。

(2)基于Spark的转换节点。
在抽取数据后,在将数据加载到目的数据库之前,经常需要处理某些数据。例如,常
见的数据迁移情况是数据源和数据目的表结构不一致,列名不相同,这时就需要转换原始
数据的列名;或者在迁移时只需要处理部分数据,这时可以直接将不需要处理的数据传递
为加载节点,将需要处理的数据进行转换处理后再加载,这个过程如图5-6所示。

经过抽取节点处理后,数据流分为两部分:发往转换节点的转换数据流和直接加载

146 


147
图5-6 有转换节点的数据迁移模型
转换数据流在转换节点处理后,生成加载数据流2发往加载节点。
因为在对数据进行转换处理时,抽取节点的元数据结构以及处理需求都不相同,很多
为了向用户提供灵活处理抽取数据的能力,从两方面向用户提供提交
va源代码的功能。
rk操作算子提供函数。
通过用户扩展模块动态编译加载执行码。
两种方式在形式上最主要的区别在ETL 脚本中参数的类型上,第①种方式不需要特
直接放置在流程节点的参数列表中,由流程节点直接运行;第②种方式则需
要标明为用户自定义功能Java源代码,由用户扩展模块对Java源代码进行动态编译加
k的加载节点。
加载节点是ETL 流程的结束,通过Spark的action算子实现,业务逻辑相对比较简
只需要将转换节点处理后的数据持久化到目的数据库即可。简单的做法是直接通过
h算子执行SQL 语句,将数据写入到目的数据库中,但这会使每个记录都向数据库
服务器申请创建一个连接对象,创建和销毁每个数据的连接对象将会造成不必要的时间
ark框架的RDD 是一个分散式内存数据集,RDD 中的数据存放在
所以不能直接以RDD 为单位向数据库服务器申请创建连接对象执行
这会导致某些集群节点由于未创建连接而不能执行SQL 操作。
因为RDD 在集群的不同节点都存在一个分区,所以需要使ForeachPartition算子为
每个分区创建一个单独的连接对象,使用该连接执行所有在该RDD 分区的SQL 语句,充
k集群多节点的并发性。
eaming的实时同步实现
ng实现同步,主要利用增量抽取机制进行,下面介绍增量抽取机制和
g的增量抽取机制。
1.增量抽取机制
数据抽取节点是ETL 流程的开始节点,直接影响数据转换节点和数据加载节点时的
如果数据抽取节点效率低,那么即便数据转换节点和数据加载节点使用分布式
也不能提高整个ETL 流程的处理效率。可以从多主机并行抽取和增量抽取两
的加载数据流1, 

都依赖于具体问题, 

针对具体问题的Ja

①通过向Spa
② 
殊说明类型, 

载执行。

(3)基于Spar
单, 
foreac

和资源开销。因为Sp
多个集群节点中, 
SQL 语句, 

分利用Spar

5.1.6 
SparkStr
SparkStreamiSparkStreamin

处理速度, 
处理框架, 


1 48 
方面提高数据抽取效率,增量抽取机制要比全量抽取更加复杂,要求在一定时间段内精确
迅速地抽取改变的数据,同时不能增加太大的负荷,影响源数据库服务器上业务的正常
运行。
2. SparkStreaming 的增量抽取机制
SparkStreaming是Spark核心API的一种扩展,通过它可以实现对实时流数据的高
吞吐量、低容错率处理。SparkStreaming可以很好地支持流数据格式,如很好地支持
Kafka、Flume、Kinesis、Twitter、ZeroMQ、MQTT等工具生成的流数据,这是当前多数公
司收集日志或者数据常用的方法,具有广泛的适应性。
SparkStreaming的内部实现原理是接收实时输入数据流并将数据流划分为微批次, 
然后由底层的Spark框架引擎分批处理,生成最终的结果流。SparkStreaming将原始流
数据抽象为DStream 的离散数据流。DStream 是SparkStreaming提供的基本抽象,代表
了一个连续的数据流,这个数据流可以是从数据源接收的,也可以是对输入流进行处理后
的数据流。因为在内部,它由多个RDD 连续序列表示,所以也是不可改变的数据集抽
象,DStream 中的每个RDD 都包含数据流上某个时间间隔的数据集。基于
SparkStreaming的程序运行周期如下。
(1)设置SparkConf,并生成JavaStreamingContext对象。
(2)通过创建输入DStream 定义输入数据源。
(3)通过对DStream 应用转换算子和输出操作定义数据流的计算流程。
(4)使用StreamingContext.start()函数开始接收流数据并根据第(2)步的计算流程
进行运算。
(5)使用streamingContext.awaitTermination()函数等待数据流运算结束。
在Spark 程序中有两种方法可以生成SparkStreaming 运行时需要的
JavaStreamingContext:直接使用SparkConf对象生成,以及从现有Spark 通用的
JavaSparkContext对象转换。第一种方法直接生成JavaStreamingContext,但是之后就
不能使用SparkSQL等其他库的函数(各自都有自己格式的Context);而第二种方法在
JavaStreamingContext关闭后,可以继续使用JavaSparkContext转换为其他格式的
Context,因此选择第二种方式生成StreamingContext。
5.2 Spark 完成在ETL 时的相关技术
本节主要讲述Spark的架构,分析Spark执行Application的实现逻辑,为后续扩展
为WebService提供理论支持。通过分析Spark的Master模块,可以深入理解Spark集
群的交互调用模式,从而设计出合适的SparkWebService。
为了后续描述更为准确,便于理解,介绍后续用到的部分Spark组件。
. Application:是指用户提交的Spark程序,运行时由Driver与Executor组成。
. Driver:用户运行Application时执行main()函数,创建SparkContext的过程。
. Executor:在Worker节点上由Application启动的,执行分配的Tasks的进程。