odps UDAF解析

UDAF
class odps.udf.BaseUDAF
继承此类实现Python UDAF。

BaseUDAF.new_buffer()
实现此方法返回聚合函数的中间值的buffer。buffer必须是mutable object(比如list, dict),并且buffer的大小不应该随数据量递增,在极限情况下,buffer marshal过后的大小不应该超过2Mb。

BaseUDAF.iterate(buffer[, args, ...])
实现此方法将args聚合到中间值buffer中。

BaseUDAF.merge(buffer, pbuffer)
实现此方法将两个中间值buffer聚合到一起,即将pbuffer merge到buffer中。

BaseUDAF.terminate(buffer)
实现此方法将中间值buffer转换为ODPS SQL基本类型。下面是一个UDAF求平均值的例子。

from odps.udf import annotate
from odps.udf import BaseUDAF

@annotate('bigint->bigint')
class Average(BaseUDAF):

def new_buffer(self):
    return [0, 0]

def iterate(self, buffer, number):
    if number is not None:
        buffer[0] += number
        buffer[1] += 1

def merge(self, buffer, pbuffer):
    buffer[0] += pbuffer[0]
    buffer[1] += pbuffer[1]

def terminate(self, buffer):
    if buffer[1] == 0:
        return 0
    return buffer[0] / buffer[1]

比如计算1,2,3,4的平均值的执行过程如下图所示:
odps UDAF解析

上一篇:Ajax中解析Json的两种方法详解


下一篇:appendString2Txt.py