当前位置: 新豪天地登录网址 > www.3559.com > 正文

mapreduce工作流程www.3559.com:

时间:2019-11-03 16:10来源:www.3559.com
聚类算法 ====================== MapReduce实现 ====================== 需求:统计规律切分的大规模文本中单词出现次数 文本中单词例子: hello world this myhh dd mm dd kk MapReduce适合PB级以上海量数据的

聚类算法

====================== MapReduce实现 ======================

需求:统计规律切分的大规模文本中单词出现次数

文本中单词例子:

hello   world   this    my
hh  dd  mm  dd  kk

MapReduce适合PB级以上海量数据的离线处理MapReduce不擅长什么

  reduce(k1, list(v1)) -> v2 

MapReduce

一、MapReduce架构组成

(主从架构)主要包含4个主要的组件:

》Client:将编写的MapReduce程序提交给JobTracker端。在Hadoop内部用“作业”(Job)表示MapReduce程序,每个作业会被分解成若干个Map/Reduce任务(Task)。

》JobTracker:主要负责资源监控和作业调度。监控所有TaskTracker与作业的健康状况。

》TaskTracker:TaskTracker会周期性地通过Heartbeat将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker发送过来的命令并执行相应的操作(如启动新任务、杀死任务等)。TaskTracker使用“slot”等量划分本节点上的资源量。“slot”代表计算资源(CPU、内存等)。一个Task获取到一个slot后才有机会运行,而Hadoop调度器的作用就是将各个TaskTracker上的空闲slot分配给Task使用。slot分为Map slot和Reduce slot两种,分别供Map Task和Reduce Task使用。TaskTracker通过slot数目(可配置参数)限定Task的并发度。

》Task:(任务)Task分为Map Task和Reduce Task两种,均由TaskTracker启动。

map task执行流程:

www.3559.com 1

*
*

reduce task执行流程:

*
*

www.3559.com 2

二、mapreduce的生命周期:(即作业提交到运行结束的整个流程)

www.3559.com 3

一共有5个步骤:

1、作业提交与初始化。JobClient。

2、任务调度与监控。JobTracker。

3、任务运行环境准备。即TaskTracker启动JVM和资源隔离。

4、任务执行。TaskTracker为Task准备好运行环境后,便会启动Task.

5、作业完成。所有Task执行完毕后,整个作业执行成功.

三、mapreduce编程接口体系结构:

mapreduce编程模型接口体系的结构如下图:

www.3559.com 4

整个编程模型位于用户应用程序层和MapReduce执行层之间。分为两层:

第一层是最基本的Java API,主要有5个编程组件:分别是InputFormat、Mapper、Partitioner、Reducer和OutputFormat。Hadoop自带了很多直接可用的InputFormat、Partitioner和OutputFormat,大部分情况下,用户只需编写Mapper和Reducer即可。

第二层是工具层,位于基本Java API之上,主要是为了方便用户编写复杂的MapReduce程序和利用其他编程语言增加MapReduce计算平台的兼容性而提出来的。在该层中,主要提供了4个编程工具包。               

❑JobControl:方便用户编写有依赖关系的作业,这些作业往往构成一个有向图,所以通常称为DAG(Directed Acyclic Graph)作业,如第2章中的朴素贝叶斯分类算法实现便是4个有依赖关系的作业构成的DAG。

❑ChainMapper/ChainReducer:方便用户编写链式作业,即在Map或者Reduce阶段存在多个Mapper,形式如下:[MAPPER REDUCER MAPPER*]

❑Hadoop Streaming:方便用户采用非Java语言编写作业,允许用户指定可执行文件或者脚本作为Mapper/Reducer。

❑Hadoop Pipes:专门为C/C 程序员编写MapReduce程序提供的工具包。

四、接口介绍:

1、InputFormat接口:

主要用于描述输入数据的格式。提供以下两个功能:

❑数据切分:按照某个策略将输入数据切分成若干个split,以便确定Map Task个数以及对应的split。

❑为Mapper提供输入数据:给定某个split,能将其解析成一个个key/value对。

2、OutputFormat接口:

主要用于描述输出数据的格式,它能够将用户提供的key/value对写入特定格式的文件中。

www.3559.com ,3、Partitioner接口:

对Mapper产生的中间结果进行分片,以便将同一分组的数据交给同一个Reducer处理,它直接影响Reduce阶段的负载均衡。

4、Mapper Reducer:

封装了应用程序的数据处理逻辑。所有存储在底层分布式文件系统上的数据均要解释成key/value的形式,并交给Mapper/Reducer中的map/reduce函数处理,产生另外一些key/value。

五、非Java API的设计:

1、Hadoop Streaming:

为方便非Java用户编写MapReduce程序而设计的工具包。它允许用户将任何可执行文件或者脚本作为Mapper/Reducer。

Hadoop Streaming要求用户编写的Mapper/Reducer从标准输入中读取数据,并将结果写到标准数据中,这类似于Linux中的管道机制。

》实现原理:

Hadoop Streaming工具包实际上是一个使用Java编写的MapReduce作业。当用户使用可执行文件或者脚本文件充当Mapper或者Reducer时,Java端的Mapper或者Reducer充当了wrapper角色,它们将输入文件中的key和value直接传递给可执行文件或者脚本文件进行处理,并将处理结果写入HDFS。

六、Task运行过程分析:

当我们需要编写一个简单的mapreduce作业时,只需要实现map和reduce两个函数即可,然后将作业提交到集群上,Hadoop内部会将这两个函数封装到Map Task和Reduce Task中。为帮助更好的理解两个Task的实现原理,以下将从内部的实现原理来深入分析。Map Task:read 、map 、collect 、spill 、Combine;Reduce Task:shuffle、merge、sort、reduce、write。

Map Task的执行流程:

通过用户提供的InputFormat将对应的InputSplit解析成一系列的K/V,并以此交给map函数进行处理;

然后按照指定的partition对数据进行分片,确定相应的K/V交给哪个Reduce Task处理;

将数据交给用户定义的combine进行本地规约,最后讲处理结果保存在本地磁盘上。

Reduce Task的执行流程:

其输入来自各个Map Task。首先通过HTTP请求从各个Map Task上拷贝对应的数据分片,拷贝完后以key为关键字对所有数据进行排序,通过排序,key相同的记录聚集到一起形成若干分组,然后将每组数据交割reduce处理,最后将结果直接写入HDFS中。

如何正确使用

www.3559.com 5

MapReduce应用场景

  • 数据统计,如网站PV、UV统计
  • 搜索引擎的索引,比如百度就是用MapReduce来建立索引,MapReduce产生的需求背景就是Google创建搜索索引
  • 海量数据查找
  • 复杂的数据分析算法的实现比如聚类算法、分类算法、推荐算法等

MapReduce编程模型

Reduce阶段由一定数量的Reduce Task组成 

Java编程接口

  • 旧API:org.apache.hadoop.mapred,目前不建议使用
  • 新:org.apache.hadoop.mapreduce
  • 新API具有更好的扩展性,两种API内部执行引擎一样

Split与Block

  数据处理:Reducer

Map阶段

Map阶段由若干Map Task组成

  • 从HDFS获取输入数据并解析 InputFormat
  • 数据处理 Mapper
  • 数据分组:Partitioner

很多情况需自定义Partitioner

Map阶段由一定数量的Map Task组成

Reduce阶段

由若干Reduce Task组成,输入为Map Task的输出

  • 由Map的输出数据远程拷贝
  • 数据按照 key排序
  • 数据处理 Reducer
  • 数据输出:OutputFormat

MapReduce编程模型—外部物理结构

Map阶段 ------------->

YARN是Hadoop的一个资源调度框架,下面简单介绍下它的运行流程

www.3559.com 6

yarn运行流程.jpg

以上是YARN的一个流程图,简单介绍

  1. Client提交任务执行请求
  2. ResourceManager接收到执行请求后,向Client返回一个作业ID,ResourceManager选择一个NodeManager分配一个Container,此NodeManager启动一个ApplicationMaster(AM)
  3. AM计算需要的资源及数据,向ResourceManager注册并申请计算所需要的资源
  4. ResourceManager分配合适的NodeManager给ApplicationMaster,ApplicationMaster与NodeManager协调分配container,启动相应的Task
  5. Application监控任务的执行情况并与Client通信,报告程序运行状态(作业运行失败,重新执行/在其他节点执行)
  6. 程序结束,相关Container及ApplicationMaster释放

允许用户自定义

Partitioner

MapReduce编程模型

TextInputFormat

====================== MapReduce编程模型 ======================

思路:
  1. Map阶段每行数据按分隔符切分,单词作为key,1为value
  2. Reduce阶段:远程从Map阶段的输出结果copy数据,并进行整理排序,把相同的key合并,形成key对应一组value的数据集,即为reduce的输入,reduce针对每个key调用reduce方法,对value求和

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出

OutputFormat(默认TextOutputFormat)

MapReduce简介

  • MapReduce是一个分布式的批处理计算框架,源于Google在2014年12月发布的一篇关于分布式计算的论文,Google并未开源,MapReduce是根据论文思路开发的一套框架,可以说是Google MapReduce的克隆版
  • 一个MapReduce程序分为Map阶段和Reduce阶段
  • MapReduce的典型特性包括:易于编程(用户只需要在Map和Reduce的实现函数中编写业务逻辑,就可以实现分布式计算)、扩展性好(可根据资源灵活的执行及配置任务)、容错性高(集群中一个节点故障,该节点的任务会调度到其他节点执行)、适合海量数据的离线处理

任务间存在严重的负载倾斜

  数据输出格式:OutputFormat

WordCount

Block

  数据分组:Partitioner

分为Map和Reduce阶段两部分组成

Combiner(local reducer)

 

MapReduce的短板

  • 实时计算,实时计算讲究时效性,MapReduce适合大规模数据的计算,在时效性方面就
  • 流式计算,流式计算的输入数据是动态的,MapReduce的数据集是静态的,因而无法实现流式计算
  • DAG计算,DAG计算讲究多次迭代,多个作业之间存在很严重的依赖关系

数据本地性分类

====================== MapReduce编程模型 ======================

内部逻辑

以下为MapReduce简要的处理过程图

www.3559.com 7

mapreduce内部逻辑.jpg

  1. 对输入数据进行分片(Split),一般以默认的block进行split,也可以自定义
  • 文件分片的过程中,很大可能会出现跨行问题,MapReduce的解决方案是读取完整,跨block分到上一个split,下一split分这些
  • 在Split的时候,会将数据解析成key-value对,该默认实现为TextInputFormat,Key为行在文本中的偏移量,Value为行内容,如果一行被截断,则读取下一个的block的前几个字符,Split大小也可以自定义
  1. Mapper程序读取切分好的数据(通过InputFormat接口)
  2. Mapper程序对数据进行处理,按照key、value的形式进行整合
  3. Partitioner对Mapper输出的数据进行分区,决定由哪个Reduce进行处理
  • Partitioner的默认实现为HashPartitioner,具体做法大致是对key取Hash值,相对Reduce的数量取余,产生的数字即为Reduce Task号
  • Partitioner也允许自定义分区
  1. 对数据进行Shuffle和Sort,分配到对应的Reducer进行处理
  2. 结果输出

减少Map Task输出数据量(磁盘IO)

Input: 一系列key/value对

编程接口

责任编辑:

Mapper

编程:
  1. 编程采用Java进行,使用maven管理jar包,具体环境设置在以后有机会补充,目前请网上查看,资料也不少。需要注意的是由于众所周知的原因,请设置国内maven仓库镜像
    pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.nanri</groupId>
    <artifactId>bigdatalearn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

代码:具体见代码注释

package com.nanri.mapr01;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountApp {
//map阶段,输入key、value输出key、value类型
    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //读取数据,将数据转换成字符串
            String line = value.toString();
            //拆分数据
            String[] words = line.split("t");
            for(String word : words) {
                //输出需要序列化写出
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // 输入:k:v1, v2, v3
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //定义一个计数器
            int count = 0;
            //遍历,将key出现的次数累加
            for(IntWritable value : values) {
                count  = value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

//执行程序
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //定义的job名称
        String jobName = args[0];
        //输入路径
        String inputPath = args[1];
        //输出路径,执行时此路径在hadoop上必须不存在
        String outputPath = args[2];
        //执行配置
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置作业名称
        job.setJobName(jobName);
        //设置具体执行类
        job.setJarByClass(WordCountApp.class);
        //mapper和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //设置mapper阶段的输出的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置reducer阶段的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 作业完成退出
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

代码编写完成后,进行打包,将编译好的工程jar包文件上传到服务器,运行hadoop jar bigdatalearn-1.0-SNAPSHOT.jar com.nanri.mapr01.WordCountApp wordcountapp /wordcount/input /wordcount/output,jar包后第一个参数为入口类所在的完整路径,之后参数为具体main方法中定义的参数,需确保输出路径不存在,稍等一会,即可看到给的跟踪地址及执行成功后的提示

  • Mapreduce用户编写的程序分为三个部分:Mapper、Reducer、Driver
  • Mapper的输入数据是K-V对的形式,key一般为行号,value为每行数据
  • Mapper的输入数据是K-V对的形式,
  • Mapper的业务逻辑写在map()方法中
  • map()方法对每个<K,V>执行一次
  • Reducer的输入数据类型对应Mapper的输出数据类型,也是K-V对
  • Reducetask进程对每一组相同k的<k,v>组调用一次reduce()方法
  • 用户自定义的Mapper和Reducer都要继承各自的父类
  • 整个程序需要一个Driver来提交,提交的是一个描述了各种必要信息的job对象

图算法返回搜狐,查看更多

Combiner(local reducer)

YARN

不能启用推测执行机制

  输入数据处理:Mapper

WordCount相当于普通编程语言的HelloWorld

Split与Block是对应关系是任意的,可由用户控制

  输入数据格式解析:InputFormat

什么是数据本地性(data locality)

Output:一系列(k2,v2)对

推测执行机制

Reducer

合并相同的key对应的value(wordcount例子)

  map(k,v) ->list(k1,v1) 

HDFS中最小的数据存储单位

  数据按照key排序

Reducer

用户提供两个函数实现: 

Map Task/Reduce Task

Reduce阶段 ------------->

数据输出格式:OutputFormat

  数据远程拷贝

将分片数据解析成key/value对

(k1,v1) 是中间key/value结果对

MapReduce编程模型—内部逻辑

InputFormat(默认TextInputFormat)

为拖后腿任务启动一个备份任务,同时运行

MapReduce将作业的整个运行过程分为两个阶段: Map阶段和Reduce阶段

一旦运行失败,由YARN的ResourceManager负责重新启动,最多重启次数可由用户设置,默认是2次。一旦超过最高重启次数,则作业运行失败。

作业完成时间取决于最慢的任务完成时间

数据分组:Partitioner

InputFormat

常见MapReduce应用场景

数据本地性

发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度

因硬件老化、软件Bug等,某些任务可能运行非常慢

通常与Reducer逻辑一样

功能类似于 1.0中的JobTracker,但不负责资源管理;

功能包括:任务划分、资源申请并将之二次分配个Map Task和Reduce Task、任务状态监控和容错。

默认是64MB

Partitioner

输入数据格式解析:InputFormat

减少Reduce-Map网络传输数据量(网络IO)

默认与Block一一对应

推测执行机制

文件分片(InputSplit)方法

MapReduce自身的设计特点决定了数据源必须是静态的

数据远程拷贝

MapReduce的输入数据集是静态的,不能动态变化

同机架(rack-local)

Map阶段

Combiner可做看local reducer

Reduce阶段由一定数量的Reduce Task组成

比如“hash(hostname(URL)) mod R”确保相同域名的网页交给同一个Reduce Task处理

MapReduce 2.0架构

默认实现是TextInputFormat

原标题:大数据——MapReduce

Client

简单的数据统计,比如网站pv、uv统计

数据按照key排序

如果任务运行在它将处理的数据所在的节点,则称该任务具有“数据本地性”

MRAppMaster容错性

默认实现:hash(key) mod R

同节点(node-local)

Map阶段由一定数量的Map Task组成

其他(off-switch)

Sum(YES!),Average(NO!)

若行被截断,则读取下一个block的前几个字符

www.3559.com 8

像MySQL一样,在毫秒级或者秒级内返回结果

Task周期性向MRAppMaster汇报心跳;

InputFormat(默认TextInputFormat)

一个作业由若干个Map任务和Reduce任务构成

数据处理:Reducer

特殊任务,比如任务向数据库中写数据

MapReduce中最小的计算单元

Partitioner

Combiner

推荐算法

与MapReduce 1.0的Client类似,用户通过Client与YARN交互,提交MapReduce作业,查询作业运行状态,管理作业等。

Block与Split

海量数据查找

Mapper

Partitioner决定了Map Task输出的每条数据交给哪个Reduce Task处理

www.3559.com 9

搜索引擎建索引 (mapreduce产生的原因)

复杂数据分析算法实现

输入数据处理:Mapper

DAG计算

OutputFormat(默认TextOutputFormat)

流式计算

好处

谁先运行完,则采用谁的结果

R是Reduce Task数目

处理跨行问题

一旦Task挂掉,则MRAppMaster将为之重新申请资源,并运行之。最多重新运行次数可由用户设置,默认4次。

www.3559.com 10

Map和Reduce两阶段

MRAppMaster

结果可叠加

MapReduce将作业job的整个运行过程分为两个阶段:Map阶段和Reduce阶段

Key是行在文件中的偏移量,value是行内容

Reduce阶段

实时计算

Spit

本地性可避免跨节点或机架数据传输,提高运行效率

分类算法

MapReduce 2.0容错性

编辑:www.3559.com 本文来源:mapreduce工作流程www.3559.com:

关键词: www.3559.com