datax 新增数据插件流程

1、自定Transformer

先在 pom.xmlpackage.xml 中保留所需的模块及 transfomer 模块,在 datax-transformer 模块中自定义所需的类并实现
transformer 接口,具体内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class MysqlTransFormerTest extends Transformer {

public MysqlTransFormerTest(){
setTransformerName("dx_test_transfomer");
}
//这里的record就是读入的每一条数据
//paras是传入的参数,就是json配置中的parameter参数
@Override
public Record evaluate(Record record, Object... paras) {
//有几列
System.out.println(record.getColumnNumber());
//指定列的数据(占用大小,数据内容,数据类型)--{"byteSize":2,"rawData":"aa","type":5}
System.out.println(record.getColumn(0));

//给第0列数字+1并写入
Long aLong = record.getColumn(0).asLong();
long l = aLong.longValue();
record.setColumn(0,new LongColumn(l+1));
return record;
}
}

2、注册自定义Transformer

datax 会在初始化时加载 TransformerRegistry 并将指定的 Transformer 进行注册。要将自定义的 Transformer 加入其注册的静态代码块中。

在 datax-core 模块的 transport-transformer-TransformerRegistry 中找到 static 代码块,加入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static {
/**
* add native transformer
* local storage and from server will be delay load.
*/

registTransformer(new SubstrTransformer());
registTransformer(new PadTransformer());
registTransformer(new ReplaceTransformer());
registTransformer(new FilterTransformer());
registTransformer(new GroovyTransformer());
registTransformer(new DigestTransformer());
registTransformer(new MysqlTransFormerTest());
}

3、json 文件配置 transformer

注意:一定要配置 columnIndex,可以不用,但是在代码中是必须的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "xxxxxx",
"column": [
"id",
"name"
],
"connection": [
{
"table": [
"tt1"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/test"
]
}
]
}
},
"transformer": [
{
"name": "dx_test_transfomer",
"parameter": {
"columnIndex": 1
}
}
],
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "xxxxxx",
"column": [
"id",
"name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from tt2"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=gbk",
"table": [
"tt2"
]
}
]
}
}
}
]
}
}

datax 新增数据插件流程
https://flepeng.github.io/045-DataX-datax-新增数据插件流程/
作者
Lepeng
发布于
2024年3月6日
许可协议