datax(21):编写自己的Transformer

前面2篇文章,已经看完+学习完transform的内容,今天继续编写一个自己的transformer;


一、环境

  1. win10
  2. DataX 3.0(从我的datax分支打包而来)
  3. job.json使用datax的样例json,源文件在xxx\DataX\core\src\main\job\中,打包编译后在xxx\DataX\target\datax\datax\job下。本文略做修改,主要修改2出,是否打印和记录行数
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 59880504, 
                                "type": "long"
                            },
                            {
                                "value": "2010-05-04 00:00:00",
                                "type": "string" // 原来是date
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 1 // 原来是100000,修改为1行,看到效果就行
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true, //原来是false,不打印,为了在控制台看到效果,修改为true
                        "encoding": "UTF-8"
                    }
                }
            }
        ]
    }
}

二、效果

先看下不加transformer时候的运行效果
datax(21):编写自己的Transformer

加上自定义的DateTransformer(主要实现时间类型字符串格式转换)配置后运行效果:
datax(21):编写自己的Transformer


三、实现步骤

  1. 写一个类DateTransformer继承Transformer;
  2. 里面构造方法设置一个唯一标识符,用来代表本transformer;
  3. 重写evaluate()方法,根据自己实现情况定义paras参数个数及含义
  4. 将DateTransformer注册到TransformerRegistry中;

四、具体代码

编写DateTransformer类,写构造方法,写evaluate方法

package com.alibaba.datax.core.transport.transformer;

import static com.alibaba.datax.core.transport.transformer.TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER;
import static com.alibaba.datax.core.transport.transformer.TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION;

import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.transformer.Transformer;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

/**
 * @author water
 * @desc 自己定义一个时间类型转换的类
 */
public class DateTransformer extends Transformer {

  /**
   * 通过设置name,标注出唯一的标识符,代表被类
   */
  public DateTransformer() {
    setTransformerName("dx_date");
  }


  /**
   * @param record Record
   * @param paras  Object  第一个参数是 colIndex。 第二个参数是原时间格式,第三个是目标时间格式
   * @return Record
   */
  @Override
  public Record evaluate(Record record, Object... paras) {
    int colIndex;
    String oldPattern;
    String newPattern;
    try {
      if (paras.length != 3) {
        throw new RuntimeException("dx_date paras must be 3");
      }
      colIndex = (Integer) paras[0];
      oldPattern = (String) paras[1];
      newPattern = (String) paras[2];
    } catch (Exception e) {
      throw DataXException.asDataXException(TRANSFORMER_ILLEGAL_PARAMETER,
          "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage());
    }
    Column column = record.getColumn(colIndex);
    try {
      String oriValue = column.asString();
      //如果字段为空,跳过处理
      if (oriValue == null) {
        return record;
      }
      SimpleDateFormat oldSdf = new SimpleDateFormat(oldPattern);
      Date date = oldSdf.parse(oriValue);
      SimpleDateFormat newSdf = new SimpleDateFormat(newPattern);
      String newValue = newSdf.format(date);
      record.setColumn(colIndex, new StringColumn(newValue));
    } catch (Exception e) {
      throw DataXException.asDataXException(TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
    }
    return record;
  }
}

将DateTransformer注册到TransformerRegistry中

TransformerRegistry类中
  static {
    /**
     * add native transformer
     * local storage and from server will be delay load.
     * 官方默认注册了 5 个方法,分别是截取字符串、填补、替换、过滤、groovy 代码段(后面会详细介绍)
     */

    registTransformer(new SubstrTransformer());
    registTransformer(new PadTransformer());
    registTransformer(new ReplaceTransformer());
    registTransformer(new FilterTransformer());
    registTransformer(new GroovyTransformer());
    //将自己写的transformer注册进来
    registTransformer(new DateTransformer());
  }

so easy,哪里需要写哪里~


注:

  1. 对源码进行略微改动,主要修改为 1 阿里代码规约扫描出来的,2 clean code;

  2. 所有代码都已经上传到github(master分支和dev),可以免费白嫖

上一篇:开源 ETL 工具 DataX 实践,从mysql到mysql的全量同步和批量更新


下一篇:使用DataX实现mysql数据迁移