Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

1、从Windows向Yarn上提交源码(wordcount举例)

1)、步骤

①、driver文件添加必要配置信息

/**
 * 在给定的文本文件中统计输出每一个单词出现的总次数
 */
public class WcDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取配置信息以及封装任务
        Configuration configuration = new Configuration();


        configuration.set("fs.defaultFS", "hdfs://test:9000");
        //将程序提交到yarn上
        configuration.set("mapreduce.framework.name", "yarn");
        //允许从windows向linux提交任务
        configuration.set("mapreduce.app-submission.cross-platform", "true");
        //告知resourcemanager所在节点的位置
        configuration.set("yarn.resourcemanager.hostname", "test");

        //1、获取Job实例
        Job job = Job.getInstance(configuration);


        //2、设置Jar包
        //job.setJarByClass(WcDriver.class);

        job.setJar("D:\\DATA\\IdeaProjects\\newguigu\\target\\newguigu-1.0-SNAPSHOT.jar");

        //3、设置Mapper和Reducer
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReduce.class);

        //4、设置Mapper和Reducer的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //5、设置输入输出文件
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //6、提交Job

        boolean b = job.waitForCompletion(true);

        System.exit(b ? 0 : 1);
    }
}

②、编辑任务配置

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化
Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

③、打包,并将Jar包设置到Driver中

​ ①中已经给出填写jar包地址的地方,打包方法如下图

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

2)、关于从Windows向Yarn上提交源码错误记录

①、Redirecting to job history server…Client: Retrying connect to server:

​ 连接job history server失败,输入jps,看有哪些java进程,没有启动job history service,启动命令如下:

sbin/mr-jobhistory-daemon.sh start historyserver

②、映射的问题

​ 大家一定要在windows上配置好虚拟机的映射,路径为:C:\Windows\System32\drivers\etc,hosts文件,打开不能修改,这边附上修改的链接:https://www.jb51.net/os/win10/395409.html

2、Hadoop数据压缩

1)、压缩策略和原则

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

2)、MR支持的压缩编码

压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 和文本处理一样,不需要修改

①、编码/解码器

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

②、压缩性能的比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

③、压缩编码介绍

​ 由于目前常用Snappy、LZO,所以就给出这两种的介绍

a、LZO

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

b、Snappy

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

④、压缩位置选择

​ 压缩可以在MapReduce作用的任意阶段启用

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

⑤、压缩参数配置

参数 默认值 阶段 建议
io.compression.codecs (在core-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec 输入压缩 Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress(在mapred-site.xml中配置) false mapper输出 这个参数设为true启用压缩
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec mapper输出 企业多使用LZO或Snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) false reducer输出 这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress. DefaultCodec reducer输出 使用标准工具或者编解码器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) RECORD reducer输出 SequenceFile输出使用的压缩类型:NONE和BLOCK

3)、压缩实例

①、数据流的压缩和解压缩

a、TestCompression.java
public class TestCompression {
    public static void main(String[] args) throws IOException {
        //compress("d:/DATA/input/word.txt",BZip2Codec.class);
        decompress("d:/DATA/input/word.txt.bz2");
    }

    /**
     * 解压缩
     *
     * @param file
     */
    private static void decompress(String file) throws IOException {
        Configuration configuration = new Configuration();
        //生成压缩格式工厂对象
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(configuration);

        //根据压缩格式工厂获取压缩对象
        CompressionCodec codec = codecFactory.getCodec(new Path(file));

        //输入流
        FileSystem fileSystem = FileSystem.get(configuration);

        FSDataInputStream fsDataInputStream = fileSystem.open(new Path(file));
        CompressionInputStream cis = codec.createInputStream(fsDataInputStream);

        //输出流:这边是为了获取文件名字,比如word.txt.gz要获取word.txt
        String outputFile = file.substring(0, file.length() - codec.getDefaultExtension().length());

        FSDataOutputStream fos = fileSystem.create(new Path(outputFile));

        IOUtils.copyBytes(cis, fos, 1024, false);
        IOUtils.closeStream(cis);
        IOUtils.closeStream(fos);
    }

    /**
     * 压缩
     *
     * @param file
     * @param codecClass
     */
    private static void compress(String file, Class<? extends CompressionCodec> codecClass) throws IOException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(configuration);

        //生成压缩格式对象
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, configuration);

        //开输入流
        FSDataInputStream fis = fileSystem.open(new Path(file));

        //输出流
        FSDataOutputStream fos = fileSystem.create(new Path(file + codec.getDefaultExtension()));

        //用压缩格式包装输出流
        CompressionOutputStream cos = codec.createOutputStream(fos);

        IOUtils.copyBytes(fis, cos, 1024);
        IOUtils.closeStream(fis);
        IOUtils.closeStream(fos);
    }
}

3、yarn的架构

1)、概述

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

①、假设我们把资源看成一个水池,3个节点每个节点上有NodeManager,假设每个NM有8G的水,这个池子总共24G

②、这个时候来个job,job首先会向ResourceManager提交,之后RM就会执行这个job

③、假设RM会在一个节点上拿出2G运行一个App Mstr,那么这个节点就还剩6G,这个AM会在一个节点上运行maptask(假设1G),那这个节点就剩7G,会在另一个节点上运行reducetask(假设1G),具体运行的过程由RM说了算

④、都执行完之后,资源回收,各个节点又恢复8G

补充:container的生成和关闭是由NodeManager管的,container里面运行什么是由AM说了算,AM是由RM启动的,AM运行什么由RM说了算。container相当于资源申请的单位。

提问:为什么container不能运行maptask和reducetask?分大点也不行嘛?

答:肯定不能。RM、NM只负责调度资源,通过容器(container)去调度,具体在资源里面干什么是由App Mstr负责,根据提交的程序不同会有不同的AM,这里我们提交的都是MapReduce,所以叫做MapRecude AppMstr。

2)、yarn工作机制

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

①、客户端提交任务,首先连接yarn集群

②、向集群申请一个Application以及资源的提交路径

③、会向临时目录提交必要的文件

④、提交完成后,向ResourceManager发起资源申请,申请执行AppMaster(这里拿MRAppMaster举例)

⑤、ResourceManager将用户的请求初始化为Task

⑥、ResourceManager将这个Task放在调度队列(决定哪些资源申请优先响应,哪些资源申请往后响应)里进行调度

⑦、假设轮到调度队列内的Task执行了,yarn就会让其中一个NodeManager运行一个容器(Container),容器里面运行AppMaster

⑧、AppMaster会把job资源下到本地

⑨、AppMaster进一步向yarn申请资源,这个资源也会被放在调度队列里

⑩、假设这个资源执行了,ResourceManager也会让NodeManager运行相应的容器

⑩①、此时AppMaster让这个资源起的容器跑MapTask,生成对应文件

⑩②、AppMaster进一步向yarn申请资源执行ReduceTask(过程同MapTask)

⑩③、Reduce向对应的Map获取相应的分区的数据

⑩④、执行完后,回收资源,MRAppMaster会向ResourceManager注销自己

3)、yarn的资源调度器

①、先进先出调度器(FIFO)

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

②、容量调度器(Capacity Scheduler)

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

③、公平调度器(Fair Scheduler)

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化
Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

4)、任务的推测执行

①、提问

​ 系统中有99%的Map任务都完成了,只有少数几个Map老是进度很慢,完不成,怎么办?

②、回答

​ 发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。

③、具体实施

(1)每个Task只能有一个备份任务

(2)当前Job已完成的Task必须不小于0.05(5%)

(3)开启推测执行参数设置。mapred-site.xml文件中默认是打开的

<property>
  	<name>mapreduce.map.speculative</name>
  	<value>true</value>
  	<description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property>

<property>
  	<name>mapreduce.reduce.speculative</name>
  	<value>true</value>
  	<description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>

④、算法原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WCvJ68B0-1609929592988)(file:///C:\Users\XIAOYO~1\AppData\Local\Temp\ksohtml13040\wps1.png)]Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

⑤、不能启用推测执行机制情况

(1)任务间存在严重的负载倾斜;

(2)特殊任务,比如任务向数据库中写数据。

4、Hadoop企业优化

1)、MapReduce优化方法

①、数据输入

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

②、Map阶段

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

③、Reduce阶段

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

④、I/O传输

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

⑤、数据倾斜问题

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化
Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

⑥、常用的调优参数

a.资源相关参数
(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml)
配置参数 参数说明
mapreduce.map.memory.mb 一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.reduce.memory.mb 一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.map.cpu.vcores 每个MapTask可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.cpu.vcores 每个ReduceTask可使用的最多cpu core数目,默认值: 1
mapreduce.reduce.shuffle.parallelcopies 每个Reduce去Map中取数据的并行数。默认值是5
mapreduce.reduce.shuffle.merge.percent Buffer中的数据达到多少比例开始写入磁盘。默认值0.66
mapreduce.reduce.shuffle.input.buffer.percent Buffer大小占Reduce可用内存的比例。默认值0.7
mapreduce.reduce.input.buffer.percent 指定多少比例的内存用来存放Buffer中的数据,默认值是0.0
(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml)
配置参数 参数说明
yarn.scheduler.minimum-allocation-mb 给应用程序Container分配的最小内存,默认值:1024
yarn.scheduler.maximum-allocation-mb 给应用程序Container分配的最大内存,默认值:8192
yarn.scheduler.minimum-allocation-vcores 每个Container申请的最小CPU核数,默认值:1
yarn.scheduler.maximum-allocation-vcores 每个Container申请的最大CPU核数,默认值:32
yarn.nodemanager.resource.memory-mb 给Containers分配的最大物理内存,默认值:8192
(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml)
配置参数 参数说明
mapreduce.task.io.sort.mb Shuffle的环形缓冲区大小,默认100m
mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值,默认80%
b.容错相关参数(MapReduce性能优化)
配置参数 参数说明
mapreduce.map.maxattempts 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.reduce.maxattempts 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.task.timeout Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

2)、HDFS小文件优化方法

①、小文件的弊端

​ HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。

②、小文件的解决方案

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

5、TopN案例

1)、需求

​ 对手机流量输出结果进行加工,输出流量使用量在前10的用户信息

2)、需求分析

Hadoop学习视频心得(七)数据压缩、yarn、hadoop优化

3)、代码实现

①、FlowBean.java

public class FlowBean implements WritableComparable<FlowBean> {
    private String phone;

    public String getPhone() {
        return phone;
    }

    public void setPhone(String phone) {
        this.phone = phone;
    }

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    @Override
    public String toString() {
        return phone + "\t" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    /**
     * 序列化:将对象数据写到框架指定的地方
     *
     * @param dataOutput 数据的容器
     * @throws IOException
     */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
        dataOutput.writeUTF(phone);
    }

    /**
     * 反序列化:从框架指定的地方读取数据填充对象
     *
     * @param dataInput 数据的容器
     * @throws IOException
     */
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
        this.phone = dataInput.readUTF();
    }

    /**
     * 比较方法,按照总流量的降序进行排序
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(FlowBean o) {
        /*if(this.sumFlow<o.sumFlow){
            return 1;
        }else if(this.sumFlow==o.sumFlow){
            return 0;
        }else{
            return 1;
        }*/

        return Long.compare(o.sumFlow, this.sumFlow);
    }
}

②、TopNMapper.java

/**
 * 将数据封装成FlowBean
 */
public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {

    private FlowBean flow = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fileds = value.toString().split("\t");
        flow.setPhone(fileds[0]);
        //上行、下行
        flow.set(Long.parseLong(fileds[1]), Long.parseLong(fileds[2]));

        context.write(flow, NullWritable.get());

    }
}

③、TopNReducer.java

/**
 * 取全局前十
 */
public class TopNReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {

    /**
     * 取前十
     *
     * @param key
     * @param values  所有的数据一组进来,按照降序排列
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<NullWritable> iterator = values.iterator();
        //取前十
        for (int i = 0; i < 10; i++) {
            if (iterator.hasNext()) {
                context.write(key, iterator.next());
            }
        }
    }
}

④、TopNComparator.java

public class TopNComparator extends WritableComparator {

    protected TopNComparator() {
        super(FlowBean.class, true);
    }

    /**
     * 将所有的数据分到同一组
     *
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return 0;
    }
}

⑤、TopNDriver.java

public class TopNDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(TopNDriver.class);
        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        //启用Combiner求MapTask局部前十,优化map输出的数据量
        job.setCombinerKeyGroupingComparatorClass(TopNComparator.class);
        job.setCombinerClass(TopNReducer.class);

        job.setGroupingComparatorClass(TopNComparator.class);

        job.setOutputKeyClass(FlowBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("d:/DATA/input"));
        FileOutputFormat.setOutputPath(job, new Path("d:/DATA/output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

⑥、待处理文件(top10input.txt)

13470253144	180	180	360
13509468723	7335	110349	117684
13560439638	918	4938	5856
13568436656	3597	25635	29232
13590439668	1116	954	2070
13630577991	6960	690	7650
13682846555	1938	2910	4848
13729199489	240	0	240
13736230513	2481	24681	27162
13768778790	120	120	240
13846544121	264	0	264
13956435636	132	1512	1644
13966251146	240	0	240
13975057813	11058	48243	59301
13992314666	3008	3720	6728
15043685818	3659	3538	7197
15910133277	3156	2936	6092
15959002129	1938	180	2118
18271575951	1527	2106	3633
18390173782	9531	2412	11943
84188413	4116	1432	5548

★★★错误记录★★★

1)、执行本地处理压缩文件(bz2)

压缩没有任何问题,解压有问题

WARN org.apache.hadoop.io.compress.bzip2.Bzip2Factory - Failed to load/initialize native-bzip2 library system-native, will use pure-Java version

大概意思就是:警告org.apache.hadoop下载.io.compress.bzip2文件,Bzip2Factory未能加载/初始化本机bzip2库系统本机,将使用纯Java版本,所以宣布失败

上一篇:大数据技术之Hadoop(Hadoop数据压缩&Yarn资源调度器)四&五


下一篇:Swift之父Chris Lattner将从Apple离职,加入特斯拉