ODPS与Kettle融合之道

场景介绍

传统的数据治理厂商(没有采用云计算技术),往往使用 Oracle 作为数仓的存储,使用 Kettle 作为ETL和流程调度工具。依托于 Oracle 的稳定、高效,以及 Kettle 的灵活,传统架构可以胜任各种复杂的场景。他们数据治理的架构简单来说是这样的:

ODPS与Kettle融合之道

随着云计算技术的不断发展和推广,传统架构正在慢慢淡出市场,但是交付过程中,不可避免的会遇到把云计算和传统厂商相集成的场景。例如,我们使用了阿里云的产品DataWorks,它集成了云数仓ODPS和离线同步工具DataX,整体架构会变成这样:

ODPS与Kettle融合之道

本文主要介绍的是,在上述的场景下,如何保证 DataX 作业和 Kettle 作业的同步问题。

方案介绍

1、在DataWorks中创建一个虚拟节点(vn_root),并把该节点设置为“暂停”状态(暂停状态的实例,会在设定的执行时间到达时,转换为“运行失败”状态),然后把所有DataX数据集成任务的上游配置成该节点;

2、使用 Java 封装 DataWorks API,封装类名为ResumeTask,并输出为 jar 包(active_vn_root.jar),代码中实现以下流程用来激活DataX作业:

ODPS与Kettle融合之道

3、把active_vn_root.jar放入 data-integration\lib 目录后重启 Kettle

4、在原业务流程中增加一个 java 代码节点,源代码如下:

/* 引用 jar 包中的方法 */
import dataworks.ResumeTask;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {

    Object[] r = getRow();
    if (r == null) {
        setOutputDone();
        return false;
    }

    /* 只处理第一行 */
    if (first) {
        first = false;
    } else {
        setOutputDone();
        return false;
    }

    Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());

    /* 捕获上游信号 */
    String signal = get(Fields.In, "signal").getString(r);

    /* 处理信号 */
    if (signal.compareTo("1") == 0) {
        logBasic("signal is 1");
        /* 节点号 1234 是固定值,不会变*/
        int ret = ResumeTask.doResumeTask(1234);
        /** ret 返回数字的含义
         *         0 成功;
         *         -1 其他异常(网络异常);
         *         1 查询异常,未查询到相关实例;
         *         2 查询异常,查询到多个相关实例;
         *         3 恢复失败;
         *         4 恢复失败,任务为一次性任务;
         *         5 恢复失败,任务为空跑任务;
         *         6 恢复失败,接口调用返回值异常;
         *         7 实例恢复成功,但重跑失败;
         *         8 实例恢复成功,但重跑失败,任务并非失败任务,不需要重跑
         *         9 实例恢复成功,但重跑失败,接口调用返回值异常;
         */
        logBasic("return code is " + ret);
        if (ret == 0) {
            logBasic("active root node successfully: 调用成功");
            get(Fields.Out, "result").setValue(outputRow, "success");
        } else {
            String err_msg = "UNKOWN ERROR";
            switch(ret) {
                case -1: err_msg="其他异常(网络异常);";break;
                case  1: err_msg="查询异常,未查询到相关实例;";break;
                case  2: err_msg="查询异常,查询到多个相关实例;";break;
                case  3: err_msg="恢复失败;";break;
                case  4: err_msg="恢复失败,任务为一次性任务;";break;
                case  5: err_msg="恢复失败,任务为空跑任务;";break;
                case  6: err_msg="恢复失败,接口调用返回值异常;";break;
                case  7: err_msg="实例恢复成功,但重跑失败;";break;
                case  8: err_msg="实例恢复成功,但重跑失败,任务并非失败任务,不需要重跑;";break;
                case  9: err_msg="实例恢复成功,但重跑失败,接口调用返回值异常;";break;
            }
            logBasic("active root node failed: 调用失败," + err_msg);
            get(Fields.Out, "result").setValue(outputRow, "fail");
        }
    } else {
        logBasic("signal is not 1, will do nothing:  接收到的信号错误");
        get(Fields.Out, "result").setValue(outputRow, "fail");
    }
    putRow(data.outputRowMeta, outputRow);
    setOutputDone(); 
    return false;
}

5、由于DataWorks API属于异步操作(也就是说,调用后即返回,不会等待任务执行完毕),所以不用担心该节点的执行会阻塞整体流程的执行。

上一篇:便捷的多人协作开发项目工具--apipost


下一篇:MaxCompute