开源ETL工具Kettle:例程分析 —— TransBuilder.java
网上似乎中文的开源 ETL 相关资料比较少。由于工作原因,自己最近在接触 Kettle 项目,目前分析了项目中给出的关键例程 TransBuilder.java,该例程对二次开发很有帮助。现将自己对该例程的理解贴出如下:

粗略分析:
第一步:创建一个新的 TransMeta 对象,该对象用于描述整个数据抽取过程。
第二步:如果数据抽取过程涉及数据库操作,则定义并设置好相关数据库,并将其作为属性添加到 TransMeta 对象中。
第三步:建立数据来源 step,将该 step 对象作为属性添加到 TransMeta 对象中。
第四步:如果需要对抽取的数据做处理,则建立处理过程 step,将该 step 作为属性添加到 TransMeta 对象中。
第五步:建立一个数据流 hop 对象,数据从第三步建立的来源 step 流向第四步建立的处理 step,用于 step 之间的数据传输。该 hop 对象也需作为属性添加到 TransMeta 对象中。
第六步:建立目标数据 step,该对象描述了数据的最终去向。将该对象作为属性添加到 TransMeta 对象中。
第七步:与第五步类似,在处理过程 step 与目标数据 step 之间建立数据流 hop 对象。
第八步:执行整个 TransMeta 对象(即用它创建一个 Trans 对象并调用 execute),完成数据抽取过程。

代码分析:

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;

// NOTE(review): the package paths in the scraped listing were truncated (e.g. "be.ans.StepLoader");
// they are restored here to the Kettle 2.x "be.ibridge.kettle" layout - verify against the Kettle version in use.
import be.ibridge.kettle.core.Const;
import be.ibridge.kettle.core.LogWriter;
import be.ibridge.kettle.core.NotePadMeta;
import be.ibridge.kettle.core.database.Database;
import be.ibridge.kettle.core.database.DatabaseMeta;
import be.ibridge.kettle.core.exception.KettleException;
import be.ibridge.kettle.core.util.EnvUtil;
import be.ibridge.kettle.trans.StepLoader;
import be.ibridge.kettle.trans.Trans;
import be.ibridge.kettle.trans.TransHopMeta;
import be.ibridge.kettle.trans.TransMeta;
import be.ibridge.kettle.trans.step.StepMeta;
import be.ibridge.kettle.trans.step.StepMetaInterface;
import be.ibridge.kettle.trans.step.selectvalues.SelectValuesMeta;
import be.ibridge.kettle.trans.step.tableinput.TableInputMeta;
import be.ibridge.kettle.trans.step.tableoutput.TableOutputMeta;

/**
 * Class created to demonstrate the creation of transformations on-the-fly.
 *
 * @author Matt
 */
public class TransBuilder
{
    /**
     * Database connection definitions, one XML document per connection, each parsed with
     * {@code new DatabaseMeta(String)} in {@link #buildCopyTable}.
     * "target" is an MSSQL database on port 1433 and "source" is a MySQL database on port 3306.
     * The credentials are demo values only.
     */
    public static final String[] databasesXML = {
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
            "<name>target</name>" +
            "<server>localhost</server>" +
            "<type>MSSQL</type>" +
            "<access>Native</access>" +
            "<database>test</database>" +
            "<port>1433</port>" +
            "<username>matt</username>" +
            "<password>abcd</password>" +
        "</connection>",

        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
            "<name>source</name>" +
            "<server>localhost</server>" +
            "<type>MYSQL</type>" +
            "<access>Native</access>" +
            "<database>test</database>" +
            "<port>3306</port>" +
            "<username>matt</username>" +
            "<password>abcd</password>" +
        "</connection>"
    };

    /**
     * Creates a new Transformation using input parameters such as the tablename to read from.
     * The transformation is: table input -> select values (field rename) -> table output.
     *
     * @param transformationName The name of the transformation
     * @param sourceDatabaseName The name of the database to read from (must match a name in {@link #databasesXML})
     * @param sourceTableName The name of the table to read from
     * @param sourceFields The field names we want to read from the source table
     * @param targetDatabaseName The name of the target database (must match a name in {@link #databasesXML})
     * @param targetTableName The name of the target table we want to write to
     * @param targetFields The names of the fields in the target table (same number of fields as sourceFields)
     * @return A new transformation
     * @throws KettleException if the field lists are missing, empty or of different lengths, if a named
     *         database connection is not defined, or if anything else goes wrong while building
     */
    public static final TransMeta buildCopyTable(String transformationName, String sourceDatabaseName, String sourceTableName, String[] sourceFields, String targetDatabaseName, String targetTableName, String[] targetFields) throws KettleException
    {
        EnvUtil.environmentInit();

        // Fields are renamed pairwise, so both lists must line up; an empty list would also produce "SELECT FROM ...".
        if (sourceFields == null || targetFields == null || sourceFields.length == 0 || sourceFields.length != targetFields.length)
        {
            throw new KettleException("sourceFields and targetFields must be non-empty and contain the same number of fields");
        }

        try
        {
            // Step 1: create an empty transformation and name it.
            TransMeta transMeta = new TransMeta();
            transMeta.setName(transformationName);

            // Step 2: register every connection definition with the transformation.
            for (int i = 0; i < databasesXML.length; i++)
            {
                DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
                transMeta.addDatabase(databaseMeta);
            }

            DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
            DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);
            // Fail now rather than building steps that point at a missing connection.
            if (sourceDBInfo == null)
            {
                throw new KettleException("Unknown source database connection [" + sourceDatabaseName + "]");
            }
            if (targetDBInfo == null)
            {
                throw new KettleException("Unknown target database connection [" + targetDatabaseName + "]");
            }

            // Add a descriptive note to the transformation canvas.
            String note = "Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]" + Const.CR;
            note += "After that, it writes the information to table [" + targetTableName + "] on database [" + targetDBInfo + "]";
            NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
            transMeta.addNote(ni);

            // Step 3: the source step, a table input reading the requested fields from the source database.
            String fromstepname = "read from [" + sourceTableName + "]";
            TableInputMeta tii = new TableInputMeta();
            tii.setDatabaseMeta(sourceDBInfo);

            StringBuffer selectSQL = new StringBuffer("SELECT ").append(Const.CR);
            for (int i = 0; i < sourceFields.length; i++)
            {
                selectSQL.append(i > 0 ? ", " : " ").append(sourceFields[i]).append(Const.CR);
            }
            selectSQL.append("FROM ").append(sourceTableName);
            tii.setSQL(selectSQL.toString());

            // Wrap the step metadata in a StepMeta, using the plugin id the StepLoader reports for it.
            StepLoader steploader = StepLoader.getInstance();
            String fromstepid = steploader.getStepPluginID(tii);
            StepMeta fromstep = new StepMeta(fromstepid, fromstepname, (StepMetaInterface) tii);
            fromstep.setLocation(150, 100);
            fromstep.setDraw(true);
            fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");
            transMeta.addStep(fromstep);

            // Step 4: a Select Values step whose metadata section renames each source field to its target name.
            SelectValuesMeta svi = new SelectValuesMeta();
            svi.allocate(0, 0, sourceFields.length);
            for (int i = 0; i < sourceFields.length; i++)
            {
                svi.getMetaName()[i] = sourceFields[i];   // field coming in
                svi.getMetaRename()[i] = targetFields[i]; // name it leaves with
            }

            String selstepname = "Rename field names";
            String selstepid = steploader.getStepPluginID(svi);
            StepMeta selstep = new StepMeta(selstepid, selstepname, (StepMetaInterface) svi);
            selstep.setLocation(350, 100);
            selstep.setDraw(true);
            selstep.setDescription("Rename field names");
            transMeta.addStep(selstep);

            // Step 5: hop from the source step to the rename step.
            TransHopMeta shi = new TransHopMeta(fromstep, selstep);
            transMeta.addTransHop(shi);

            // Step 6: the target step, a table output writing into the target database.
            String tostepname = "write to [" + targetTableName + "]";
            TableOutputMeta toi = new TableOutputMeta();
            toi.setDatabaseMeta(targetDBInfo);
            // Only names the table; main() creates the table itself by running the generated SQL statements.
            toi.setTablename(targetTableName);
            toi.setCommitSize(200);
            toi.setTruncateTable(true);

            String tostepid = steploader.getStepPluginID(toi);
            StepMeta tostep = new StepMeta(tostepid, tostepname, (StepMetaInterface) toi);
            tostep.setLocation(550, 100);
            tostep.setDraw(true);
            tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
            transMeta.addStep(tostep);

            // Step 7: hop from the rename step to the target step.
            TransHopMeta hi = new TransHopMeta(selstep, tostep);
            transMeta.addTransHop(hi);

            return transMeta;
        }
        catch (KettleException e)
        {
            // Already descriptive; do not bury it under the generic wrapper below.
            throw e;
        }
        catch (Exception e)
        {
            throw new KettleException("An unexpected error occurred creating the new transformation", e);
        }
    }

    /**
     * 1) create a new transformation
     * 2) save the transformation as XML file
     * 3) generate the SQL for the target table
     * 4) Execute the transformation
     * 5) drop the target table to make this program repeatable
     *
     * @param args not used; the demo parameters are hard-coded below
     */
    public static void main(String[] args) throws Exception
    {
        EnvUtil.environmentInit();

        // Init
        LogWriter log = LogWriter.getInstance("TransBuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);

        // Load the Kettle steps & plugins
        StepLoader stloader = StepLoader.getInstance();
        if (!stloader.read())
        {
            log.logError("TransBuilder", "Error loading Kettle steps & plugins... stopping now!");
            return;
        }

        // The parameters we want; hard-coded for the demo, they could instead be taken from args.
        String fileName = "NewTransformation.xml";
        String transformationName = "Test Transformation";
        String sourceDatabaseName = "source";
        String sourceTableName = "Customer";
        String[] sourceFields = {
            "customernr",
            "Name",
            "firstname",
            "lang",
            "sex",
            "street",
            "housnr",
            "bus",
            "zipcode",
            "location",
            "country",
            "date_of_birth"
        };

        String targetDatabaseName = "target";
        String targetTableName = "Cust";
        String[] targetFields = {
            "CustNo",
            "LastName",
            "FirstName",
            "Lang",
            "gender",
            "Street",
            "Housno",
            "busno",
            "ZipCode",
            "City",
            "Country",
            "BirthDate"
        };

        // Generate the transformation.
        TransMeta transMeta = TransBuilder.buildCopyTable(
            transformationName,
            sourceDatabaseName,
            sourceTableName,
            sourceFields,
            targetDatabaseName,
            targetTableName,
            targetFields
        );

        // Save it as a file; close the stream even if the write fails.
        String xml = transMeta.getXML();
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
        try
        {
            dos.write(xml.getBytes("UTF-8"));
        }
        finally
        {
            dos.close();
        }
        System.out.println("Saved transformation to file: " + fileName);

        // OK, What's the SQL we need to execute to generate the target table?
        String sql = transMeta.getSQLStatementsString();

        // Execute the SQL on the target table, run the transformation, then clean up.
        Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
        targetDatabase.connect();
        try
        {
            targetDatabase.execStatements(sql);

            // Step 8: execute the transformation and wait for all steps to finish.
            Trans trans = new Trans(log, transMeta);
            trans.execute(null);
            trans.waitUntilFinished();

            // For testing/repeatability, we drop the target table again
            targetDatabase.execStatement("drop table " + targetTableName);
        }
        finally
        {
            // Always release the connection, even if the SQL or the transformation fails.
            targetDatabase.disconnect();
        }
    }
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论