ETL工具——kettle使用说明
1 简介
ETLExtract-Transform-Load的缩写,即数据抽取、转换、装载的过程),Kettle是一款国外开源的etl工具,纯java编写,数据抽取高效稳定。
2 运行环境:
OSWindowLinuxUnix均可
Jdk1.4以上
3 开始使用:
Kettle可以在/网站下载。下载kettle压缩包,因kettle为绿软件,解压缩到任意本地路径即可。
(本文着重介绍kettle3.2.0稳定版
Spoon 是一个图形用户界面,在不同平台上运行Spoon需要不同的脚本:
Spoon.bat: windows 平台运行Spoon(或直接点击)。
Spoon.sh: LinuxApple OSXSolaris 平台运行Spoon
登陆一般选择没有资源库:
Kettle中有两种脚本文件,transformationjobtransformation完成针对数据的基础转换,job则完成整个工作流的控制。
4 转换(Transformation
新建一个转换
kettle默认transformation文件保存后后缀名为ktr
新建数据库连接
(此链接也可在用到的节点处配置)
填写数据源配置内容
点击Test测试连接成功:
核心对象
切换到核心对象,菜单列出的是Transformation中可以调用的环节列表,可以通过鼠标拖动的方式对环节进行添加。并且可通过shift+鼠标拖动,实现环节之间的连接。
常用节点介绍(红节点后面逐一演示)
类别
环节名称
功能说明
输入
文本文件输入
从本地文本文件输入数据
表输入
从数据库表中输入数据
获取系统信息
读取系统信息输入数据
输出
文本文件输出
将处理结果输出到文本文件
表输出
将处理结果输出到数据库表
插入/更新
根据处理结果对数据库表机型插入更新,如果数据库中不存在相关记录则插入,否则为更新。会根据查询条件中字段进行判断
更新
根据处理结果对数据库进行更新,若需要更新的数据在数据库表中无记录,则会报错停止
删除
根据处理结果对数据库记录进行删除,若需要删除的数据在数据库表中无记录,则会报错停止
查询
数据库查询
根据设定的查询条件,对目标表进行查询,返回需要的结果字段
流查询
将目标表读取到内存,通过查询条件对内存中数据集进行查询
调用DB存储过程
调用数据库存储过程
转换
字段选择
选择需要的字段,过滤掉不要的字段,也可做数据库字段对应
过滤记录
根据条件对记录进行分类
排序记录
将数据根据某以条件,进行排序
空操作
无操作
增加常量
增加需要的常量字段
脚本
Modified Java Script Value
扩展功能,编写JavaScript脚本,对数据进行相应处理
映射
映射(子转换)
数据映射
作业
Sat Variables
设置环境变量
Get Variables
获取环境变量
表输入
双击拖动到工作面板上的表输入结点,选择(或者新建)所需要的数据库连接,点击获取SQL查询语句或自行编辑SQL
若需根据前一步获取数据进行查询,可用“?”号代替,变量顺序与前一节点相同
字段选择
界面如下:
选择和修改:指定需要流到输出流中的字段的精确顺序和名称
删除:指定从输出流中删除的字段(以后输出流将不会获取到此数据)
元数据:修改元数据字段的名称、类型、长度和精度
插入/更新
如下图,表示当原表的id=new_test.id时,比较createdateaccount,若不同就进行更新,如果没有此id就插入该数据
运行Transformation
一个简单的Transformation如下:
这里可以不用配置直接启动
执行结果中可以查到执行步骤以及输出日志
5 任务(Job
新建一个Job
kettle默认job文件保存后后缀名为kjb
核心对象
菜单列出的是Job中可以调用的环节列表,可以通过鼠标拖动的方式对环节进行添加。
每一个环节可以通过鼠标拖动来将环节添加到主窗口中。
并可通过shift+鼠标拖动,实现环节之间的连接。
常用节点介绍(红节点后面逐一演示)
类别
环节名称
功能说明
START
开始
DUMMY
结束
Transformation
引用Transformation流程
Job
引用Job流程
Shell
调用Shell脚本
SQL
执行sql语句
FTP
通过FTP下载
Mail
发送邮件
Table exists
检查目标表是否存在,返回布尔值
File exists
检查文件是否存在,返回布尔值
Wait for
等待时间,设定一段时间,kettle流程处于等待状态
Javascript
执行JavaScript脚本
Create file
创建文件
Delete file
删除文件
Wait for file
等待文件,文件出现后继续下一个环节
File Compare
文件比较,返回布尔值
Zip file
压缩文件为ZIP
Start结点
一个Job任务要求要有一个start结点作为工作流入口。
如图,设置任务流开始执行的时间,可以循环执行,该图定义为每天16:32执行一次
Transformation结点
调用一个Transformation,选择指定的Transformation文件(*.ktr
Mail结点
配置好目的方和发送方的地址以及SMTP服务器地址
此处验证是发送方的用户信息:
运行Job
一个简单的Job流程如下:
开始结点进入流程,到目标时刻时执行DBUpdate,成功后会发送邮件
6 Java调用
注:此处用的是kettle3kettle4的调用有所不同
Jar包引用
调用Kettle需要用到的基本jar包如下:
其他jar包可根据具体的transformationjob做添加,这些jar包基本都可以从data-integration目录中的libext文件夹下到。
调用本地的transformation
示例代码如下:
/**
    * 调用本地trans
    *
java创建文件
    * @param transFileName
    *            trans文件路径
    * @throws KettleException
    */
    public void callNativeTrans(String transFileName) throws KettleException {
        // 初始化
        EnvUtil.environmentInit();
        StepLoader.init();
        // 转换元对象
        TransMeta transMeta = new TransMeta(transFileName);
        // 转换
        Trans trans = new Trans(transMeta);
        // 执行转换
        ute(null);
        // 等待转换执行结束
        trans.waitUntilFinished();
    }
调用本地的job
示例代码如下:
/**
    * 调用本地job
    *
    * @param jobFileName
    *            job文件路径
    * @throws KettleException
    */
    public void callNativeJob(String jobFileName) throws KettleException {
        // 初始化
        EnvUtil.environmentInit();
        JobEntryLoader.init();
        StepLoader.init();
        // 日志TransTest.log最终会出现在项目根目录下
        LogWriter log = LogWriter.getInstance("TransTest.log", true,
                LogWriter.LOG_LEVEL_DETAILED);
        // job元对象
        JobMeta jobMeta = new JobMeta(log, jobFileName, null);
        // job
        Job job = new Job(log, StepLoader.getInstance(), null, jobMeta);
        jobMeta.setInternalKettleVariables(job);
        // 执行job
        ute();
        // 等待job执行结束
        job.waitUntilFinished();
    }
带参数的transformation调用
做这样一个transformation(将传入的参数输出到一个文本文件中):
获取系统信息中作如下配置:
   
示例代码如下:
/**
    * 带参数的transformation调用
    *
    * @param transFileName
    *            trans文件路径
    * @param params
    *            参数
    * @throws KettleException
    */
    public static void callNativeTrans(String transFileName,String [] params) throws KettleException {
        // 初始化
        EnvUtil.environmentInit();
        StepLoader.init();
        // 转换元对象
        TransMeta transMeta = new TransMeta(transFileName);
        // 转换
        Trans trans = new Trans(transMeta);
        //此处为输入的参数,也可以通过参数传进方法中
        //String [] s = {"123"};
        // 执行转换
        ute(params);
        // 等待转换执行结束
        trans.waitUntilFinished();
    }
   

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。