Hive - Writing a UDTF Function, One Row In, Multiple Rows Out (Detailed Tutorial)

Creating the project follows the same flow as writing a UDF function. If you are not familiar with that, see this article:
HIVE - Writing UDF Functions

In the udf package, create a MyUDTF class that extends GenericUDTF and implement the interface methods:

package com.atguigu.udf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;

public class MyUDTF extends GenericUDTF {
    public void process(Object[] args) throws HiveException {
        
    }

    public void close() throws HiveException {

    }
}

The two methods the IDE prompts you to implement are not enough on their own; you also need to add the initialize method.

package com.atguigu.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

/**
 * Input:  hello,atguigu,hive
 * Output:
 *  hello
 *  atguigu
 *  hive
 */
public class MyUDTF extends GenericUDTF {
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        return super.initialize(argOIs);
    }

    //process the input data
    public void process(Object[] args) throws HiveException {

    }
    //cleanup method
    public void close() throws HiveException {

    }
}

The completed business code (note that the output column's ObjectInspector must be registered, otherwise initialize fails):

package com.atguigu.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * Input:  hello,atguigu,hive
 * Output:
 *  hello
 *  atguigu
 *  hive
 */
public class MyUDTF extends GenericUDTF {

    //container for one output row, reused across calls to avoid reallocation
    private ArrayList<String> outPutList = new ArrayList<String>();

    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        //default name of the output column; can be overridden with an alias
        List<String> fieldNames = new ArrayList<String>();
        fieldNames.add("word");

        //type of the output column: one string per row
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        //final return value: the struct describing the output rows
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    //process one input row, e.g. hello,atguigu,hive
    public void process(Object[] args) throws HiveException {
        //1. read the input value
        String input = args[0].toString();

        //2. split the string on ","
        String[] words = input.split(",");

        //3. emit one output row per word
        for (String word : words) {
            //clear the reused row container
            outPutList.clear();

            //put the word into the row
            outPutList.add(word);

            //emit the row
            forward(outPutList);
        }
    }
    //cleanup method
    public void close() throws HiveException {

    }
}

Package the jar and copy it into Hive's lib directory again:
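A minimal sketch of this step, assuming the project is a standard Maven build (the artifact name matches the jar added below):

# build the jar, then copy it into Hive's lib directory
mvn clean package
cp target/hive-demo-1.0-SNAPSHOT.jar /opt/module/hive/lib/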
Add it to the class path:

hive (default)> add jar /opt/module/hive/lib/hive-demo-1.0-SNAPSHOT.jar;
Added [/opt/module/hive/lib/hive-demo-1.0-SNAPSHOT.jar] to class path
Added resources: [/opt/module/hive/lib/hive-demo-1.0-SNAPSHOT.jar]

Create the function:

hive (default)> create temporary function my_udtf as "com.atguigu.udf.MyUDTF";
OK
Time taken: 0.733 seconds
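
To sanity-check that the function is registered under that name, you can ask Hive to describe it (output omitted here; this works for temporary functions as well):

hive (default)> describe function my_udtf;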

Note: if it fails and nothing you try works, restart Hive and repeat the steps.
Data for the input table:

hello,spark
hello,hive
hello,zhoujielun,linjunjie,dengziqi
hello,hadoop,mapreduce,yarn,common

Create the table:

hive (default)> create table input(words string);

Load the data into it:

load data local inpath '/opt/module/datas/input.txt'
into table input;

Check the data:

hive (default)> select * from input;
OK
input.words
hello,spark
hello,hive
hello,zhoujielun,linjunjie,dengziqi
hello,hadoop,mapreduce,yarn,common
Time taken: 1.386 seconds, Fetched: 4 row(s)

Use the UDTF:

hive (default)> select my_udtf(words) from input;
OK
word
hello
spark
hello
hive
hello
zhoujielun
linjunjie
dengziqi
hello
hadoop
mapreduce
yarn
common
Time taken: 3.788 seconds, Fetched: 13 row(s)

This splits on the commas and achieves the one-row-in, multiple-rows-out effect.
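Note that a UDTF called this way cannot be combined with other columns in the same SELECT list; the standard pattern for that is LATERAL VIEW. A sketch against the same table (t and word are alias names chosen here):

hive (default)> select words, word
              > from input
              > lateral view my_udtf(words) t as word;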
