ETL工具——kettle使用说明
1 简介
ETL(Extract-Transform-Load的缩写,即数据抽取、转换、装载的过程),Kettle是一款国外开源的etl工具,纯java编写,数据抽取高效稳定。
2 运行环境:
OS:Window、Linux、Unix均可
Jdk1.4以上
3 开始使用:
Kettle可以在/网站下载。下载kettle压缩包,因kettle为绿软件,解压缩到任意本地路径即可。
(本文着重介绍kettle3.2.0稳定版)
Spoon 是一个图形用户界面,在不同平台上运行Spoon需要不同的脚本:
Spoon.bat: 在windows 平台运行Spoon(或直接点击)。
Spoon.sh: 在Linux、Apple OSX、Solaris 平台运行Spoon。
登陆一般选择没有资源库:
Kettle中有两种脚本文件,transformation和job,transformation完成针对数据的基础转换,job则完成整个工作流的控制。
4 转换(Transformation)
新建一个转换
kettle默认transformation文件保存后后缀名为ktr
新建数据库连接
(此链接也可在用到的节点处配置)
填写数据源配置内容
点击Test测试连接成功:
核心对象
切换到核心对象,菜单列出的是Transformation中可以调用的环节列表,可以通过鼠标拖动的方式对环节进行添加。并且可通过shift+鼠标拖动,实现环节之间的连接。
常用节点介绍(红节点后面逐一演示)
类别 | 环节名称 | 功能说明 |
输入 | 文本文件输入 | 从本地文本文件输入数据 |
表输入 | 从数据库表中输入数据 | |
获取系统信息 | 读取系统信息输入数据 | |
输出 | 文本文件输出 | 将处理结果输出到文本文件 |
表输出 | 将处理结果输出到数据库表 | |
插入/更新 | 根据处理结果对数据库表机型插入更新,如果数据库中不存在相关记录则插入,否则为更新。会根据查询条件中字段进行判断 | |
更新 | 根据处理结果对数据库进行更新,若需要更新的数据在数据库表中无记录,则会报错停止 | |
删除 | 根据处理结果对数据库记录进行删除,若需要删除的数据在数据库表中无记录,则会报错停止 | |
查询 | 数据库查询 | 根据设定的查询条件,对目标表进行查询,返回需要的结果字段 |
流查询 | 将目标表读取到内存,通过查询条件对内存中数据集进行查询 | |
调用DB存储过程 | 调用数据库存储过程 | |
转换 | 字段选择 | 选择需要的字段,过滤掉不要的字段,也可做数据库字段对应 |
过滤记录 | 根据条件对记录进行分类 | |
排序记录 | 将数据根据某以条件,进行排序 | |
空操作 | 无操作 | |
增加常量 | 增加需要的常量字段 | |
脚本 | Modified Java Script Value | 扩展功能,编写JavaScript脚本,对数据进行相应处理 |
映射 | 映射(子转换) | 数据映射 |
作业 | Sat Variables | 设置环境变量 |
Get Variables | 获取环境变量 | |
✓ 表输入
双击拖动到工作面板上的表输入结点,选择(或者新建)所需要的数据库连接,点击获取SQL查询语句或自行编辑SQL。
若需根据前一步获取数据进行查询,可用“?”号代替,变量顺序与前一节点相同
✓ 字段选择
界面如下:
选择和修改:指定需要流到输出流中的字段的精确顺序和名称
删除:指定从输出流中删除的字段(以后输出流将不会获取到此数据)
元数据:修改元数据字段的名称、类型、长度和精度
✓ 插入/更新
如下图,表示当原表的id=new_test.id时,比较createdate和account,若不同就进行更新,如果没有此id就插入该数据
运行Transformation:
一个简单的Transformation如下:
这里可以不用配置直接启动
执行结果中可以查到执行步骤以及输出日志
5 任务(Job)
新建一个Job
kettle默认job文件保存后后缀名为kjb
核心对象
菜单列出的是Job中可以调用的环节列表,可以通过鼠标拖动的方式对环节进行添加。
每一个环节可以通过鼠标拖动来将环节添加到主窗口中。
并可通过shift+鼠标拖动,实现环节之间的连接。
常用节点介绍(红节点后面逐一演示)
类别 | 环节名称 | 功能说明 |
START | 开始 | |
DUMMY | 结束 | |
Transformation | 引用Transformation流程 | |
Job | 引用Job流程 | |
Shell | 调用Shell脚本 | |
SQL | 执行sql语句 | |
FTP | 通过FTP下载 | |
Mail | 发送邮件 | |
Table exists | 检查目标表是否存在,返回布尔值 | |
File exists | 检查文件是否存在,返回布尔值 | |
Wait for | 等待时间,设定一段时间,kettle流程处于等待状态 | |
Javascript | 执行JavaScript脚本 | |
Create file | 创建文件 | |
Delete file | 删除文件 | |
Wait for file | 等待文件,文件出现后继续下一个环节 | |
File Compare | 文件比较,返回布尔值 | |
Zip file | 压缩文件为ZIP包 | |
✓ Start结点
一个Job任务要求要有一个start结点作为工作流入口。
如图,设置任务流开始执行的时间,可以循环执行,该图定义为每天16:32执行一次
✓ Transformation结点
调用一个Transformation,选择指定的Transformation文件(*.ktr)
✓ Mail结点
配置好目的方和发送方的地址以及SMTP服务器地址
此处验证是发送方的用户信息:
运行Job:
一个简单的Job流程如下:
开始结点进入流程,到目标时刻时执行DBUpdate,成功后会发送邮件
6 Java调用
(注:此处用的是kettle3,kettle4的调用有所不同)
Jar包引用
调用Kettle需要用到的基本jar包如下:
其他jar包可根据具体的transformation或job做添加,这些jar包基本都可以从data-integration目录中的libext文件夹下到。
调用本地的transformation
示例代码如下: |
/** * 调用本地trans * java创建文件 * @param transFileName * trans文件路径 * @throws KettleException */ public void callNativeTrans(String transFileName) throws KettleException { // 初始化 EnvUtil.environmentInit(); StepLoader.init(); // 转换元对象 TransMeta transMeta = new TransMeta(transFileName); // 转换 Trans trans = new Trans(transMeta); // 执行转换 ute(null); // 等待转换执行结束 trans.waitUntilFinished(); } |
调用本地的job
示例代码如下: |
/** * 调用本地job * * @param jobFileName * job文件路径 * @throws KettleException */ public void callNativeJob(String jobFileName) throws KettleException { // 初始化 EnvUtil.environmentInit(); JobEntryLoader.init(); StepLoader.init(); // 日志TransTest.log最终会出现在项目根目录下 LogWriter log = LogWriter.getInstance("TransTest.log", true, LogWriter.LOG_LEVEL_DETAILED); // job元对象 JobMeta jobMeta = new JobMeta(log, jobFileName, null); // job Job job = new Job(log, StepLoader.getInstance(), null, jobMeta); jobMeta.setInternalKettleVariables(job); // 执行job ute(); // 等待job执行结束 job.waitUntilFinished(); } |
带参数的transformation调用
做这样一个transformation(将传入的参数输出到一个文本文件中):
获取系统信息中作如下配置:
示例代码如下: |
/** * 带参数的transformation调用 * * @param transFileName * trans文件路径 * @param params * 参数 * @throws KettleException */ public static void callNativeTrans(String transFileName,String [] params) throws KettleException { // 初始化 EnvUtil.environmentInit(); StepLoader.init(); // 转换元对象 TransMeta transMeta = new TransMeta(transFileName); // 转换 Trans trans = new Trans(transMeta); //此处为输入的参数,也可以通过参数传进方法中 //String [] s = {"123"}; // 执行转换 ute(params); // 等待转换执行结束 trans.waitUntilFinished(); } |
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论