Processing Data with a Java Script
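1. Workflow
Before subclassing Kettle's classes, first look at the scripting component in Spoon (the User Defined Java Class step). It lets you process data with Java code, JavaScript, and so on.
Once this flow is configured, it looks as shown below.
Double-clicking main brings up a method. This is the method that processes each row of data.
The editor includes reference samples, such as one for setting a value, as shown in the figure below.
After running the transformation, the result is shown in the figure below: the data was indeed processed by the Java code.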
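2. Code
When a transformation is built in code, you can define the same Java step and have it execute your code. That code is exactly the processRow method seen in the GUI tool, so all row processing goes through processRow.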
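The contract that processRow follows can be shown without the Kettle runtime. The plain-Java model below is a sketch, not Kettle's implementation. `MiniStep`, `getRow`, `putRow` and `runStep` are hypothetical stand-ins that only imitate the Kettle methods with similar names. The engine keeps calling processRow while it returns true. Each call handles one row. When getRow() returns null, the step marks its output as done and returns false.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a Kettle step: it only imitates the processRow contract.
class MiniStep {
    private final Deque<Object[]> input;                      // rows waiting to be read
    private final List<Object[]> output = new ArrayList<>();  // rows handed downstream
    private final UnaryOperator<Object[]> logic;              // per-row processing
    private boolean outputDone = false;

    MiniStep(List<Object[]> rows, UnaryOperator<Object[]> logic) {
        this.input = new ArrayDeque<>(rows);
        this.logic = logic;
    }

    // Imitates getRow(): the next row, or null once the input is exhausted.
    Object[] getRow() {
        return input.poll();
    }

    // Imitates putRow(): passes a row to the next step.
    void putRow(Object[] row) {
        output.add(row);
    }

    // Same contract as processRow: true = call again, false = finished.
    boolean processRow() {
        Object[] r = getRow();
        if (r == null) {
            outputDone = true; // plays the role of setOutputDone()
            return false;
        }
        putRow(logic.apply(r));
        return true;
    }

    List<Object[]> getOutput() { return output; }

    boolean isOutputDone() { return outputDone; }

    // Simplified engine loop: keep calling processRow until it returns false.
    static MiniStep runStep(List<Object[]> rows, UnaryOperator<Object[]> logic) {
        MiniStep step = new MiniStep(rows, logic);
        while (step.processRow()) {
            // each call handles exactly one row
        }
        return step;
    }

    public static void main(String[] args) {
        List<Object[]> rows = List.of(new Object[] {"a"}, new Object[] {"b"});
        MiniStep step = runStep(rows, r -> new Object[] {r[0] + "_new"});
        for (Object[] row : step.getOutput()) {
            System.out.println(Arrays.toString(row)); // [a_new], then [b_new]
        }
    }
}
```

Returning a boolean instead of looping inside the step lets the engine decide when to call the step again. That is why each processRow call in Kettle handles only one row.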
// Imports needed in the enclosing class
import java.util.ArrayList;
import java.util.List;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;

/**
 * Build the Java script (User Defined Java Class) step.
 * @param transMeta transformation the step is added to
 * @param registry  plugin registry used to look up the step's plugin id
 * @return the configured step
 */
private StepMeta getJavaStep(TransMeta transMeta, PluginRegistry registry) {
    UserDefinedJavaClassMeta javaClassMeta = new UserDefinedJavaClassMeta();
    // Java source of the step: the processRow method shown in Spoon's editor
    String sourceCode = "public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {\n" +
        "  if (first) {\n" +
        "    first = false;\n" +
        "\n" +
        "    /* TODO: Your code here. (Using info fields)\n" +
        "\n" +
        "    FieldHelper infoField = get(Fields.Info, \"info_field_name\");\n" +
        "\n" +
        "    RowSet infoStream = findInfoRowSet(\"info_stream_tag\");\n" +
        "\n" +
        "    Object[] infoRow = null;\n" +
        "\n" +
        "    int infoRowCount = 0;\n" +
        "\n" +
        "    // Read all rows from info step before calling getRow() method, which returns first row from any\n" +
        "    // input rowset. As rowMeta for info and input steps varies getRow() can lead to errors.\n" +
        "    while((infoRow = getRowFrom(infoStream)) != null){\n" +
        "\n" +
        "      // do something with info data\n" +
        "      infoRowCount++;\n" +
        "    }\n" +
        "    */\n" +
        "  }\n" +
        "\n" +
        "  Object[] r = getRow();\n" +
        "\n" +
        "  if (r == null) {\n" +
        "    setOutputDone();\n" +
        "    return false;\n" +
        "  }\n" +
        "\n" +
        "  // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large\n" +
        "  // enough to handle any new fields you are creating in this step.\n" +
        "  r = createOutputRow(r, data.outputRowMeta.size());\n" +
        "\n" +
        "  /* TODO: Your code here. (See Sample)\n" +
        "\n" +
        "  // Get the value from an input field\n" +
        "  String foobar = get(Fields.In, \"a_fieldname\").getString(r);\n" +
        "\n" +
        "  foobar += \"bar\";\n" +
        "    \n" +
        "  // Set a value in a new output field\n" +
        "  get(Fields.Out, \"output_fieldname\").setValue(r, foobar);\n" +
        "\n" +
        "  */\n" +
        "  String name = get(Fields.In, \"name\").getString(r);\n" +
        "  if (null != name) {\n" +
        "    name = name + \"_new\";\n" +
        "  }\n" +
        "  get(Fields.Out, \"new_name\").setValue(r, name);\n" +
        "\n" +
        "  // Send the row on to the next step.\n" +
        "  putRow(data.outputRowMeta, r);\n" +
        "\n" +
        "  return true;\n" +
        "}";
    // Wrap the source in a transform class named "Processor"
    UserDefinedJavaClassDef classDef = new UserDefinedJavaClassDef(
        UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", sourceCode);
    List<UserDefinedJavaClassDef> classDefs = new ArrayList<>();
    classDefs.add(classDef);
    // Add the Java class to the step
    javaClassMeta.replaceDefinitions(classDefs);
    // Define the target output field
    List<UserDefinedJavaClassMeta.FieldInfo> fields = new ArrayList<>();
    UserDefinedJavaClassMeta.FieldInfo fieldInfo =
        new UserDefinedJavaClassMeta.FieldInfo("new_name", ValueMetaInterface.TYPE_STRING, -1, -1);
    fields.add(fieldInfo);
    javaClassMeta.setFieldInfo(fields);
    // Look up the plugin id of this step type and create the step
    String javaClassPluginId = registry.getPluginId(StepPluginType.class, javaClassMeta);
    StepMeta javaClassStep = new StepMeta(javaClassPluginId, "Java 代码", javaClassMeta);
    javaClassStep.setDraw(true);
    javaClassStep.setLocation(560, 304);
    transMeta.addStep(javaClassStep);
    return javaClassStep;
}
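Before you paste per-row logic into the sourceCode string, you can check it in plain Java. The sketch below mirrors what the embedded processRow does to one row. It rests on several assumptions. `createOutputRow` is modelled with `Arrays.copyOf`, which pads the row with nulls up to the output size. The output layout is assumed to be the input fields followed by the new `new_name` field. `NewNameMirror` and `process` are hypothetical names, not Kettle API.

```java
import java.util.Arrays;

// Hypothetical plain-Java mirror of the processRow body embedded in sourceCode.
class NewNameMirror {
    // Stand-in for createOutputRow(r, size): a copy of r padded with nulls up to size.
    static Object[] createOutputRow(Object[] inputRow, int outputSize) {
        return Arrays.copyOf(inputRow, Math.max(outputSize, inputRow.length));
    }

    // Assumed output layout: the input fields, then one extra slot for "new_name".
    static Object[] process(String[] inputNames, Object[] inputRow) {
        int nameIndex = Arrays.asList(inputNames).indexOf("name");
        if (nameIndex < 0) {
            throw new IllegalArgumentException("input field 'name' not found");
        }
        Object[] r = createOutputRow(inputRow, inputNames.length + 1);
        String name = (String) r[nameIndex];
        if (name != null) {
            name = name + "_new"; // same rule as the embedded code
        }
        r[inputNames.length] = name; // slot of the new output field new_name
        return r;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name"};
        System.out.println(Arrays.toString(process(fields, new Object[] {1, "tom"}))); // [1, tom, tom_new]
        System.out.println(Arrays.toString(process(fields, new Object[] {2, null})));  // [2, null, null]
    }
}
```

Note that the input field name itself is left unchanged. Only the extra new_name slot is filled, which is why the output row has to be larger than the input row.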
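Let's start with TableInput and TableOutput, two commonly used Kettle steps.
Opening their source code shows that both have a processRow method, so the data handling of both Table Input and Table Output happens there. Can we then subclass TableInput and TableOutput and override processRow to define our own processing?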
TableInput
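public class TableInput extends BaseStep implements StepInterface {
    private TableInputMeta meta;
    private TableInputData data;

    // Abridged excerpt: "..." marks code left out of the original source
    public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
        // ... (parametersMeta and parameters are prepared here)
        // Run the table query
        boolean success = doQuery( parametersMeta, parameters );
        if ( !success ) {
            return false;
        }
        // ...
        // Pass the current row on to the next step
        putRow( data.rowMeta, data.thisrow );
        // ...
        return true;
    }

    private boolean doQuery( RowMetaInterface parametersMeta, Object[] parameters ) throws KettleDatabaseException {
        // ... (executes the SQL and opens the result set)
    }
}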
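The excerpt hides how the query and the emitted rows are split across calls. In Kettle's source, the query runs inside an `if (first)` block on the first call. After that, each call passes one row from the open result set to putRow. The plain-Java sketch below models that shape. `MiniTableInput` and its in-memory result set are hypothetical stand-ins for a database query, not Kettle classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of a source step shaped like TableInput.processRow.
class MiniTableInput {
    private final List<Object[]> table;                       // pretend database table
    private Iterator<Object[]> resultSet;                     // pretend open result set
    private final List<Object[]> emitted = new ArrayList<>(); // rows passed to putRow
    private boolean first = true;
    private int queryCount = 0;

    MiniTableInput(List<Object[]> table) {
        this.table = table;
    }

    // Stand-in for doQuery(): "opens" the result set.
    private boolean doQuery() {
        queryCount++;
        resultSet = table.iterator();
        return true;
    }

    boolean processRow() {
        if (first) {
            first = false;
            if (!doQuery()) {
                return false;
            }
        }
        if (!resultSet.hasNext()) {
            return false; // no more rows; the real step also signals setOutputDone()
        }
        emitted.add(resultSet.next()); // stand-in for putRow(...)
        return true;
    }

    List<Object[]> getEmitted() { return emitted; }

    int getQueryCount() { return queryCount; }

    public static void main(String[] args) {
        MiniTableInput step = new MiniTableInput(List.of(new Object[] {1, "a"}, new Object[] {2, "b"}));
        int calls = 0;
        while (step.processRow()) {
            calls++;
        }
        System.out.println(calls + " rows emitted, query ran " + step.getQueryCount() + " time(s)");
    }
}
```

Running it prints "2 rows emitted, query ran 1 time(s)". The query runs once, and after that each processRow call emits exactly one row.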
TableOutput
public class TableOutput extends BaseStep implements StepInterface {
    private TableOutputMeta meta;
    private TableOutputData data;

    // Abridged excerpt: "..." marks code left out of the original source
    public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
        meta = (TableOutputMeta) smi;
        data = (TableOutputData) sdi;
        // Get the next input row
        Object[] r = getRow();
        if ( r == null ) {
            // ... no more input: the step is finished
            return false;
        }
        try {
            // Write the row to the table
            Object[] outputRowData = writeToTable( getInputRowMeta(), r );
            if ( outputRowData != null ) {
                putRow( data.outputRowMeta, outputRowData ); // in case we want it
            }
        } catch ( KettleException e ) {
            // ... (logs the error and stops the step)
            return false;
        }
        return true;
    }
}
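Before touching Kettle, you can try the earlier question (subclass and override processRow) on a plain-Java model. In this sketch, `MiniTableOutput` imitates the excerpt above: it reads a row, writes it to a table, and passes it on. An in-memory list stands in for the database table. `TrimmingTableOutput` is a hypothetical subclass that overrides processRow to trim String values before writing. Neither class is part of Kettle. A real subclass of TableOutput would need more wiring than shown here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model shaped like TableOutput.processRow; not a Kettle class.
class MiniTableOutput {
    protected final Deque<Object[]> input;
    protected final List<Object[]> table = new ArrayList<>();     // pretend database table
    protected final List<Object[]> forwarded = new ArrayList<>(); // rows passed downstream

    MiniTableOutput(List<Object[]> rows) {
        this.input = new ArrayDeque<>(rows);
    }

    protected Object[] getRow() { return input.poll(); }

    protected void putRow(Object[] row) { forwarded.add(row); }

    // Stand-in for writeToTable(): stores the row and returns it for forwarding.
    protected Object[] writeToTable(Object[] row) {
        table.add(row);
        return row;
    }

    boolean processRow() {
        Object[] r = getRow();
        if (r == null) {
            return false; // no more input
        }
        Object[] out = writeToTable(r);
        if (out != null) {
            putRow(out);
        }
        return true;
    }

    List<Object[]> getTable() { return table; }
}

// Hypothetical subclass: overrides processRow to trim String values before writing.
class TrimmingTableOutput extends MiniTableOutput {
    TrimmingTableOutput(List<Object[]> rows) { super(rows); }

    @Override
    boolean processRow() {
        Object[] r = getRow();
        if (r == null) {
            return false;
        }
        Object[] cleaned = r.clone(); // leave the incoming row untouched
        for (int i = 0; i < cleaned.length; i++) {
            if (cleaned[i] instanceof String) {
                cleaned[i] = ((String) cleaned[i]).trim();
            }
        }
        Object[] out = writeToTable(cleaned);
        if (out != null) {
            putRow(out);
        }
        return true;
    }

    public static void main(String[] args) {
        TrimmingTableOutput step = new TrimmingTableOutput(
            List.of(new Object[] {1, "  tom "}, new Object[] {2, "ann"}));
        while (step.processRow()) {
            // one row written per call
        }
        System.out.println(step.getTable().size() + " rows written, first name = '" + step.getTable().get(0)[1] + "'");
    }
}
```

Running it prints "2 rows written, first name = 'tom'". The subclass keeps the base class's writeToTable and putRow. It replaces only the row-level logic inside processRow.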
The following parts use these two steps as examples for writing custom processing logic.