第
5
章大数据计算与分析5.1Hadoop & MapReduce
Hadoop MapReduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上TB级别的数据集。MapReduce是一种编程模型。Hadoop MapReduce采用主从(master/slave)结构。按照其编程规范,只需要编写少量的业务逻辑代码即可实现一个强大的海量数据并发处理程序。核心思想是分而治之。Mapper负责分,把一个复杂的任务分成若干个简单的任务分发到网络上的每个节点并行执行,最后把Map阶段的结果由Reduce进行汇总,输出到HDFS中,大大缩短了数据处理的时间开销。MapReduce就是以这样一种可靠且容错的方式对大规模集群海量数据进行数据处理、数据挖掘、机器学习等方面的操作。
MapReduce是Google最早提出用来进行创建和更新索引的一种分布式计算模式,该模式提供了一个简单的分布式计算框架,来降低分布式计算编程的难度,从而很好地解决了一般商用机及服务器面对海量数据计算响应过慢甚至不能完成作业的问题。海量数据对于单机而言,由于硬件资源限制,肯定是无法胜任的,而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度。引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由MapReduce框架来处理。
目前MapReduce主要用于海量数据的分布式计算,它主要起源于函数编程思想中的Map函数和Reduce函数操作。Map函数的操作原理是把一组数据映射为一个键值,Reduce函数的工作原理是对Map函数输出结果进行合并处理。
用户只需编写Map(映射)和Reduce(归约)两个函数,即可完成简单的分布式程序的设计。Map函数以键/值(key/value)对作为输入,产生另外一系列键/值对作为中间输出写入本地磁盘。MapReduce 框架会自动将这些中间数据按照key值进行聚集,且key值相同(用户可设定聚集策略,默认情况下是对key值进行哈希取模)的数据被统一交给Reduce函数处理。Reduce函数以key及对应的value列表作为输入,经合并key相同的value值后,产生另外一系列键/值对作为最终输出。
Map函数能够在一堆混杂的数据中按照开发者的意向提取需要的数据特征,交给Reduce函数来归纳输出最终的结果。一道真实的大数据面试题
你有1GB的文本数据,需要写一个Python程序统计这1GB数据中每个单词出现的次数,但是这个程序每个进程最多只允许占用256MB内存,请问你应如何统计?
〖3〗大数据分析技术与应用实践第5章大数据计算与分析〖3〗5.1.1用MapReduce解决一个问题
我们还是通过档案馆管理员的例子来描述MapReduce。图5.1是MapReduce的实际执行过程——Shuffle,并且把Shuffle的细节进行了形象的描述。
图5.1MapReduce实际执行过程
与HDFS不同,HDFS是解决数据如何存储的问题,对于如何统计没有描述。MapReduce要解决的是如何统计的问题,即具体是如何计算出“大连理工大学”出现多少次的。需要注意的是,两个例子描述的是两个不同层面的问题,请不要过度关联。待理解了整个过程以后,会对它们有新的认识。
新学期到了,档案馆管理员给甲、乙、丙3个同学安排了新的文件任务,这次的文档都是英文,任务是统计每个单词出现了多少次。
一共有4个英文文件需要处理。
(1) Split: 首先,管理员根据文件大小分配任务。甲处理文件1、 2,乙处理文件3,丙处理文件4。
(2) Partition: 按照单词首字母分类,甲、乙、丙分别先将所负责文件的单词用铅笔记录在一张纸上,如图5.2(a)所示。
(3) Sort and Spill: 每次这张纸写满后,把纸上记录的内容录入计算机。录入时,顺便将每部分的单词排序录入,如图5.2(b)所示。然后用橡皮将纸上的内容擦掉,重新记录。
(4) Merge: 当甲将自己负责的文件都输入计算机后,通过计算机再次将有相同首字母的部分合并到一起,如图5.3所示。乙、丙也做相同的操作。
图5.2第一阶段文件Split,Partition,
Sort和Spill操作
图5.3第一阶段Merge操作




甲、乙、丙都完成了第一阶段工作以后,管理员开始安排第二阶段任务。
(1) Fetch: 管理员让甲将第一阶段统计的a开头的单词都收集起来,如图5.4所示。由于甲、乙、丙各得出一个结果,所以收集了3组a开头单词的结果。
(2) Merge: 首先甲将第一阶段甲、乙的结果进行合并,然后再与第一阶段丙的结果合并,得到最终结果,如图5.5所示。
图5.4第二阶段Fetch操作
图5.5第二阶段Merge操作



(3) 与此同时,乙、丙在合并其他首字母的结果。甲合并完首字母为a的结果后,也去合并剩余首字母的结果。当a~z首字母都合并完,任务结束。
5.1.2MapReduce模型
MapReduce的核心思想是分而治之,把大的任务分成若干个小任务,并行执行小任务,最后把所有的结果汇总,因此整个作业的过程被分成两个阶段: Map阶段和Reduce阶段。Map阶段主要负责分,即把复杂的任务分解为若干个简单的任务来处理。这里简单的任务不但指数据或计算的规模相对原任务要大大缩小,同时这些小任务彼此间几乎没有依赖关系,可以并行计算。最后要注意就近计算原则,即任务应该分配到存放着所需数据的节点上进行计算。Reduce阶段负责对Map阶段的结果进行汇总。
例如,输入信息“Whether the weather be fine or whether the weather be not.Whether the weather be cold or whether the weather be hot.We will weather the weather whether we like it or not. ”在此信息上运行Map和Reduce操作输出的结果如图5.6所示。
图5.6Map和Reduce操作的数据处理过程实例
5.1.3Hadoop中的MapReduce
MapReduce作为一个分布式运算程序的编程框架,是Hadoop的四大组件之一,是用户开发基于Hadoop的数据分析应用的核心框架,其核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
图5.7为Hadoop中Map和Reduce操作的数据处理和传输过程。
图5.7Hadoop中Map和Reduce操作的数据处理和传输过程
(1) 首先,输入数据被切割成数据分片,每一个分片会复制多份到HDFS中。
(2) 在存储有输入数据分片的节点上运行Map任务。
(3) Map任务的输出结果在本地进行分区、排序。分区方法时通常将key值相同的数据放在同一个分区,Reduce阶段同一个分区的数据会被安排到同一个Reduce中。
(4) Shuffle过程把key值相同的value合并成列表(list)作为Reduce的输入。
(5) 每一个Reduce将所有Map对应分区的数据复制过来,进行合并和排序,将相同的key值对应的数据统一处理。在Reduce计算阶段,Reduce的输入键是key,而输入值是相同的key数据对应的value所构成的一个迭代器数据结构。
(6) Reduce处理后的输出结果可以存储到HDFS中。这些输出结果可以进一步作为另一个MapReduce任务的输入,进行下阶段的任务计算。
在Hadoop框架中,MapReduce以组件的模式工作,主要包括JobTracker和TaskTrackers两个组成部分。JobTracker是一个master服务,负责接收及分配作业,并调度作业对应的子任务运行在TaskTracker上,MapReduce组件框架图如图5.8所示。
图5.8MapReduce组件框架图
如图5.8所示,MapReduce的工作流程如下所述。
(1) 用户提交一个作业,该作业被发送到JobTracker服务器上, JobTracker是 MapReduce的核心,它通过心跳机制管理所有的作业。
(2) TaskTracker为MapReduce集群中的一个工作单元,主要完成JobTracker分配的任务。
(3) TaskTracker监控主机任务运行情况,通过心跳机制向JobTracker反馈自己的工作状态。
此过程中,使用者只需要在 MapReduce 模型上进行开发,并将数据以键/值形式表示,而不用考虑集群中的计算机之间的任务调度、容错处理、各节点之间的通信等细节问题。MapReduce 编程模型将借助 Hadoop 分布式文件系统,自动将数据计算分布到集群上调度作业运行。其中,涉及的类或进程功能如下所述。
(1) JobTracker: 一般应该部署在单独机器上的master服务,功能是接收Job,负责调度Job的每一个子任务Task运行在TaskTracker上,并且对它们进行监控,如果发现有失败的Task就重启。
(2) TaskTracker: 运行于多节点的slaver服务,功能是主动通过心跳与JobTracker进行通信接收作业,并且负责执行每一个任务。
(3) MapTask和ReduceTask: Mapper根据JobJar中定义的输入数据<key1, value1>读入,生成临时的<key2, value2>,如果定义了Combiner,MapTask会在Mapper完成后调用该Combiner将相同key的值做合并处理,目的是为了减少输出结果。MapTask全部完成后交给ReduceTask进程调用Reducer函数处理,生成最终结果<key3, value3>。
5.1.4Hadoop Streaming
Hadoop Streaming是Hadoop的一个工具, 它帮助用户创建和运行一类特殊的Map/Reduce作业, 这些特殊的Map/Reduce作业是由一些可执行文件或脚本文件充当Mapper或者Reducer。Mapper和Reducer都是可执行文件,它们从标准输入读入数据(一行一行读), 并把计算结果发给标准输出。Streaming工具会创建一个Map/Reduce作业, 并把它发送给合适的集群,同时监视这个作业的整个执行过程。
如果一个可执行文件被用于Mapper,则在Mapper初始化时, 每一个Mapper任务会把这个可执行文件作为一个单独的进程启动。Mapper任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。同时,Mapper收集可执行文件进程标准输出的内容,并把收到的每一行内容转化成键/值对,作为Mapper的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。如果没有tab,整行作为key,value为null。不过,这可以定制。
如果一个可执行文件被用于Reducer,每个Reducer任务会把这个可执行文件作为一个单独的进程启动。Reducer任务运行时,它把输入切分成行并把每一行提供给可执行文件进程的标准输入。同时,Reducer收集可执行文件进程标准输出的内容,并把每一行内容转化成键/值对,作为Reducer的输出。默认情况下,一行中第一个tab之前的部分作为key,之后的(不包括tab)作为value。下面讨论如何自定义key和value的切分方式。
Hadoop Streaming除了支持流命令选项(streaming command options)外,还支持Hadoop的通用命令选项(generic command options)。命令的使用规则如下: > mapred streaming \[genericOptions\] \[streamingOptions\]需要注意的是,在提交Streaming作业中使用到通用命令选项的时候,需要把通用命令选项设置在流命令选项之前,否则将会出现一些错误。
目前的 Hadoop streaming(Hadoop 3.0.0)支持的流命令选项如表5.1所示。续表表5.1Hadoop Streaming支持的流命令选项参数是否可选描述input directoryname or filenamerequiredMapper的输入路径output directorynamerequiredReducer输出路径mapper executable or JavaClassNameoptionalMapper可执行程序或 Java 类名reducer executable or JavaClassNameoptionalReducer可执行程序或 Java 类名file filenameoptionalMapper、Reducer或Combiner 依赖的文件inputformat JavaClassNameoptional键/值对输入格式,默认为 TextInputFormatoutputformat JavaClassNameoptional键/值对输出格式,默认为 TextOutputformatpartitioner JavaClassNameoptional该类确定将键发送到哪个Reducecombiner streamingCommand or 
JavaClassNameoptionalMap输出结果执行 Combiner 的命令或者类名cmdenv name=valueoptional环境变量inputreaderoptional向后兼容,定义输入的 Reader 类,用于取代输出格式verboseoptional输出日志lazyOutputoptional延时输出numReduceTasksoptional定义Reduce数量mapdebugoptionalMap任务运行失败时,执行的脚本reducedebugoptionalReduce任务运行失败时,执行的脚本1. 提交作业的时候打包文件
如上所述,可以指定任意的可执行文件作为Mapper或者Reducer。在提交Hadoop Streaming作业的时候,Mapper或者Reducer程序不需要事先部署在Hadoop集群的任意一台机器上,仅需要在提交Streaming作业的时候指定 file 参数,这样Hadoop会自动将这些文件分发到集群。使用如下: 1. mapred streaming \\
2. -input myInputDirs \\
3. -output myOutputDir \\
4. -mapper myPythonScript.py \\
5. -reducer /usr/bin/wc \\
6. -file myPythonScript.py上面命令中file myPythonScript.py会导致Hadoop将这个文件自动分发到集群。
除了可以指定可执行文件之外,还可以打包Mapper或者Reducer程序会用到的文件(包括目录、配置文件等),例如:1. mapred streaming \\
2. -input myInputDirs \\
3. -output myOutputDir \\
4. -mapper myPythonScript.py \\
5. -reducer /usr/bin/wc \\
6. -file myPythonScript.py \\ 
7. -file myDictionary.txt 2. 为作业指定其他插件
与正常的 Map / Reduce 作业一样,还可以为流式作业指定其他插件,选项如下:   -inputformat JavaClassName
  -outputformat JavaClassName
  -partitioner JavaClassName
  -combiner streamingCommand or JavaClassName为inputformat指定的class文件必须返回Text类型的键/值对。如果没有指定InputFormat类,默认使用TextInputFormat类。TextInputFormat中key的返回类型是LongWritable,这个并不是输入数据的一部分,所以key部分将会被忽略,而仅返回value部分。
为outputformat指定的class文件接收的数据类型是Text类型的键/值对。如果没有指定OutputFormat 类,默认使用TextOutputFormat类。
可以在提交Streaming作业的时候设置环境变量,如下: -cmdenv EXAMPLE_DIR=/home/example/dictionaries/在提交流作业的时候,可支持的通用命令选项如表5.2所示。表5.2通用命令选项参数是 否 可 选描述conf configuration_fileoptional定义应用的配置文件D property=valueoptional定义参数fs host: port or localoptional定义NameNode地址filesoptional定义需要复制到Map/Reduce集群的文件,多个文件以逗号分隔libjarsoptional定义需要引入classpath的jar文件,多个文件以逗号分隔archivesoptional定义需要解压到计算节点的压缩文件,多个文件以逗号分隔3. 通过D选项指定配置变量
可以通过D <property>=<value>的方式指定额外的配置变量(configuration variables)。为了改变默认的本地临时目录,可以使用下面的命令: -D dfs.data.dir=/tmp增加额外的本地临时目录可以使用下面的命令: -D mapred.local.dir=/tmp/local
-D mapred.system.dir=/tmp/system
-D mapred.temp.dir=/tmp/temp4. 设置只有Map的作业
有时候仅想跑只有Map的Hadoop作业,只需要将 mapreduce.job.reduces 设置为0即可实现。这会导致MapReduce框架不会启动Reduce类型的Task。MapTask的输出就是作业的最终结果输出,设置如下: -D mapreduce.job.reduces=0为了向后兼容,Hadoop Streaming还支持reducer NONE选项,其含义等同于D mapreduce.job.reduces=0。
5. 设置Reduce的个数
下面例子中将程序的reduce个数设置为2: 1. mapred streaming \\
2. -D mapreduce.job.reduces=2 \\
3. -input myInputDirs \\
4. -output myOutputDir \\
5. -mapper /bin/cat \\
6. -reducer /usr/bin/wc 6. 自定义行数据如何拆分成键/值对
本文开头介绍过,当MapReduce框架从stdout读取行数据的时候,它会把一行数据拆分成一个键/值对。默认情况下,tab制表符分隔的前一部分数据作为key,后一部分数据作为value。当然,可以自定义行数据的分隔符: 1. mapred streaming \\
2. -D stream.map.output.field.separator=. \\
3. -D stream.num.map.output.key.fields=4 \\
4. -input myInputDirs \\
5. -output myOutputDir \\
6. -mapper /bin/cat \\
7. -reducer /bin/cat在上面例子中,stream.map.output.field.separator指定“.”为key和value的分隔符。
7. 使用大文件或归档文件
可以使用files 和 archives 选项分别指定文件或者归档文件(archives),这些文件可以被Tasks使用。使用这个选项时,需要把这些文件或者归档文件上传到HDFS。这些文件在作业执行的时候会被缓存到所有的Jobs中。
files选项会在当前Tasks的工作目录(current working directory)下创建一个符号链接(symlink),这个链接指定的就是从HDFS复制文件的副本。下面例子中,指定了HDFS上的testfile.txt文件,在使用files选项之后,其会在Tasks的当前工作目录下创建名为testfile.txt的符号链接。-files hdfs://host:fs_port/user/testfile.txt当然,也可以自己通过#设置符号链接的名字: -files hdfs://host:fs_port/user/testfile.txt#testfile如果需要指定多个文件,使用如下: