• 技术文章 >数据库 >mysql教程

    MapReduce2.0处理机制

    2016-06-07 15:08:49原创558

    MapReduce(分布式计算模型)作为Hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。 内部模型采用分而治之的思想。MapReduce分为两部分(Map和Reduce)。其中Shuffler是对Reduce的预处理。 map和reduce的数据处理方式均采取键对的方式:即 [k1

    MapReduce(分布式计算模型)作为Hadoop家族一重要的家庭成员主要用于搜素领域,海量数据计算等问题。

    内部模型采用"分而治之"的思想。MapReduce分为两部分(Map和Reduce)。其中Shuffler是对Reduce的预处理。

    map和reduce的数据处理方式均采取键值对的方式:即 [k1,v1]->MAP->[K2,V2]->Reduce->[k3,v3]。

    MR执行流程
    (1).客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar ...)
    (2).JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
    (3).client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
    (4).开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
    (5).JobTracker进行初始化任务
    (6).读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
    (7).TaskTracker通过心跳机制领取任务(任务的描述信息)
    (8).下载所需的jar,配置文件等
    (9).TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
    (10).将结果写入到HDFS当中

    在hadoop2.0以上版本中JobTracker取名为RM(resourceManage) TastTracker取名为NM(nodeManage)

    mapReduce操作实现wordcount功能(即从文本中读取内容,计算出每个单词出现的次数)

    程序分为3个类(自定义MAP方法功能实现,自定义REDUCE方法功能实现,最后类拼凑成mapreduce模式导成jar包,在HDFS分布式功能中实现)

    1.WCMapper类(实现map)

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    /*
    * 给wordcount写mapper
    * 定义mapper
    * KEYIN:k1的类型
    * VALUEIN:v1的类型
    *
    * 重写map方法
    * hadoop没有使用jdk默认的序列化机制(long->longwriteable String->Text)
    */
    public class WCMapper extends Mapper {

    @Override
    protected void map(LongWritable key, Text value,
    Mapper.Context context)
    throws IOException, InterruptedException {
    // TODO Auto-generated method stub

    // 接收信息V1
    String line = value.toString();
    // 切分数据
    String[] words = line.split(" ");
    // 循环
    for (String w : words) {
    // 出现一次记一个1,输出
    // 构一个新的key,value
    context.write(new Text(w), new LongWritable(1));
    }
    }

    }

    2.WCReducer类实现reduce功能

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    /*
    * KEYIN k2的类型
    * VALUEIN v2的类型
    *
    * 重写reducer方法
    */
    public class WCReducer extends Reducer {

    @Override
    protected void reduce(Text k2, Iterable v2s,
    Reducer.Context context)
    throws IOException, InterruptedException {
    // 接收数据
    Text k3 = k2;
    // 定义一个计数器
    Long count = (long) 0;
    // 循环v2s
    for (LongWritable i : v2s) {
    count += i.get();
    }
    // 输出
    context.write(k3, new LongWritable(count));
    }

    }

    3.wordCount类。拼凑前两个类,符合mapreduce格式


    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /*
    * mapReduce
    *
    * 组装自定义的map和reduce
    */
    public class wordCount {
    public static void main(String[] args) throws Exception {
    // Job job=Job.instance(new Configuration()); //版本hadoop2
    Job job = new Job(new Configuration()); // 版本hadoop1

    // 4.注意---将main方法中的类设进去
    job.setJarByClass(wordCount.class);

    // 1.设置自定义Mapper
    job.setMapperClass(WCMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    // 设置mapper读入的path(hdfs路径)
    FileInputFormat.setInputPaths(job, new Path("/words.txt"));

    // 2.设置reduce
    job.setReducerClass(WCReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);

    FileOutputFormat.setOutputPath(job, new Path("/WcountResult"));

    // 3.提交
    job.waitForCompletion(true); // 打印进度和详情
    }
    }

    声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
    上一篇:Ubuntu 8.10系统JAVA和MYSQL配置方法 下一篇:oracle:变长数组varray,嵌套表,集合
    Web大前端开发直播班

    相关文章推荐

    • mysql怎么增加权限• mysql怎么删除unique约束• mysql视图与表的区别是什么• postgresql和mysql有什么区别• mysql字段类型有哪些

    全部评论我要评论

  • 取消发布评论发送
  • 1/1

    PHP中文网