Source: codingwu. Link: http://www.cnblogs.com/archimedes/p/mapreduce-example-analysis.html
The article "MapReduce原理與設(shè)計(jì)思想" (MapReduce Principles and Design Philosophy) analyzed the principles of MapReduce in detail; this article instead dissects MapReduce through a concrete example.
1. MapReduce Overview
Hadoop Map/Reduce is an easy-to-use software framework. Applications written against it can run on large clusters of thousands of commodity machines and process terabyte-scale datasets in parallel, in a reliable, fault-tolerant way.
A Map/Reduce job usually splits the input dataset into independent chunks, which are processed by map tasks in a completely parallel manner. The framework first sorts the outputs of the maps and then feeds the results to the reduce tasks. Typically both the input and the output of a job are stored in a file system. The framework takes care of scheduling tasks, monitoring them, and re-executing any tasks that fail.
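To make this dataflow concrete before diving into Hadoop itself, here is a toy, single-process sketch of the same split → map → group-by-key (the sort/shuffle) → reduce pipeline in plain Java. It is an analogy only, not Hadoop code; the class name MiniMapReduce and the sample strings are illustrative:

import java.util.*;
import java.util.stream.*;

public class MiniMapReduce {
    public static void main(String[] args) {
        // Two "splits" of the input dataset.
        List<String> splits = List.of("Hello World Bye World",
                                      "Hello Hadoop Bye Hadoop");

        // "Map" phase: each split independently emits <word, 1> pairs;
        // groupingBy plays the role of the sort/shuffle.
        Map<String, List<Integer>> grouped = splits.parallelStream()
                .flatMap(split -> Arrays.stream(split.split("\\s+")))
                .collect(Collectors.groupingBy(word -> word,
                         Collectors.mapping(word -> 1, Collectors.toList())));

        // "Reduce" phase: sum the list of counts for each key.
        grouped.forEach((word, ones) ->
                System.out.println(word + "\t" + ones.stream().mapToInt(Integer::intValue).sum()));
    }
}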
Typically, the Map/Reduce framework and the distributed file system run on the same set of nodes; that is, compute nodes and storage nodes are usually co-located. This configuration allows the framework to schedule tasks efficiently on the nodes where the data already resides, making very effective use of the cluster's aggregate network bandwidth.
The Map/Reduce framework consists of a single master JobTracker and one slave TaskTracker per cluster node. The master is responsible for scheduling all the tasks that make up a job; these tasks are distributed across the slaves, and the master monitors their execution and re-executes failed tasks. The slaves simply execute the tasks the master assigns to them.
At a minimum, an application specifies the input/output locations (paths) and supplies map and reduce functions by implementing the appropriate interfaces or abstract classes. Together with other job parameters, these make up the job configuration. The Hadoop job client then submits the job (jar, executable, etc.) and its configuration to the JobTracker, which distributes the software and configuration to the slaves, schedules the tasks, monitors them, and provides status and diagnostic information back to the job client.
Although the Hadoop framework is implemented in Java, Map/Reduce applications do not have to be written in Java.
2. Example Analysis: Word Count
1) WordCount Source Code Analysis
Word count is one of the simplest programs, and one that best embodies the MapReduce idea. The complete code can be found under the src/examples directory of the Hadoop distribution.
The word count program does one main thing: it counts the number of occurrences of each word across a set of text files, as shown in the figure:
(1) The Map Phase
The Map phase requires subclassing the Mapper class in the org.apache.hadoop.mapreduce package and overriding its map method.
By adding two statements to the map method that print the key and value to the console, you can observe that the value holds one line of the text file (with the line break serving as the end-of-line marker), while the key is the offset of the line's first character from the beginning of the file. A StringTokenizer then splits each line into individual words, and the map method emits <word, 1> pairs as its output; all the remaining work is handled by the MapReduce framework. IntWritable and Text are Hadoop's wrappers for int and String; these classes can be serialized, which makes data exchange in a distributed environment convenient.
The implementation of TokenizerMapper is as follows:
public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        System.out.println("key = " + key.toString());     // added to inspect the key
        System.out.println("value = " + value.toString()); // added to inspect the value
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
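For the sample line "Hello World" at offset 0, for instance, the two added print statements show key = 0 and value = Hello World (matching the log output later in this article), and the mapper emits <Hello, 1> and <World, 1>.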
(2) The Reduce Phase
The Reduce phase requires subclassing the Reducer class in the org.apache.hadoop.mapreduce package and overriding its reduce method.
The reduce method's key parameter is a single word, and values is the list of counts for that word emitted by the various Mappers, so simply iterating over values and summing yields the word's total number of occurrences.
The implementation of the IntSumReducer class is as follows:
public static class IntSumReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();   // accumulate the counts from all mappers
        }
        result.set(sum);
        context.write(key, result);
    }
}
(3) Running the MapReduce Job
In MapReduce, a Job object is responsible for managing and running a computation, and its methods set the job's parameters. Here, TokenizerMapper handles the Map phase, and IntSumReducer handles both the Combine and Reduce phases; because summing counts is associative and commutative, the same reducer class can safely double as the combiner. The output types of the Map and Reduce phases are also set: Text for keys and IntWritable for values. The job's input and output paths are given as command-line arguments and configured via FileInputFormat and FileOutputFormat respectively. With the parameters set, calling job.waitForCompletion() runs the job. The main function is implemented as follows:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(wordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);   // the reducer doubles as the combiner
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Running it produces the following output:
14/12/17 05:53:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 05:53:26 INFO mapred.JobClient: Running job: job_local_0001
14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 05:53:26 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680
key = 0
value = Hello World
key = 12
value = Bye World
14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output
14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0
14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
14/12/17 05:53:27 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output
key = 0
value = Hello Hadoop
key = 13
value = Bye Hadoop
14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0
14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.Merger: Merging 2 sorted segments
14/12/17 05:53:27 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 73 bytes
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/17 05:53:27 INFO mapred.LocalJobRunner:
14/12/17 05:53:27 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/17 05:53:27 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out
14/12/17 05:53:27 INFO mapred.LocalJobRunner: reduce > reduce
14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
14/12/17 05:53:27 INFO mapred.JobClient: map 100% reduce 100%
14/12/17 05:53:27 INFO mapred.JobClient: Job complete: job_local_0001
14/12/17 05:53:27 INFO mapred.JobClient: Counters: 14
14/12/17 05:53:27 INFO mapred.JobClient:   FileSystemCounters
14/12/17 05:53:27 INFO mapred.JobClient:     FILE_BYTES_READ=17886
14/12/17 05:53:27 INFO mapred.JobClient:     HDFS_BYTES_READ=52932
14/12/17 05:53:27 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=54239
14/12/17 05:53:27 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=71431
14/12/17 05:53:27 INFO mapred.JobClient:   Map-Reduce Framework
14/12/17 05:53:27 INFO mapred.JobClient:     Reduce input groups=4
14/12/17 05:53:27 INFO mapred.JobClient:     Combine output records=6
14/12/17 05:53:27 INFO mapred.JobClient:     Map input records=4
14/12/17 05:53:27 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/12/17 05:53:27 INFO mapred.JobClient:     Reduce output records=4
14/12/17 05:53:27 INFO mapred.JobClient:     Spilled Records=12
14/12/17 05:53:27 INFO mapred.JobClient:     Map output bytes=78
14/12/17 05:53:27 INFO mapred.JobClient:     Combine input records=8
14/12/17 05:53:27 INFO mapred.JobClient:     Map output records=8
14/12/17 05:53:27 INFO mapred.JobClient:     Reduce input records=6
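The two sample input files seen in the log are "Hello World / Bye World" and "Hello Hadoop / Bye Hadoop", so the four reduce output records correspond to this final result (written to a file such as part-r-00000 in the output directory, assuming the standard naming convention):

Bye	2
Hadoop	2
Hello	2
World	2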
2) The WordCount Processing Flow
The design and source code of WordCount were given above, but without going into the details. Below is a more thorough analysis of how WordCount is processed:
(1) The files are split into splits. Since the test files are small, each file becomes one split, and each file is then divided line by line into <key, value> pairs, as shown in the figure. This step is performed automatically by the MapReduce framework; note that the offsets include the characters taken up by the line breaks.
(2) The resulting <key, value> pairs are handed to the user-defined map method, which produces new <key, value> pairs.
(3) After obtaining the <key, value> pairs emitted by the map method, the Mapper sorts them by key and runs the Combine step, summing the values of identical keys to produce the Mapper's final output, as shown in the figure:
(4) The Reducer first sorts the data received from the Mappers, then hands it to the user-defined reduce method, which produces new <key, value> pairs that form WordCount's final output, as shown in the figure (a hand trace of these four steps follows below):
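Tracing the two sample files through these four steps yields the following intermediate pairs, consistent with the counters reported in the log above (8 map output records, 8 combine input records, 6 combine output records, 4 reduce output records):

split/map (file 1):   <Hello,1> <World,1> <Bye,1> <World,1>
sort + combine:       <Bye,1> <Hello,1> <World,2>

split/map (file 2):   <Hello,1> <Hadoop,1> <Bye,1> <Hadoop,1>
sort + combine:       <Bye,1> <Hadoop,2> <Hello,1>

reduce (merged):      <Bye,2> <Hadoop,2> <Hello,2> <World,2>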
3. MapReduce: How Well Do You Really Know It?
The MapReduce framework quietly does a great deal of work behind the scenes. What happens if you don't override the map and reduce methods at all?
Let's implement a stripped-down MapReduce program. We create a new class, LazyMapReduce, which only performs the necessary job initialization and sets the input/output paths; every other parameter keeps its default.
The code is as follows:
public class LazyMapReduce {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
The result of running it is:
14/12/17 23:04:13 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 23:04:14 INFO mapred.JobClient: Running job: job_local_0001
14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2
14/12/17 23:04:14 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 23:04:15 INFO mapred.JobClient: map 0% reduce 0%
14/12/17 23:04:18 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 23:04:18 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 23:04:18 INFO mapred.MapTask: Starting flush of map output
14/12/17 23:04:19 INFO mapred.MapTask: Finished spill 0
14/12/17 23:04:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/12/17 23:04:19 INFO mapred.LocalJobRunner:
14/12/17 23:04:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
14/12/17 23:04:20 INFO mapred.MapTask: io.sort.mb = 100
14/12/17 23:04:20 INFO mapred.MapTask: data buffer = 79691776/99614720
14/12/17 23:04:20 INFO mapred.MapTask: record buffer = 262144/327680
14/12/17 23:04:20 INFO mapred.MapTask: Starting flush of map output
14/12/17 23:04:20 INFO mapred.MapTask: Finished spill 0
14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.Merger: Merging 2 sorted segments
14/12/17 23:04:20 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 90 bytes
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/12/17 23:04:20 INFO mapred.LocalJobRunner:
14/12/17 23:04:20 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/12/17 23:04:20 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out
14/12/17 23:04:20 INFO mapred.LocalJobRunner: reduce > reduce
14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
14/12/17 23:04:20 INFO mapred.JobClient: map 100% reduce 100%
14/12/17 23:04:20 INFO mapred.JobClient: Job complete: job_local_0001
14/12/17 23:04:20 INFO mapred.JobClient: Counters: 14
14/12/17 23:04:20 INFO mapred.JobClient:   FileSystemCounters
14/12/17 23:04:20 INFO mapred.JobClient:     FILE_BYTES_READ=46040
14/12/17 23:04:20 INFO mapred.JobClient:     HDFS_BYTES_READ=51471
14/12/17 23:04:20 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=52808
14/12/17 23:04:20 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=98132
14/12/17 23:04:20 INFO mapred.JobClient:   Map-Reduce Framework
14/12/17 23:04:20 INFO mapred.JobClient:     Reduce input groups=3
14/12/17 23:04:20 INFO mapred.JobClient:     Combine output records=0
14/12/17 23:04:20 INFO mapred.JobClient:     Map input records=4
14/12/17 23:04:20 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/12/17 23:04:20 INFO mapred.JobClient:     Reduce output records=4
14/12/17 23:04:20 INFO mapred.JobClient:     Spilled Records=8
14/12/17 23:04:20 INFO mapred.JobClient:     Map output bytes=78
14/12/17 23:04:20 INFO mapred.JobClient:     Combine input records=0
14/12/17 23:04:20 INFO mapred.JobClient:     Map output records=4
14/12/17 23:04:20 INFO mapred.JobClient:     Reduce input records=4
As you can see, by default MapReduce passes the input <key, value> pairs through to the output unchanged.
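This identity behavior comes from the base classes themselves. Paraphrasing the Hadoop new-API source (so treat these as illustrative fragments, not verbatim library code), the default map and reduce methods are essentially pass-throughs:

// Mapper's default map: emit each record unchanged.
protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
}

// Reducer's default reduce: emit every value of each key unchanged.
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
    for (VALUEIN value : values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}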
Below are some of the components involved in a MapReduce job and their default settings:

(1) The InputFormat class
Splits the input data into splits, and further parses each split into the <key, value> pairs that are fed to the map function.

(2) The Mapper class
Implements the map function, which generates intermediate results from the input <key, value> pairs.

(3) The Combiner
Implements the combine function, which merges intermediate pairs that share the same key.

(4) The Partitioner class
Implements the getPartition function, which during the shuffle divides the intermediate data by key into R partitions, each handled by one Reduce task (a sketch of the default partitioner follows this list).

(5) The Reducer class
Implements the reduce function, which merges the intermediate results into the final result.

(6) The OutputFormat class
Responsible for writing the final output.
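As an illustration of item (4), the default HashPartitioner simply spreads keys over the reduce tasks by hash code; its implementation in Hadoop is essentially the following (paraphrased):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Masking with Integer.MAX_VALUE clears the sign bit,
        // so the modulus is always a valid non-negative partition index.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}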
With these defaults spelled out explicitly, the code above can be rewritten as:
public class LazyMapReduce {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");
        // The following calls spell out the framework defaults explicitly:
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);                  // identity mapper
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(Reducer.class);                // identity reducer
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);  // the default; FileOutputFormat itself is abstract
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Note, however, that depending on the Hadoop version you run this against, some of these classes may be flagged as deprecated.
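For instance, on Hadoop 2.x and later the deprecated Job constructor is usually replaced with the static factory method:

// Newer replacement for "new Job(conf, name)":
Job job = Job.getInstance(conf, "LazyMapReduce");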