第5章MapReduce分布式计算 5.1MapReduce简介 Hadoop MapReduce是一个快速、高效、简单用于编写并行处理大数据程序及应用在大集群上的编程框架。其前身是Google公司的MapReduce,它是Google公司的核心计算模型,将复杂的、运行于大规划集群上的并行计算过程高度地抽象到了两个函数: Map和Reduce。它适合用MapReduce来处理的数据集(或任务),需要满足一个基本要求: 待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。概念Map(映射)和Reduce(归纳)及它们的主要思想,都是从函数式编程语言中借来的,同时包含了从矢量编程语言中借来的特性。Hadoop MapReduce极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 5.1.1MapReduce架构 和HDFS一样,MapReduce也是采用Master/Slave的架构,其架构如图51所示。 图51MapReduce架构图 它主要由Client、JobTracker、TaskTracker及Task 4个部分组成。 (1) Client会在用户端通过Client类将应用配置参数打包成jar文件存储到hdfs,并把路径提交到Jobtracker,然后由JobTracker创建每一个Task(即MapTask和ReduceTask),并将它们分发到各个TaskTracker服务中去执行。 (2) JobTracker负责资源监控和作业调度。JobTracker 监控所有TaskTracker 与job的健康状况,一旦发现失败,就将相应的任务转移到其他节点。同时,JobTracker 会跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器,而调度器会在资源出现空闲时,选择合适的任务使用这些资源。在Hadoop 中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的调度器。 (3) TaskTracker 会周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、结束任务等)。TaskTracker 使用slot等量划分本节点上的资源量。slot代表计算资源(CPU、内存等)。一个Task 获取一个slot 后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot 分配给Task 使用。slot 分为Map slot 和Reduce slot 两种,分别供MapTask 和ReduceTask 使用。TaskTracker 通过slot 数目(可配置参数)限定Task 的并发度。 (4) TaskTracker分为MapTask 和ReduceTask 两种,均由TaskTracker 启动。HDFS以固定大小的block 为基本单位存储数据,而对于MapReduce 而言,其处理单位是split。split 是一个逻辑概念,只包含一些元数据信息,如数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自行决定。但需要注意的是,split 的多少决定了MapTask 的数目,因为每个split 只会交给一个MapTask 处理。split 和 block的关系如图52所示。 图52split 和 block的关系 MapTask的执行过程如图53所示。由图53可知,MapTask 先将对应的split 迭代解析成一个个key/value 对,依次调用用户自定义的map() 函数进行处理,最终将临时结果存放到本地磁盘上,其中临时数据被分成若干个partition,每个partition 将被一个ReduceTask处理。 图53MapTask的执行过程 ReduceTask的执行过程如图54所示。该过程分为以下3个阶段。 (1) 从远程节点上读取MapTask 中间结果(称为“shuffle 阶段”)。 (2) 按照key/value 对进行排序(称为“sort 阶段”)。 (3) 依次读取<key, value list>,调用用户自定义的reduce() 函数处理,并将最终结果保存到HDFS 上(称为“reduce 阶段”)。 图54ReduceTask的执行过程 5.1.2MapReduce的原理 MapReduce采用的是“分而治之”的策略,当我们处理大规模的数据时,将这些数据拆解成多个部分,并利用集群的多个节点同时进行数据处理,然后对各个节点得到的中间结果进行汇总,经过进一步的计算,得到最终结果。 一个MapReduce作业(job)通常会把输入的数据集切分为若干个独立的数据块,由map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序,然后把结果输入给reduce任务。通常,作业的输入和输出都会被存储在文件系统中,整个框架负责任务的调度和监控,以及重新执行已经失败的任务。 通常,MapReduce框架的计算节点和存储节点是运行在一组相同的节点上,也就是说,运行MapReduce框架和运行HDFS文件系统的节点通常是在一起的。这种配置允许框架在那些已经存好的数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。 MapReduce框架由一个主节点(ResourceManager)、多个子节点(运行NodeManager)和MRAppMaster(每个任务一个)共同组成。应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数,再加上其他作业的参数,就构成了作业配置(job configuration)。Hadoop的job client 提交作业(jar包/可执行程序等)和配置信息给ResourceManager,后者负责分发这些软件和配置信息给slave、调度任务且监控它们的执行,同时提供状态和诊断信息给jobclient。 虽然Hadoop框架是用Java实现的,但MapReduce应用程序不一定要用Java来写,也可以使用Ruby、Python、C++等来编写。 MapReduce框架的流程如图55所示。 图55MapReduce 框架的流程图 针对上面的流程可以分为两个阶段来描述。 1. Map阶段 (1) InputFormat根据输入文件产生键值对,并传送到Mapper类的map函数中。 (2) Map输出键值对到一个没有排序的缓冲内存中。 (3) 当缓冲内存达到给定值或map任务完成,在缓冲内存中的键值对就会被排序,然后输出到磁盘中的溢出文件中。 (4) 如果有多个溢出文件,那么就会整合这些文件到一个文件中,且是排序的。 (5) 这些排序过的、在溢出文件中的键值对会等待Reducer类的获取。 2. Reduce阶段 (1) Reducer类获取Mapper类的记录,然后产生另外的键值对,最后输出到HDFS中。 (2) Reducer类中的reduce方法针对每个key调用一次。 (3) Shuffle: 相同的key被传送到同一个的Reducer 类中。 (4) 当有一个Mapper类完成后,Reducer类就开始获取相关数据,所有的溢出文件会被排到一个内存缓冲区中。 (5) 当Reducer所有相关的数据都传输完成后,所有溢出文件就会被整合和排序。 (6) 当内存缓冲区满了后,就会在本地磁盘产生溢出文件。 (7) Reducer类输出到HDFS。 5.1.3MapReduce的工作机制 1. MapReduce运行图 MapReduce运行图如图56所示。 图56MapReduce运行图 2. 运行解析 1) 作业的提交 (1) 此方法调用submit()。在submit()方法里面连接JobTracker,即生成一个内部JobSummitter(实际上是new JobClient()),在new JobClient()里面生成一个JobSubmissionProtocol接口(JobTracker实现了此接口)对象jobSubmitClient(是它连接或对应着JobTracker),在submit()方法里面也调用JobClient.submitJobInternal(conf)方法返回一个RunningJob(步骤1)。 (2) 参数true说明要调用方法jobClient.monitorAndPrintJob()即检查作业的运行情况(每秒一次),如果有变化就报告给控制台。 jobClient.submitJobInternal()所实现的提交作业过程如下。 ① 向JobTracker请求一个新的job ID(步骤2)。 ② 检查作业的输出路径,如果未指定或已存在,则不提交作业并抛错误给程序。 ③ 计算并生成作业的输入分片,如果路径不存在,则不提交作业并抛错误给程序。 ④ 将运行作业所需要的资源(包括作业jar文件、配置文件和计算所得的输入分片)复制到JobTracker的文件系统中以job ID命名的目录下(即HDFS中)。作业jar副本较多(mapred.submit.replication = 10)(步骤3)。 ⑤ 告知JobTracker作业准备执行(真正地提交作业jobSubmitClient.submitJob())(步骤4)。 2) 作业的初始化 (1) JobTracker接收到对其submitJob()方法的调用后,将其放入内部队列,交由job scheduler进行调度,并对其进行初始化,包括创建一个正在运行作业的对象——封装任务和记录信息(步骤5)。 (2) 为了创建任务运行列表,Job Scheduler首先从共享文件系统中获取已计算好的输入分片信息(步骤6),然后为每个分片创建一个map任务。 (3) 创建的reduce任务数量由Job的mapred.reduce.task属性决定(setNumReduceTasks()设置),schedule创建相应数量的reduce任务。任务在此时被指定ID。 (4) 除了map和reduce任务,还有setupJob和cleanupJob需要建立: 由TaskTrackers在所有map开始前和所有reduce结束后分别执行,这两个方法在OutputCommitter中(默认是FileOutputCommitter)。setupJob()创建输出目录和任务的临时工作目录; cleanupJob()删除临时工作目录。 3) 作业的分配 (1) 每个TaskTracker定期发送心跳给JobTracker,告知自己还活着,并附带消息说明自己是否已准备好接受新任务。JobTracker以此来分配任务,并使用心跳的返回值与TaskTracker通信(步骤7)。JobTracker利用调度算法先选择一个job,然后再选此job的一个task分配给TaskTracker。 (2) 每个TaskTracker会有固定数量的map和reduce任务槽,数量由TaskTracker核的数量和内存大小来决定。JobTracker会先将TaskTracker的所有的map槽填满,然后才填此TaskTracker的reduce任务槽。 (3) JobTracker分配map任务时会选取与输入分片最近的TaskTracker,分配reduce任务用不着考虑数据本地化。 4) 任务的执行 (1) TaskTracker分配到一个任务后,首先从HDFS中把作业的jar文件及运行所需要的全部文件(DistributedCache设置的)复制到TaskTracker本地(步骤8)。 (2) TaskTracker为任务新建一个本地工作目录,并把jar文件的内容解压到这个文件夹下。 (3) TaskTracker新建一个TaskRunner实例来运行该任务(步骤9)。 (4) TaskRunner启动一个新的JVM来运行每个任务(步骤10),以便客户的map/reduce不会影响TaskTracker。 5) 进度和状态的更新 一个作业和它的每个任务都有一个状态,包括作业或任务的运行状态(running、successful、failed)、map和reduce的进度、计数器值、状态消息或描述。 map进度标准是处理输入所占比例; reduce是copy\merge\reduce整个进度的比例。 Child JVM有独立的线程,每隔3秒检查任务更新标志,如果有更新,就会报告给此TaskTracker。 TaskTracker每隔5秒给JobTracker发心跳。 JobTracker合并这些更新,产生一个表明所有运行作业及其任务状态的全局试图。 JobClient.monitorAndPrintJob()每秒查询这些信息。 6) 作业的完成 当JobTracker收到最后一个任务(this will be the special job cleanup task)的完成报告后,便把job状态设置为successful。 job得到完成信息便从waitForCompletion()返回。 最后,JobTracker清空作业的工作状态,并指示TaskTracker也清空作业的工作状态(如删除中间输出)。 3. 失败解析 1) 任务失败 (1) 子任务失败。当map或reduce子任务中的代码抛出异常,JVM进程会在退出之前向父进程TaskTracker发送错误报告,TaskTracker会将此(任务尝试)task attempt标记为failed状态,释放一个槽以便运行另外一个任务。 (2) JVM失败。JVM突然退出,即JVM错误,这时TaskTracker会注意到进程已经退出,标记为failed。 ① 任务失败有重试机制,重试次数map任务设置是由mapred.map.max.attempts属性控制的,reduce是由mapred.reduce.max.attempts属性控制的。 ② 一些job可以完成总体任务的一部分就能够接受,这个百分比由mapred.map.failures.precent和mapred.reduce.failures.precent参数控制。 ③ 任务尝试(task attempt)是可以中止(killed)的。 2) TaskTracker失败 作业运行期间,TaskTracker会通过心跳机制不断与系统JobTracker通信,如果某个TaskTracker运行缓慢或失败及出现故障,TaskTracker就会停止或很少向JobTracker发送心跳,JobTracker会注意到此TaskTracker发送心跳的情况,从而将此TaskTracker从等待任务调度的TaskTracker池中移除。 (1) 如果是map并且成功完成, JobTracker会安排此TaskTracker上成功运行的map任务返回。 (2) 如果是reduce并且成功完成,则数据直接使用,因为reduce只要执行完就会把输出写到HDFS上。 (3) 如果它们属于未完成的作业,那么reduce阶段无法获取该TaskTracker上的本地map输出文件,任何任务都需要重新调度。 另外,即使TaskTracker没有失败,如果它上面的失败任务远远高于集群的平均失败任务数,也会被列入黑名单。可以通过重启从JobTracker的黑名单中移除。 3) JobTracker失败 JobTracker失败应该说是最严重的一种失败方式了,而且在Hadoop中存在单点故障的情况下是相当严重的,因为在这种情况下作业最终将失败,尽管这种故障的概率极小。未来版本可以通过启动多个JobTracker,在这种情况只运行一个主的JobTracker,通过一种机制来确定哪个是主的JobTracker。 5.2MapReduce操作实践 视频讲解 5.2.1MapReduce WordCount编程实例 词频统计是最简单也是最能体现MapReduce思想的程序之一,可以称为MapReduce版“Hello World”,该程序的完整代码可以在Hadoop安装包的src/examples目录下找到。词频统计主要完成的功能是: 统计一系列文本文件中每个单词出现的次数。本节通过分析源代码帮助读者厘清MapReduce程序的基本结构。 1. WordCount代码分析 MapReduce框架自带的示例程序WordCount只包含Mapper类和Reduce类,其他全部使用默认类。下面为WordCount源代码分析。 1) Mapper类 Map过程需要继承org.apache.hadoop.mapreduce包中的Mapper类,并重写map方法。 public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); //map方法,划分一行文本,读一单词写出的一<单词,1> public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); //写出<单词,1> } } } 2) Reduce类 public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); //相当于<Hello,1><Hello,1>将两个1相加 } result.set(sum); context.write(key, result); } } 3) 主函数 public static void main(String[] args) throws Exception {//主方法,函数入口 Configuration conf = new Configuration();//实例化配置文件类 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count");//实例化Job类 job.setJarByClass(WordCount.class);//设置主类名 job.setMapperClass(TokenizerMapper.class);//指定使用上述自定义Map类 job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //设置输出结果文件位置 System.exit(job.waitForCompletion(true) ? 0 : 1); //提交任务并监控任务状态 } 4) 提交WordCount public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); //map方法,划分一行文本,读一单词写出的一<单词,1> public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); //写出<单词,1> } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); //相当于<Hello,1><Hello,1>将两个1相加 } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception {//主方法,函数入口 Configuration conf = new Configuration();//实例化配置文件类 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count");//实例化Job类 job.setJarByClass(WordCount.class);//设置主类名 job.setMapperClass(TokenizerMapper.class);//指定使用上述自定义Map类 job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); //设置输出结果文件位置 System.exit(job.waitForCompletion(true) ? 0 : 1); //提交任务并监控任务状态 } } 2. 打包运行 (1) 在Eclipse中选择 File→Export选项,导出jar包。 (2) 新建两个文本文件并上传到HDFS: [root@master ~]# hdfs dfs -mkdir /input [root@master ~]# hdfs dfs -rm /input/* [root@master ~]# hdfs dfs -put sw*.txt /input (3) 运行jar包: [root@master ~]# hadoop jar WordCountTest.jar WordCountTest /input /output (4) 查看运行结果: [root@master ~]# hdfs dfs -ls /output [root@master ~]# hdfs dfs -text /output/part-r-00000 5.2.2MapReduce倒排索引编程实例 1. 简介 倒排索引是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。它主要用来存储某个单词(或词组)在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行了相反的操作,因而称为倒排索引(inverted index)。通常情况下,倒排索引由一个单词(或词组)及相关的文档列表组成,文档列表中的文档或者标识文档的ID号,或者指定文档所在位置的URI。 2. 分析与设计 本节实现的倒排索引主要关注的信息为单词、文档URI及词频。下面根据MapReduce的处理过程给出倒排索引的设计思路。 1) Map过程 首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然,Map过程首先必须分析输入的<key,value>对,得到倒排索引中需要的3个信息: 单词、文档URI和词频。这里存在两个问题: 第一,<key,value>对只能有两个值,作为key或value值; 第二,通过一个Reduce过程无法同时完成词频统计和生成文档列表,所以必须增加一个Combine过程完成词频统计。 这里将单词和URI组成key值,将词频作为value,这样做的好处是可以利用MapReduce框架自带的Map端排序,将同一个文档的相同单词的词频组成列表,传递给Combine过程,实现类似于WordCount的功能。 2) Combine过程 经过map方法处理后,Combine过程将key值相同的value值累加,得到一个单词在文档中的词频。如果单词在文档中词频的输出作为Reduce过程的输入,在Shuffle过程时将面临一个问题: 所有具有相同单词的记录(由单词、URI和词频组成)应该交由同一个Reduce处理,但当前的key值无法保证这一点,所以必须修改key值和value值。这次将单词作为key值,URI和词频组成value值。这样做的好处是可以利用MapReduce框架默认的HashPartitioner类完成Shuffle过程,将所有相同的单词发送给同一个Reduce处理。 3) Reduce过程 经过上述两个过程后,Reduce过程只需要将key值相同的value值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给MapReduce框架进行处理了。 4) 需要解决的问题 本节设计的倒排索引在文件数目上没有限制,但是单个文件不宜过大(具体值与默认HDFS块大小及相关配置有关),要保证每个文件对应一个split。否则,由于Reduce过程没有进一步统计词频,最终结果可能会出现词频未统计完全的单词。因此可以通过重写Inputformat类将每一个文件作为一个split,避免上述情况; 或者执行两次MapReduce,第一次用于统计词频,第二次用于生成倒排索引。除此之外,还可以利用复合键值对等实现包含更多信息的倒排索引。 3. 倒排索引完整源代码 import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class InvertedIndex { public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> { private Text keyInfo = new Text(); private Text valueInfo = new Text(); private FileSplit split; public void map(Object key, Text value, Context context) throws IOException, InterruptedException { split = (FileSplit)context.getInputSplit(); StringTokenizer itr = new StringTokenizer(value.toString()); while(itr.hasMoreTokens()) { keyInfo.set(itr.nextToken() + ":" + split.getPath().toString()); valueInfo.set("1"); context.write(keyInfo, valueInfo); } } } public static class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text> { private Text info = new Text(); public void reduce(Text key, Iterable<Text>values, Context context) throws IOException, InterruptedException { int sum = 0; for(Text value : values) { sum += Integer.parseInt(value.toString()); } int splitIndex= key.toString().indexOf(":"); info.set(key.toString().substring(splitIndex + 1) + ":" + sum); key.set(key.toString().substring(0, splitIndex)); context.write(key, info); } } public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> { private Text result = new Text(); public void reducer(Text key, Iterable<Text>values, Context context) throws IOException, InterruptedException { String fileList = new String(); for(Text value : values) { fileList += value.toString() + ";"; } result.set(fileList); context.write(key, result); } } public static void main(String[] args) throws Exception{ //TODO Auto-generated method stub Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if(otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "InvertedIndex"); job.setJarByClass(InvertedIndex.class); job.setMapperClass(InvertedIndexMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setCombinerClass(InvertedIndexCombiner.class); job.setReducerClass(InvertedIndexReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } } 程序运行和WordCount同样原理。 小结 本章首先阐述了MapReduce架构,然后介绍了MapReduce的工作原理和MapReduce的工作机制,最后重点介绍了基于MapReduce架构的 WordCount编程实例和倒排索引编程实例。 习题 1. 简述MapReduce架构。 2. 简述MapReduce的工作原理。 3. 简述MapReduce的工作机制。 4. 编写MapReduce WordCount。 5. 实现MapReduce倒排索引编程。