纯干活:通过WourdCount程序示例:详细讲解MapReduce之BlockSplitShuffleMapReduce的区别及数据处理流程。 Shuffle过程是MapReduce的核心,集中了MR过程最关键的部分。要想了解MR,Shuffle是必须要理解的。了解Shuffle的过程,更有利于我们在对MapReduce job
纯干活:通过WourdCount程序示例:详细讲解MapReduce之Block+Split+Shuffle+Map+Reduce的区别及数据处理流程。
Shuffle过程是MapReduce的核心,集中了MR过程最关键的部分。要想了解MR,Shuffle是必须要理解的。了解Shuffle的过程,更有利于我们在对MapReduce job性能调优的工作有帮助,以及进一步加深我们队MR内部机理的了解。Shuffle到底是什么,自己在参考一位大牛两年前的博客,关于MR系列的文章中,才知道前辈什么时候已经开始相应的工作,真实佩服。这里通过对前辈的概念梳理,加上自己的见解,来尽可能的梳理清楚什么是Shuffle过程,什么是block,什么是split,揭开MR的神秘面纱。
在上篇博客中简单给出了Shuffle的概念,稍提了一下split,但没有谈block。在了解Shuffle之间我们要先了解一下block与split。这里的一片数据,你可以理解成一个split数据。但split和block的区别是什么?
你把文件上传到HDFS中,第一步就是数据的划分,这个是真实物理上的划分,数据文件上传到HDFS后,要把文件划分成一块一块,每块的大小可以有hadoop-default.xml里配置选项进行划分。这里默认每块64MB,一个文件被分成多个64MB大小的小文件,最后一个可能于64MB。注意:64MB只是默认,是可以更改的,下面会谈到如何更改。
<property> <name>dfs.block.size</name> <value>67108864</value> <description>The default block size for new files.</description> </property>
数据的划分有冗余,冗余的概念来自哪儿?为了保证数据的安全,上传的文件是被复制成3份,当一份数据宕掉,其余的可以即刻补上。当然这只是默认。
<property> <name>dfs.replication</name> <value>3</value> <description> Default block replication.The actual number of replications can be specified when the file is created.The default is used if replication is not specified in create time. </description> </property>
Hadoop中,有另一种关于数据的划分。这里定义了一个InputFormat接口,其中一个方法就是getSplits方法。这里就谈到了split。由于Hadoop版本更新换代很快,不同版本中的split的划分是由不同的job任务来完成的。早早先的版本split是有JobTracker端弯沉的,后来的版本是由JobClient完成的,JobClient划分好后,把split.file写入HDFS中,到时候JobTracker端读这个文件,就知道split是怎样划分的了。这种数据的划分其实只是一种逻辑上的划分,目的是为了让Map Task更好的获取数据。
例如:
File1:Block11,Block12,Block13,Block14,Block15 File2:Block21,Block22,Block23
如果用户在程序中指定map tasks的个数,比如说是2,如果不指定maptasks个数默认是1,那么在FileInputFormat(最常见的InputFormat实现)的getSplits方法中,首先会计算totalSize=8(源码中定义,注意getSplits这个函数计算的单位是Block个数,而不是Byte个数,后面有个变量叫bytesremaining表示剩余的 Block个数,不要根据变量名就认为是度byte的字数),然后会计算goalSize=totalSize/numSplits=4,对于File1,计算一个Split 有多少个Block就是这样计算的。
long splitSize = computeSplitSize(goalSize, minSize, blockSize); protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }
这里minSize是1(说明了1个Split至少包含1个Block,不会出现一个Split包含零点几个Block的情况),计算得出splitSize=4,所以接下来Split划分是这样分的:
Split 1: Block11, Block12, Block13,Block14 Split 2: Block15 Split 3: Block21, Block22, Block23
JobInProgress: initTasks() try { splits = JobClient.readSplitFile(splitFile); } finally { splitFile.close(); } numMapTasks = splits.length; maps = new TaskInProgress[numMapTasks];
几个简单的结论:
1)一个split大于等于1的整数个Block
2) 一个split不会包含两个File的Block,不会跨越File边界
3) split和Block的关系是一对多的关系,默认一对一
4) maptasks的个数最终决定于splits的长度
还有一点要说明:在FileSplit类中,有一项是private String[] hosts;看上去是说明这个FileSplit是放在哪些机器上的,实际上hosts里只是存储了一个Block的冗余机器列表。比如上面例子中的Split 1: Block11, Block12, Block13,Block14,这个FileSplit中的hosts里最终存储的是Block11本身和其冗余所在的机器列表,也就是说 Block12,Block13,Block14这些块存在的那些机器上没有在FileSplit中记录,并包含Block与Split之间的关系记录。
FileSplit中的这个属性有利于调度作业时候的数据本地性问题(数据本地性:作用很大,更有利于MapReduce性能调优)。如果一个tasktracker前来索取task,jobtracker就会找个task给它,找到一个maptask,得先看这个task的输入的FileSplit里hosts是否包含tasktracker所在机器,也就是判断和该tasktracker同时存在一个机器上的datanode是否拥有FileSplit中某个Block的备份。
简单的说了一下Block与Split之间的联系与区别。其实问题不在这里,网上又有新的解释如:MapReduce中如何处理跨行Block和UnputSplit的问题。其实大致就是想说命Block与Split之间的区别,这里不再说明。
回到今天的重点:Shuffle
Shuffle,也称Copy阶段。Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阀值,则写到磁盘上,否则直接放到内存中。其实就是打乱,如在java API Collections.shuffle(List);方法里的随机的打乱参数list里的元素顺序。
在官方给的关于Shuffle过程的描述,很模糊,建议大家不要看了。还是看看大牛们是怎么总结Shuffle过程的吧。关于Shuffle,我们需要清楚Shuffle是怎样把map task的输出结果有效地传送到reduce端的。其实Shuffle描述的就是数据从map task 的输出到reduce task输入的这段过程。
在Hadoop这样的集群环境中,大部分map task与reduce task的执行是在不同的节点上。当然很多情况下Reduce执行时需要跨节点去拉取其它节点上的map task结果。如果集群正在运行的job有很多,那么task的正常执行对集群内部的网络资源消耗会很严重。这种网络消耗是正常的,我们不能限制,能做的 是最大化地减少不必要的消耗。还有在节点内,相比于内存,磁盘IO对job完成时间的影响也是可观的。从最基本的要求来说,我们对Shuffle过程的 期望可以有:
1)完整地从map task端拉取数据到Reduce端。
2)在跨节点拉取数据时,尽可能地减少对带宽的不必要消耗。
3)减少磁盘IO对task执行的影响。
到这里时,大家可以去想想,如果是自己来设计这段Shuffle过程,那么你的设计目标是什么。能优化的地方主要在于减少拉取数据的量及尽量使用内存而不是磁盘。
这里根据博主指定的在Hadoop0.21.0的源码中的Shuffle过程。以WordCount为例,并假设它有8个map task和3个reduce
task。而Shuffle过程横跨map与reduce两端。
上图是某个map task的运行情况。图中清楚的指出partition、sort与combiner到底作用在哪个阶段。从这个图,可以清晰的了解map数据输出到map端所有数据准被好的全过程。
整个流程分为四步。简单可这样说,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task 结束后在对磁盘中这个map task 产生的所有临时文件合并,生成最终的正式输出文件,然后等待reduce task来拉数据。
其实每一步都包含着多个步骤与细节:
1、在map task执行时,它的输入数据来源于HDFS的block,当然在MapReduce概念中,map task只读取split。Split与block的对应关系在上面我们已经说的很明白了。在WordCount例子里,假设map的输入数据都是像 “aaa”这样的字符串。
2、在经过mapper的运行后,我们得知mapper的输出是这样一个key/value对: key是“aaa”, value是数值1。因为当前map端只做加1的操作,在reduce task里才去合并结果集。前面我们知道这个job有3个reduce task,到底当前的“aaa”应该交由哪个reduce去做呢,是需要现在决定的。
MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个 reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。
在上面的例子中,“aaa”经过Partitioner后返回0,也就是这对值应当交由第一个reducer来处理。接下来,需要将数据写入内存缓冲区中,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之 前,key与value值都会被序列化成字节数组。 而整个内存缓冲区就是一个字节数组。
3、这个内存缓冲区是有大小限制的,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map 的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。
在这里我们可以想想,因为map task的输出是需要发送到不同的reduce端去,而内存缓冲区没有对将发送到相同reduce端的数据做合并,那么这种合并应该是体现是磁盘文件中的。从官方图上也可以看到写到磁盘中的溢写文件是对不同的reduce端的数值做过合并。所以溢写过程一个很重要的细节在于,如果有很多个 key/value对需要发送到某个reduce端去,那么需要将这些key/value值拼接到一块,减少与partition相关的索引记录。
在针对每个reduce端而合并数据时,有些数据可能像这样:“aaa”/1, “aaa”/1。对于Wordcount例子,就是简单地统计单词出现的次数,如果在同一个map task的结果中有很多个像“aaa”一样出现多次的key,我们就应该把它们的值合并到一块,这个过程叫reduce也叫combine。但 MapReduce的术语中,reduce只指reduce端执行从多个map task取数据做计算的过程。除reduce外,非正式地合并数据只能算做combine了。其实大家知道的,MapReduce中将Combiner等 同于Reducer。
如果client设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种 Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。
4、每次溢写会在磁盘上生成一个溢写文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也全部溢写到磁盘中形成一个溢写文件。最终磁盘中会至少有一个这样的溢写文件存在(如果map的输出结果很少,当 map执行完成时,只会产生一个溢写文件),因为最终的文件只有一个,所以需要将这些溢写文件归并到一起,这个过程就叫做Merge。Merge是怎样的?如前面的例子,“aaa”从某个map task读取过来时值是5,从另外一个map 读取时值是8,因为它们有相同的key,所以得merge成group。什么是group。对于“aaa”就是像这样的:{“aaa”, [5, 8, 2, …]},数组中的值就是从不同溢写文件中读取出来的,然后再把这些值加起来。请注意,因为merge是将多个溢写文件合并到一个文件,所以可能也有相同的 key存在,在这个过程中如果client设置过Combiner,也会使用Combiner来合并相同的key。
至此,map端的所有工作都已结束,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每个reduce task不断地通过RPC从JobTracker那里获取map task是否完成的信息,如果reduce task得到通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。
简单地说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge,也最终形成一个文件作为reduce task的输入文件。
如map 端的细节图,Shuffle在reduce端的过程也能用图上标明的三点来概括。当前reduce copy数据的前提是它要从JobTracker获得有哪些map task已执行结束,这段过程不详述。Reducer在真正运行之前,所有的时间都是在拉取数据,做merge,且不断重复地在做。如前面的方式一样,下面我也分段地描述reduce 端的Shuffle细节:
1、Copy过程,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。因为map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。
2、Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。这里需要强调的是,merge有三种形 式:1)内存到内存 2)内存到磁盘 3)磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。
3、Reducer的输入文件。不断地merge后,最后会生成一个“最终文件”。为什么加引号?因为这个文件可能存在于磁盘上,也可能存在于内存中。对我们 来说,当然希望它存放于内存中,直接作为Reducer的输入,但默认情况下,这个文件是存放于磁盘中的。至于怎样才能让这个文件出现在内存中,后面再说。当Reducer的输入文件已定,整个Shuffle才最终结束。然后就是Reducer执行,把结果放到HDFS上。
上面就是整个MapReduce中Shuffle的过程,从文件上传到HDFS中开始,文件分块Block,Split读块,到Map,到Reduce的过程,我们都梳理了一遍。上面的内容,主要参考大牛的博客,自己手打一遍,包括map/reduce端流程自己再画一遍。一、是为了自己明白整个流程内部机理,加深理解。二、是表示对两年前博主就有如此深度的见解尊重。希望此篇博客,能对后学习者有个指导作用,加深对MapReduce内部机理流程的了解。
感谢博主:http://langyu.iteye.com/blog/992916
Copyright?BUAA