MR Basics Case (5): Group Maximum

Building on the previous secondary-sort case, we now find the maximum value in each group. The input has two whitespace-separated columns, a group key and a value; the records marked "// take (group max)" below are the ones the job should output.

20      21 // take (group max)

50      51
50      52
50      53
50      54 // take (group max)

60      51
60      52
60      53
60      56
60      57
60      61 // take (group max)

70      54
70      55
70      56
70      57
70      58
70      58 // take (group max)
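
The expected result is one record per group, carrying that group's maximum:

20      21
50      54
60      61
70      58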

The reducer input needs to be grouped by the first column. Because the secondary sort orders each group's records ascending by the second column, the last record seen in a reduce call is the group maximum. The grouping is done by a custom grouping comparator:

MyGroupCompartor.java

package MR;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom grouping comparator:
 * 1. It must implement RawComparator.
 * 2. It must implement two compare methods: an object-based one and a byte-based one.
 * 3. In the byte-based compare, the byte length must match the type of the key
 *    field being compared: long = 8 bytes, int = 4 bytes.
 * 4. If the key is an object, the object's first field is used for grouping by default.
 * @author lyd
 */
public class MyGroupCompartor implements RawComparator<SecondarySortWritable>{

	/**
	 * Byte-level comparison, used by the framework during the shuffle/merge.
	 */
	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		// compare only the first 4 bytes of each key: the serialized int "first"
		return WritableComparator.compareBytes(b1, s1, 4, b2, s2, 4);
	}

	/**
	 * Object-level comparison: group by the first field only.
	 * If the custom key type doesn't provide this logic, it can be implemented here.
	 */
	@Override
	public int compare(SecondarySortWritable o1, SecondarySortWritable o2) {
		// Integer.compare avoids the overflow risk of plain subtraction
		return Integer.compare(o1.getFirst(), o2.getFirst());
	}
}
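
MyGroupCompartor compares only the first field, so every record sharing the same first value is fed to a single reduce call. It assumes the SecondarySortWritable key type from the previous secondary-sort post. For readers without that post at hand, here is a minimal sketch of what that type needs to provide (names inferred from the code above; the original may differ in detail):

SecondarySortWritable.java (sketch)

package MR;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key: first = group key, second = value.
 * Sketch of the type assumed by MyGroupCompartor.
 */
public class SecondarySortWritable implements WritableComparable<SecondarySortWritable> {

    private int first;   // group key
    private int second;  // value

    public int getFirst() { return first; }
    public void setFirst(int first) { this.first = first; }
    public int getSecond() { return second; }
    public void setSecond(int second) { this.second = second; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(first);  // these 4 bytes are what the byte comparator reads
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt();
        second = in.readInt();
    }

    @Override
    public int compareTo(SecondarySortWritable o) {
        // sort by first ascending, then second ascending:
        // within a group the largest value comes last
        int cmp = Integer.compare(first, o.first);
        return cmp != 0 ? cmp : Integer.compare(second, o.second);
    }

    @Override
    public int hashCode() {
        // partition on the group key only, so one group never spans two reducers
        return first;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof SecondarySortWritable)) {
            return false;
        }
        SecondarySortWritable other = (SecondarySortWritable) obj;
        return first == other.first && second == other.second;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}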

The previous secondary-sort driver needs only small changes:

package MR;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;


/**
 * Group maximum: emits one record per group, carrying the group's max value.
 */
public class SortSecondaryMaxDemo implements Tool {
    /**
     * map phase: parse each "group value" line into the composite key
     * @author lyd
     */
    public static class MyMapper extends Mapper<LongWritable, Text, SecondarySortWritable, IntWritable> {

        SecondarySortWritable ss = new SecondarySortWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] dig = line.split("\\s+");      // columns are whitespace-separated
            ss.setFirst(Integer.parseInt(dig[0]));  // first  = group key
            ss.setSecond(Integer.parseInt(dig[1])); // second = value to take the max of
            context.write(ss, new IntWritable(Integer.parseInt(dig[1])));
        }
    }

    /**
     * reduce phase: called once per group thanks to the grouping comparator
     * @author lyd
     */
    public static class MyReducer extends Reducer<SecondarySortWritable, IntWritable, SecondarySortWritable, IntWritable> {
        private IntWritable v = new IntWritable(0);
        @Override
        protected void reduce(SecondarySortWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Within one group the values arrive sorted ascending by the second field.
            // Hadoop reuses the key object while iterating, so after draining the
            // iterator the key holds the group's last, i.e. largest, record.
            int max = 0;
            for (IntWritable val : values) {
                max = val.get();
            }
            v.set(max);
            context.write(key, v);
        }
    }


    private Configuration conf;

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public Configuration getConf() {
        return conf == null ? new Configuration() : conf;
    }

    /**
     * 驱动方法
     */
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "ssjob");
        job.setJarByClass(SortSecondaryMaxDemo.class);

        // set input path and output path
        setInputAndOutput(job, conf, args);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(SecondarySortWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(SecondarySortWritable.class); // the reducer emits the composite key
        job.setOutputValueClass(IntWritable.class);
        // set the grouping comparator: records with the same "first" form one group
        job.setGroupingComparatorClass(MyGroupCompartor.class);

        //submit the job
        return job.waitForCompletion(true) ? 0 : 1;
    }

    // main method
    public static void main(String[] args) throws Exception {
        int isok = ToolRunner.run(new Configuration(), new SortSecondaryMaxDemo(), args);
        System.exit(isok);
    }

    /**
     * Handles the input and output path arguments.
     * @param job
     * @param conf
     * @param args
     */
    public static void setInputAndOutput(Job job,Configuration conf,String[] args){
        if(args.length != 2){
            System.out.println("usage:yarn jar /*.jar package.classname /inputpath /outputpath");
            return ;
        }
        //handle the input and output paths
        try {
            FileInputFormat.addInputPath(job, new Path(args[0]));

           //FileSystem fs = FileSystem.get(conf);
            Path outputPath = new Path(args[1]);
            /*if(fs.exists(outputPath)){
                fs.delete(outputPath, true);
            }*/
            FileOutputFormat.setOutputPath(job, outputPath);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
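
To run the job, package it into a jar (the jar name and HDFS paths below are placeholders) and launch it as the usage string suggests:

yarn jar ss.jar MR.SortSecondaryMaxDemo /input/secondsort /output/groupmax

Each output line then holds the composite key of a group's largest record plus the maximum value itself, matching the records marked "// take (group max)" above.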

