
MapReduce Example Analysis

 wiborgite 2016-11-11


Source: codingwu

Link: http://www.cnblogs.com/archimedes/p/mapreduce-example-analysis.html


In the article "MapReduce原理與設(shè)計(jì)思想" (MapReduce Principles and Design), the principles of MapReduce were analyzed in detail; this article instead examines MapReduce through a concrete example.


1. MapReduce Overview


Hadoop Map/Reduce is an easy-to-use software framework. Applications written against it can run on large clusters of thousands of commodity machines and process terabyte-scale datasets in parallel, in a reliable, fault-tolerant manner.


A Map/Reduce job usually splits the input dataset into independent chunks, which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps and then feeds the results to the reduce tasks. Typically both the input and the output of a job are stored in a file system. The framework takes care of scheduling tasks, monitoring them, and re-executing any that fail.


Typically the Map/Reduce framework and the distributed file system run on the same set of nodes; that is, the compute nodes and the storage nodes are usually the same. This configuration lets the framework schedule tasks on the nodes where the data already resides, making very efficient use of the cluster's aggregate network bandwidth.


The Map/Reduce framework consists of a single master JobTracker and one slave TaskTracker per cluster node. The master schedules all the tasks that make up a job across the slaves, monitors their execution, and re-executes any that fail; the slaves simply execute the tasks the master assigns to them.


At a minimum, an application specifies the input/output locations (paths) and supplies map and reduce functions by implementing the appropriate interfaces or abstract classes. These, together with other job parameters, form the job configuration. The Hadoop job client then submits the job (a jar package, executable, etc.) and its configuration to the JobTracker, which distributes the software and configuration to the slaves, schedules the tasks, monitors them, and reports status and diagnostic information back to the job client.
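
For example, a packaged job is typically submitted from the command line with something along these lines (the jar name here is illustrative; the class name matches the WordCount code below):

hadoop jar wordcount.jar wordCount <in> <out>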


Although the Hadoop framework itself is implemented in Java, Map/Reduce applications do not have to be written in Java.



2. Example Analysis: Word Count


1. WordCount Source Code Analysis


Word counting is one of the simplest programs and one of the best illustrations of the MapReduce way of thinking. The complete source code can be found in the src/examples directory of the Hadoop distribution.


The word-count program does one thing: it counts the number of occurrences of each word across a set of text files, as illustrated below.
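
As a concrete illustration, take the two input files that appear in the run log later in this article:

file1:  Hello World
        Bye World

file2:  Hello Hadoop
        Bye Hadoop

The expected output is:

Bye     2
Hadoop  2
Hello   2
World   2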



(1) The Map phase


The Map phase is implemented by extending the Mapper class from the org.apache.hadoop.mapreduce package and overriding its map method.


By adding two statements to the map method that print the key and value to the console, one can see that the value passed to map holds one line of the text file (lines are terminated by the newline character), while the key is the offset of the first character of that line from the start of the file. The StringTokenizer class then splits each line into individual words, and map emits <word, 1> pairs as its output; all remaining work is handled by the MapReduce framework. IntWritable and Text are Hadoop's wrappers around int and String; they are serializable, which makes it convenient to exchange data in a distributed environment.


The implementation of TokenizerMapper is as follows:


public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("key = " + key.toString());     // added to inspect the key
        System.out.println("value = " + value.toString()); // added to inspect the value
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}


(2) The Reduce phase


The Reduce phase is implemented by extending the Reducer class from the org.apache.hadoop.mapreduce package and overriding its reduce method.


The key parameter of the reduce method is a single word, and values is the list of counts for that word collected from the individual Mappers, so iterating over values and summing yields the total number of occurrences of the word.
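
For example, with the input used in this article, the reducer for the key "Bye" receives the value list [1, 1] (one count from each input file, after combining) and emits <Bye, 2>.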


The implementation of the IntSumReducer class is as follows:


public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();   // accumulate the counts emitted for this word
        }
        result.set(sum);
        context.write(key, result);
    }
}


(3) Running the MapReduce job


In MapReduce, a Job object manages and runs a computation, and the job's parameters are configured through its methods. Here TokenizerMapper is set to carry out the Map phase, and IntSumReducer to carry out both the Combine and the Reduce phases. The output types of the Map and Reduce phases are also set: Text for keys and IntWritable for values. The job's input and output paths are given as command-line arguments and configured via FileInputFormat and FileOutputFormat respectively. Once these parameters are set, calling job.waitForCompletion() runs the job. The main function is implemented as follows:


public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(wordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}


The run produces the following output:


14/12/17 05:53:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=

14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2

14/12/17 05:53:26 INFO mapred.JobClient: Running job: job_local_0001

14/12/17 05:53:26 INFO input.FileInputFormat: Total input paths to process : 2

14/12/17 05:53:26 INFO mapred.MapTask: io.sort.mb = 100

14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720

14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680

key = 0

value = Hello World

key = 12

value = Bye World

14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output

14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0

14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.

14/12/17 05:53:27 INFO mapred.MapTask: io.sort.mb = 100

14/12/17 05:53:27 INFO mapred.MapTask: data buffer = 79691776/99614720

14/12/17 05:53:27 INFO mapred.MapTask: record buffer = 262144/327680

14/12/17 05:53:27 INFO mapred.MapTask: Starting flush of map output

key = 0

value = Hello Hadoop

key = 13

value = Bye Hadoop

14/12/17 05:53:27 INFO mapred.MapTask: Finished spill 0

14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.Merger: Merging 2 sorted segments

14/12/17 05:53:27 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 73 bytes

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

14/12/17 05:53:27 INFO mapred.LocalJobRunner: 

14/12/17 05:53:27 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now

14/12/17 05:53:27 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out

14/12/17 05:53:27 INFO mapred.LocalJobRunner: reduce > reduce

14/12/17 05:53:27 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.

14/12/17 05:53:27 INFO mapred.JobClient: map 100% reduce 100%

14/12/17 05:53:27 INFO mapred.JobClient: Job complete: job_local_0001

14/12/17 05:53:27 INFO mapred.JobClient: Counters: 14

14/12/17 05:53:27 INFO mapred.JobClient: FileSystemCounters

14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_READ=17886

14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_READ=52932

14/12/17 05:53:27 INFO mapred.JobClient: FILE_BYTES_WRITTEN=54239

14/12/17 05:53:27 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=71431

14/12/17 05:53:27 INFO mapred.JobClient: Map-Reduce Framework

14/12/17 05:53:27 INFO mapred.JobClient: Reduce input groups=4

14/12/17 05:53:27 INFO mapred.JobClient: Combine output records=6

14/12/17 05:53:27 INFO mapred.JobClient: Map input records=4

14/12/17 05:53:27 INFO mapred.JobClient: Reduce shuffle bytes=0

14/12/17 05:53:27 INFO mapred.JobClient: Reduce output records=4

14/12/17 05:53:27 INFO mapred.JobClient: Spilled Records=12

14/12/17 05:53:27 INFO mapred.JobClient: Map output bytes=78

14/12/17 05:53:27 INFO mapred.JobClient: Combine input records=8

14/12/17 05:53:27 INFO mapred.JobClient: Map output records=8

14/12/17 05:53:27 INFO mapred.JobClient: Reduce input records=6


2. The WordCount Processing Flow


The design and source code of WordCount were presented above, but without going into the details. Below, the processing is analyzed step by step:


(1) The input files are split into splits. Since the test files are small, each file becomes one split, and each file is then divided line by line into <key, value> pairs, as shown below. This step is carried out automatically by the MapReduce framework; note that the offsets include the newline character that terminates each line.
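
For the two test files, the pairs handed to map are (these offsets match the key values printed in the run log):

file1:  <0, "Hello World">   <12, "Bye World">
file2:  <0, "Hello Hadoop">  <13, "Bye Hadoop">

The offset 12, for instance, is the 11 characters of "Hello World" plus its terminating newline.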


(2) The split <key, value> pairs are handed to the user-defined map method, which produces new <key, value> pairs, as shown below.
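
For file1, map emits <Hello, 1>, <World, 1>, <Bye, 1>, <World, 1>; for file2 it emits <Hello, 1>, <Hadoop, 1>, <Bye, 1>, <Hadoop, 1>. That is 8 records in total, matching the "Map output records=8" counter above.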


(3) After obtaining the <key, value> pairs emitted by map, the Mapper sorts them by key and runs the Combine step, summing the values of identical keys to form the Mapper's final output, as shown below.
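
After sorting and combining, the first Mapper's output is <Bye, 1>, <Hello, 1>, <World, 2> and the second Mapper's is <Bye, 1>, <Hadoop, 2>, <Hello, 1>. That is 6 records in total, matching the "Combine output records=6" counter.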



(4) The Reducer first sorts the data received from the Mappers, then hands it to the user-defined reduce method, which produces new <key, value> pairs that form the final output of WordCount, as shown below.
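
Merging the outputs of the two Mappers, reduce produces the final result <Bye, 2>, <Hadoop, 2>, <Hello, 2>, <World, 2>, matching the "Reduce output records=4" counter.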



3. How Well Do You Know MapReduce?


The MapReduce framework quietly does a great deal of work behind the scenes. What happens if the map and reduce methods are not overridden at all?


To find out, let's implement a stripped-down MapReduce job. Create a class LazyMapReduce that performs only the necessary initialization and sets the input/output paths, leaving every other parameter at its default.


The code is as follows:


public class LazyMapReduce {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        // Only the job name and the input/output paths are set; everything else stays at its default
        Job job = new Job(conf, "LazyMapReduce");
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


The run output is:


14/12/17 23:04:13 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=

14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2

14/12/17 23:04:14 INFO mapred.JobClient: Running job: job_local_0001

14/12/17 23:04:14 INFO input.FileInputFormat: Total input paths to process : 2

14/12/17 23:04:14 INFO mapred.MapTask: io.sort.mb = 100

14/12/17 23:04:15 INFO mapred.JobClient: map 0% reduce 0%

14/12/17 23:04:18 INFO mapred.MapTask: data buffer = 79691776/99614720

14/12/17 23:04:18 INFO mapred.MapTask: record buffer = 262144/327680

14/12/17 23:04:18 INFO mapred.MapTask: Starting flush of map output

14/12/17 23:04:19 INFO mapred.MapTask: Finished spill 0

14/12/17 23:04:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting

14/12/17 23:04:19 INFO mapred.LocalJobRunner: 

14/12/17 23:04:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.

14/12/17 23:04:20 INFO mapred.MapTask: io.sort.mb = 100

14/12/17 23:04:20 INFO mapred.MapTask: data buffer = 79691776/99614720

14/12/17 23:04:20 INFO mapred.MapTask: record buffer = 262144/327680

14/12/17 23:04:20 INFO mapred.MapTask: Starting flush of map output

14/12/17 23:04:20 INFO mapred.MapTask: Finished spill 0

14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting

14/12/17 23:04:20 INFO mapred.LocalJobRunner: 

14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.

14/12/17 23:04:20 INFO mapred.LocalJobRunner: 

14/12/17 23:04:20 INFO mapred.Merger: Merging 2 sorted segments

14/12/17 23:04:20 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 90 bytes

14/12/17 23:04:20 INFO mapred.LocalJobRunner: 

14/12/17 23:04:20 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting

14/12/17 23:04:20 INFO mapred.LocalJobRunner: 

14/12/17 23:04:20 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now

14/12/17 23:04:20 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to out

14/12/17 23:04:20 INFO mapred.LocalJobRunner: reduce > reduce

14/12/17 23:04:20 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.

14/12/17 23:04:20 INFO mapred.JobClient: map 100% reduce 100%

14/12/17 23:04:20 INFO mapred.JobClient: Job complete: job_local_0001

14/12/17 23:04:20 INFO mapred.JobClient: Counters: 14

14/12/17 23:04:20 INFO mapred.JobClient: FileSystemCounters

14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_READ=46040

14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_READ=51471

14/12/17 23:04:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=52808

14/12/17 23:04:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=98132

14/12/17 23:04:20 INFO mapred.JobClient: Map-Reduce Framework

14/12/17 23:04:20 INFO mapred.JobClient: Reduce input groups=3

14/12/17 23:04:20 INFO mapred.JobClient: Combine output records=0

14/12/17 23:04:20 INFO mapred.JobClient: Map input records=4

14/12/17 23:04:20 INFO mapred.JobClient: Reduce shuffle bytes=0

14/12/17 23:04:20 INFO mapred.JobClient: Reduce output records=4

14/12/17 23:04:20 INFO mapred.JobClient: Spilled Records=8

14/12/17 23:04:20 INFO mapred.JobClient: Map output bytes=78

14/12/17 23:04:20 INFO mapred.JobClient: Combine input records=0

14/12/17 23:04:20 INFO mapred.JobClient: Map output records=4

14/12/17 23:04:20 INFO mapred.JobClient: Reduce input records=4


As can be seen, by default MapReduce writes the input <key, value> pairs to the output unchanged.
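
This pass-through behavior comes straight from the base classes: the default map and reduce implementations are essentially identity functions. A sketch of what they do (this mirrors the Hadoop source, shown here for illustration):

// Default Mapper.map: pass the input pair through unchanged
protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
}

// Default Reducer.reduce: write every value through unchanged
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
        throws IOException, InterruptedException {
    for (VALUEIN value : values) {
        context.write((KEYOUT) key, (VALUEOUT) value);
    }
}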


Below are some of MapReduce's main pluggable components and their default settings:


(1) The InputFormat class


This class splits the input data into splits and further breaks each split into <key, value> pairs to serve as input to the map function.


(2) The Mapper class


Implements the map function, which produces intermediate results from the input <key, value> pairs.


(3) The Combiner


Implements the combine function, which merges intermediate key/value pairs that share the same key. In WordCount, IntSumReducer doubles as the combiner (set via job.setCombinerClass above).


(4) The Partitioner class


Implements the getPartition function, which during the Shuffle phase divides the intermediate data by key into R partitions, each handled by one Reducer.
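
The default is HashPartitioner, whose core logic is essentially the following (a sketch mirroring the Hadoop implementation):

// org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit so the result is non-negative, then take it modulo R
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}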


(5) The Reducer class


Implements the reduce function, which merges the intermediate results into the final result.


(6) The OutputFormat class


This class is responsible for writing the final output.


With these defaults made explicit, the code above can be rewritten as:


public class LazyMapReduce {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "LazyMapReduce");

        // The defaults, written out explicitly:
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Mapper.class);                  // identity Mapper

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setPartitionerClass(HashPartitioner.class);
        job.setReducerClass(Reducer.class);                // identity Reducer

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);  // the default output format (FileOutputFormat itself is abstract)

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


Note, however, that depending on the Hadoop version, some of these classes may be flagged as deprecated.


