开源ETL-Kettle:例程分析-transBuilder.java
网上似乎中文的开源etl相关资料比较少~~由于工作原因自己最近在接触kettle项目。目前分析了项目中给出的关键例程transBuilder.java,该例程对二次开发是很有帮助的。现在帖出自己对该例程的理解如下:
粗略分析:
tabletable
第一步:创建一个新的TransMeta对象,该对象用于描述整个数据抽取过程
第二步:如果数据抽取过程涉及到数据库操作,则定义并设置好相关数据库,并作为属性添加到TransMeta对象中
第三步:建立数据来源step,将数据来源step对象作为属性添加到TransMeta对象中
第四步:如果需要对所抽取数据做处理,则建立处理过程step,将该step作为属性添加到TransMeta对象中
第五步:建立一个数据流hop对象,该数据流对象源自第三步,流至第四步,用于step之间的数据传输。该数据流hop对象也需作为属性添加到TransMeta对象中
第六步:建立目标数据step。该对象描述了最终数据的去向。将该对象作为属性添加到TransMeta对象中
第七步:与第五步相似,在处理过程step与目标数据step之间建立数据流hop对象。
第八步:执行整个TransMeta对象,完成数据抽取过程
代码分析:

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import be.Const;
import be.LogWriter;
import be.NotePadMeta;
import be.database.Database;
import be.database.DatabaseMeta;
import be.xception.KettleException;
import be.util.EnvUtil;
import be.ans.StepLoader;
import be.ans.Trans;
import be.ans.TransHopMeta;
import be.ans.TransMeta;
import be.ans.step.StepMeta;
import be.ans.step.StepMetaInterface;
import be.ans.step.selectvalues.SelectValuesMeta;
import be.ans.step.tableinput.TableInputMeta;
import be.ans.step.tableoutput.TableOutputMeta;
/**
* Class created to demonstrate the creation of transformations on-the-fly.
*
* @author Matt
*
*/
public class TransBuilder
{
    //这是对数据库的定义
    public static final String[] databasesXML = {
        "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
        "<connection>" +
            "<name>target</name>" +
            "<server>localhost</server>" +
            "<type>MSSQL</type>" +
            "<access>Native</access>" +
            "<database>test</database>" +
            "<port>1433</port>" +
            "<username>matt</username>" +
            "<password>abcd</password>" +
          "</connection>",
         
          "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
          "<connection>" +
              "<name>source</name>" +
              "<server>localhost</server>" +
              "<type>MYSQL</type>" +
              "<access>Native</access>" +
              "<database>test</database>" +
              "<port>3306</port>" +
              "<username>matt</username>" +
              "<password>abcd</password>" +
            "</connection>"
    };
    /**
    * Creates a new Transformation using input parameters such as the tablename to read from.
    * @param transformationName The name of the transformation
    * @param sourceDatabaseName The name of the database to read from
    * @param sourceTableName The name of the table to read from
    * @param sourceFields The field names we want to read from the source table
    * @param targetDatabaseName The name of the target database
    * @param targetTableName The name of the target table we want to write to
    * @param targetFields The names of the fields in the target table (same number of fields as sourceFields)
    * @return A new transformation
    * @throws KettleException In the rare case something goes wrong
    */
    public static final TransMeta buildCopyTable(String transformationName, String sourceDatabaseName, String sourceTableName, String[] sourceFields, String targetDatabaseName, String targetTableName, String[] targetFields) throws KettleException
    {
        vironmentInit();
        try
        {
            //
            // Create a
            // 创建一个transMeta对象
            TransMeta transMeta = new TransMeta();
            transMeta.setName(transformationName);
           
            // Add the database connections
            // 创建数据库对象,将数据库对象作为属性添加到transMeta对象中
            for (int i=0;i<databasesXML.length;i++)
            {
                DatabaseMeta databaseMeta = new DatabaseMeta(databasesXML[i]);
                transMeta.addDatabase(databaseMeta);
            }
           
            DatabaseMeta sourceDBInfo = transMeta.findDatabase(sourceDatabaseName);
            DatabaseMeta targetDBInfo = transMeta.findDatabase(targetDatabaseName);
           
            //
            // Add a note
            //
            String note = "Reads information from table [" + sourceTableName+ "] on database [" + sourceDBInfo + "]" + Const.CR;
            note += "After that, it writes the information to table [" + targetTableName + "] on database [" + targetDBInfo + "]";
            NotePadMeta ni = new NotePadMeta(note, 150, 10, -1, -1);
            transMeta.addNote(ni);
            //
            // create the
            // 创建一个源数据step,该setp是一个来自数据库的源
            String fromstepname = "read from [" + sourceTableName + "]";
            TableInputMeta tii = new TableInputMeta();          //一个表输入源
            tii.setDatabaseMeta(sourceDBInfo);                    //该表所在的数据库
            String selectSQL = "SELECT "+Const.CR;
            for (int i=0;i<sourceFields.length;i++)
            {
                if (i>0) selectSQL+=", "; else selectSQL+=" ";
                selectSQL+=sourceFields[i]+Const.CR;
            }
            selectSQL+="FROM "+sourceTableName;
            tii.setSQL(selectSQL);                                        //设定取数据的sql语句
            // 以下将源数据step作为属性添加到transMeta对象中
            StepLoader steploader = Instance();
            String fromstepid = StepPluginID(tii);
            StepMeta fromstep = new StepMeta(fromstepid, fromstepname, (StepMetaInterface) tii);
            fromstep.setLocation(150, 100);
            fromstep.setDraw(true);
            fromstep.setDescription("Reads information from table [" + sourceTableName + "] on database [" + sourceDBInfo + "]");
            transMeta.addStep(fromstep);
            //
            // add logic to rename fields
            // Use metadata logic in SelectValues,
            // 创建一个数据处理对象,此处为一个select对象。该对象用于选择指定的源数据的列,将其中的数据送到目标数据库的指定列
            SelectValuesMeta svi = new SelectValuesMeta();
            svi.allocate(0, 0, sourceFields.length);
            for (int i = 0; i < sourceFields.length; i++)
            {
                MetaName()[i] = sourceFields[i];                      //设定源列
                MetaRename()[i] = targetFields[i];                    //设定目标列
            }
            //以下将该select对象作为属性添加到transMeta对象中
            String selstepname = "Rename field names";
            String selstepid = StepPluginID(svi);
            StepMeta selstep = new StepMeta(selstepid, selstepname, (StepMetaInterface) svi);
            selstep.setLocation(350, 100);
            selstep.setDraw(true);
            selstep.setDescription("Rename field names");
            transMeta.addStep(selstep);
            //创建一个数据流对象,用于连接源和select对象
            //并将该数据流对象作为属性添加到transMeta对象中
            TransHopMeta shi = new TransHopMeta(fromstep, selstep);
            transMeta.addTransHop(shi);
            fromstep = selstep;
            //
            // Create the
            //
            //
            // Add the
            //创建一个目标对象,此处目标为数据库输出
            String tostepname = "write to [" + targetTableName + "]";
            TableOutputMeta toi = new TableOutputMeta();                  //创建一个表输出对象
            toi.setDatabaseMeta(targetDBInfo);                                      //设置该表所在数据库
            toi.setTablename(targetTableName);                                    //设置目标表的表名。若目标数据库中不存在该表,则创建表
            toi.setCommitSize(200);
            toi.setTruncateTable(true);
          //将该目标对象作为属性添加到transMeta对象中
            String tostepid = StepPluginID(toi);
            StepMeta tostep = new StepMeta(tostepid, tostepname, (StepMetaInterface) toi);
            tostep.setLocation(550, 100);
            tostep.setDraw(true);
            tostep.setDescription("Write information to table [" + targetTableName + "] on database [" + targetDBInfo + "]");
            transMeta.addStep(tostep);
            //
            // Add a hop between the
            // 建立一个数据流对象,用于连接select和目标
            TransHopMeta hi = new TransHopMeta(fromstep, tostep);
            transMeta.addTransHop(hi);
            // OK, if we're still here: overwrite the
            return transMeta;
        }
        catch (Exception e)
        {
            throw new KettleException("An unexpected error occurred creating the new transformation", e);
        }
    }
    /**
    * 1) create a new transformation
    * 2) save the transformation as XML file
    * 3) generate the SQL for the target table
    * 4) Execute the transformation
    * 5) drop the target table to make this program repeatable
    *
    * @param args
    */
    public static void main(String[] args) throws Exception
    {
    vironmentInit();
        // Init
        LogWriter log = Instance("TransBuilder.log", true, LogWriter.LOG_LEVEL_DETAILED);
       
        // Load the Kettle steps & plugins
        StepLoader stloader = Instance();
        if (!ad())
        {
            log.logError("TransBuilder", "Error loading Kettle steps & stopping now!");
            return;
        }
       
        // The parameters we want, optionally this can be
        String fileName = "l";
        String transformationName = "Test Transformation";
        String sourceDatabaseName = "source";
        String sourceTableName = "Customer";
        String sourceFields[] = {
                "customernr",
                "Name",
                "firstname",
                "lang",
                "sex",
                "street",
                "housnr",
                "bus",
                "zipcode",
                "location",
                "country",
                "date_of_birth"
            };
        String targetDatabaseName = "target";
        String targetTableName = "Cust";
        String targetFields[] = {
                "CustNo",
                "LastName",
                "FirstName",
                "Lang",
                "gender",
                "Street",
                "Housno",
                "busno",
                "ZipCode",
                "City",
                "Country",
                "BirthDate"
            };
       
        // Generate the transformation.
        TransMeta transMeta = TransBuilder.buildCopyTable(
                transformationName,
                sourceDatabaseName,
                sourceTableName,
                sourceFields,
                targetDatabaseName,
                targetTableName,
                targetFields
                );
       
        // Save it as a file:
        String xml = XML();
        DataOutputStream dos = new DataOutputStream(new FileOutputStream(new File(fileName)));
        dos.Bytes("UTF-8"));
        dos.close();
        System.out.println("Saved transformation to file: "+fileName);
        // OK, What's the SQL we need to execute to generate the target table?
        String sql = SQLStatementsString();
       
        // Execute the SQL on the target table:
        Database targetDatabase = new Database(transMeta.findDatabase(targetDatabaseName));
        t();
        Statements(sql);                  //执行transMeta对象,进行数据抽取工作
       
        // Now execute
        Trans trans = new Trans(log, transMeta);
        ute(null);
        trans.waitUntilFinished();
       
        // For testing/repeatability, we drop the target table again
        Statement("drop table "+targetTableName);
        targetDatabase.disconnect();
    }

}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。