KETTLE实现数据的删除和更新
⼀、实现⽬标
  源数据库的数据更新或者删除之后,⽬标数据库的数据跟着更新或删除,整体流程截图如下:
⼀、准备⼯作
源数据库ORACLE  ⽬标数据库MongoDB,在源数据库添加删除、更新触发器
⼆、操作步骤
1. 添加表输⼊组件,连接ORACLE触发器记录表
2. 添加JAVA代码组件,进⾏步骤跳转,根据输⼊的数据判断是删除或者更新,如果是删除,则跳转⾄MongoDB Delete步骤中,如果是更新的话,跳转⾄字段选择步骤
中。JAVA代码中的详细信息如下:
3. import java.util.List;
import org.xception.KettleException;
import org.ow.RowDataUtil;
import org.ow.RowMeta;
import org.ow.RowMetaInterface;
import org.ow.ValueMeta;
import org.ans.Trans;
import org.ans.TransMeta;
private Object[] previousRow;//上⼀⾏
private RowSet t1 = null;//业务表步骤
private RowSet t2 = null;//删除步骤
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
Object[] r = getRow(); //获取输⼊⾏
if ( first ) {
if ( getInputRowMeta() == null ) {
setOutputDone();//设置输出完成
return false;
}
}
if ( r == null ) { // 如果当前⾏为null
if ( previousRow != null ) {//如果上⼀⾏不为null
//是最后⼀⾏
boolean valid=true;
previousRow = createOutputRow(previousRow, data.outputRowMeta.size());
Trans trans=getTrans();//获取转换实例
if (trans != null){
String sync_val = get(Fields.In, "ID").getString(previousRow);//获取ID
trans.setVariable("LAST_SYNC_VAL", sync_val);//设置变量的值
}
String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//获取操作类型是删除还是更新
String keyid= get(Fields.In, "DATAID").getString(previousRow);//获取操作类型是删除还是更新
//Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
//get(Fields.Out, "KEYID").setValue(rowData,keyid);
//putRowTo(data.outputRowMeta, previousRow,t2);
if(OpType.equals("UPDATE")){//验证通过
putRowTo(data.outputRowMeta, previousRow,t1);
}
else
{
putRowTo(data.outputRowMeta, previousRow,t2);
}
}
setOutputDone();//设置输出完成
return false;//返回false表⽰不⽤再继续处理processRow
return false;//返回false表⽰不⽤再继续处理processRow
}
if ( !first ) {//不是第⼀次执⾏,因为第⼀次执⾏时previousRow⼀定是Null
//不是最后⼀⾏
boolean valid=true;
String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//获取操作类型是删除还是更新
String keyid= get(Fields.In, "DATAID").getString(previousRow);//获取操作类型是删除还是更新
//Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
//get(Fields.Out, "KEYID").setValue(rowData,keyid);
//putRowTo(data.outputRowMeta, previousRow,t2);
if(OpType.equals("UPDATE")){
putRowTo(data.outputRowMeta, previousRow,t1);
}
else
{
putRowTo(data.outputRowMeta, previousRow,t2);
}null官方更新地址
}
previousRow = r;//把当前⾏设为下⼀次执⾏的上⼀⾏
if ( first ) {//如果是⾸次执⾏
first = false;
t1 = findTargetRowSet("dataupdate");//业务表步骤
t2 = findTargetRowSet("datadelete");//数据删除步骤
}
return true;//返回true表⽰还要继续处理processRow
}
3.如果跳转⾄了MongoDB Delete,则根据ID对⽬标库进⾏删除。Mongodb delete组件配置如下:
  JSON query中的{ID:"?{DATAID}"}表⽰删除ID等于传进来的参数DATAID的所有数据,Execute for each row要选择上,表⽰执⾏每⼀⾏数据。
  4.如果通过JAVA代码2判断为更新的话,则流程将跳转⾄字段选择组件,只获取主键ID,此步骤⾮常重要,因为要根据ID去源表中获取等更新的那条数据。
5.选择表输⼊组件,该步骤是根据上⼀步传⼊的ID获取待更新的那⼀条数据
PS:获取SQL查询语句:此处写⼊SQL语句,⾥边的?是变量替换,下边要勾选上"替换SQL语句⾥的变量",从步骤插⼊数据要选择上⼀步,勾选上执⾏每⼀⾏。
6.下边的步骤:流查询、JAVA代码是对数据进⾏清洗,字典替换,此处不再解释
7.最后⼀步:Mongodb output输出需要详细设置
output options选项卡勾选update  modifier update
Mongo⽂档字段配置:ID为主键匹配字段,匹配字段更新为Y 修改器设置为N/A表⽰不对主键更新

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。