Calling DataX from Java and returning task execution details
(Corrections for any errors or omissions are welcome.)
DataX
DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It implements efficient data synchronization among heterogeneous data sources, including MySQL, Oracle, SqlServer, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), DRDS, and more.
For a detailed introduction to DataX, please refer to the official documentation.
Introduction
Due to business requirements, we need DataX to load data from text files into a database. Previously this was done by calling the datax.py script from Python. To gain better control over DataX jobs, A-Wen asked us to rework DataX so that it can be invoked from Java in an embedded way and return detailed information about the job execution.
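For contrast, the most direct way to drive DataX from Java without touching its source is to launch the datax.py script as an external process. The sketch below (the install path /opt/datax and the class name are assumptions, not anything from DataX itself) shows why that is not enough for our purpose: the caller only gets back an exit code and unstructured log text, not the execution details we need.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class DataxProcessLauncher {
    // Launches "python datax.py <job.json>" as a child process (assumed install path).
    public static int launch(String jobJsonPath) throws Exception {
        ProcessBuilder pb = new ProcessBuilder(
                "python", "/opt/datax/bin/datax.py", jobJsonPath);
        pb.redirectErrorStream(true);
        Process process = pb.start();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);   // only raw log output is available here
            }
        }
        return process.waitFor();           // only an exit code, no task statistics
    }
}

Embedding DataX instead of shelling out is what lets us capture the statistics as a Java object.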
Tracing the DataX source code
Start from the source code downloaded from GitHub. The DataX entry point is the static entry() method of the Engine class in the datax-core module.
public static void entry(final String[] args) throws Throwable {
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");

    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);

    String jobPath = cl.getOptionValue("job");

    // If the user does not explicitly specify a jobid, datax.py passes the default value -1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");

    Configuration configuration = ConfigParser.parse(jobPath);

    long jobId;
    if (!"-1".equalsIgnoreCase(jobIdString)) {
        jobId = Long.parseLong(jobIdString);
    } else {
        // only for dsc & ds & datax 3 update
        String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
        String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
        String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
        List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                dsJobUrlPatternString, dsTaskGroupUrlPatternString);
        jobId = parseJobIdFromUrl(patternStringList, jobPath);
    }

    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    if (!isStandAloneMode && jobId == -1) {
        // In non-standalone mode, jobId must not be -1
        throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR,
                "非 standalone 模式必须在 URL 中提供有效的 jobId.");
    }
    configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

    // print VM info
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) {
        LOG.info(vmInfo.toString());
    }

    LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

    LOG.debug(configuration.toJSON());

    ConfigurationValidate.doValidate(configuration);
    Engine engine = new Engine();
    engine.start(configuration);
}
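Since entry() simply parses -job, -jobid and -mode from the argument array (the same flags datax.py builds), it can be invoked in-process from our own Java code. A minimal sketch, assuming the datax-core and plugin jars are on the classpath and that "datax.home" points at an unpacked DataX distribution so ConfigParser can find conf/core.json and the plugin directories; the class name and paths are illustrative only:

import com.alibaba.datax.core.Engine;

public class EngineEntryDemo {
    public static void main(String[] args) throws Throwable {
        System.setProperty("datax.home", "/opt/datax");       // assumed install path
        String[] dataxArgs = {
                "-job", "/opt/datax/job/text2mysql.json",     // job JSON, assumed path
                "-jobid", "-1",                               // -1 is what datax.py passes by default
                "-mode", "standalone"
        };
        Engine.entry(dataxArgs);
    }
}

Like the command line, though, entry() returns nothing: on success the statistics are only written to the log, so the next step is to trace where those numbers are produced.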
Inside entry(), the job is ultimately launched by calling engine.start(configuration). Stepping into start(), we eventually find that it delegates to the start() method of JobContainer.
@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");

    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if (isDryRun) {
            LOG.info("jobContainer starts to do preCheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            this.preHandle();

            LOG.debug("jobContainer starts to do init ...");
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();

            LOG.debug("jobContainer starts to do postHandle ...");
            this.postHandle();
            LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

            this.invokeHooks();
        }
    } catch (Throwable e) {
        LOG.error("Exception when job run", e);

        hasException = true;

        if (e instanceof OutOfMemoryError) {
            this.destroy();
            System.gc();
        }

        if (super.getContainerCommunicator() == null) {
            // containerCollector is initialized in schedule(), so when an exception occurs
            // before schedule() it has to be initialized here
            AbstractContainerCommunicator tempContainerCollector;
            // standalone
            tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
            super.setContainerCommunicator(tempContainerCollector);
        }

        Communication communication = super.getContainerCommunicator().collect();
        // the state before reporting does not need to be set manually
        // communication.setState(State.FAILED);
        communication.setThrowable(e);
        communication.setTimestamp(this.endTimeStamp);

        Communication tempComm = new Communication();
        tempComm.setTimestamp(this.startTransferTimeStamp);

        Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
        super.getContainerCommunicator().report(reportCommunication);

        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
        if (!isDryRun) {
            this.destroy();
            this.endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                // finally log the average CPU consumption and the GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }

                LOG.info(PerfTrace.getInstance().summarizeNoException());
                this.logStatistics();
            }
        }
    }
}
The task information we need is produced in this.logStatistics().
private void logStatistics() {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }

    if (super.getContainerCommunicator() == null) {
        return;
    }

    Communication communication = super.getContainerCommunicator().collect();
    communication.setTimestamp(this.endTimeStamp);

    Communication tempComm = new Communication();
    tempComm.setTimestamp(this.startTransferTimeStamp);

    Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);

    // byte rate
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
            / transferCosts;

    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
            / transferCosts;

    reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
    reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);

    super.getContainerCommunicator().report(reportCommunication);

    LOG.info(String.format(
            "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n",
            "任务启动时刻",
            dateFormat.format(startTimeStamp),
            "任务结束时刻",
            dateFormat.format(endTimeStamp),
            "任务总计耗时",
            String.valueOf(totalCosts) + "s",
            "任务平均流量",
            StrUtil.stringify(byteSpeedPerSecond)
                    + "/s",
            "记录写入速度",
            String.valueOf(recordSpeedPerSecond)
                    + "rec/s", "读出记录总数",
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
            "读写失败总数",
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    ));

    LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +
            dateFormat.format(endTimeStamp) + "|" +
            String.valueOf(totalCosts) + "|" +
            StrUtil.stringify(byteSpeedPerSecond) + "|" +
            String.valueOf(recordSpeedPerSecond) + "|" +
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|" +
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    );

    if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
        LOG.info(String.format(
                "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
                "Transformer成功记录总数",
                communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
                "Transformer失败记录总数",
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
                "Transformer过滤记录总数",
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
        ));
    }
}
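Every figure printed above is read from the Communication object collected by the container communicator; CommunicationTool exposes them as named counters and helper accessors. As a stepping stone toward the modification, the sketch below (not part of DataX) shows the same values being read programmatically instead of being formatted into a log line:

// Illustrative helper: reading the counters that logStatistics() prints,
// given the collected Communication and the report Communication built from it.
private static void dumpCoreCounters(Communication communication, Communication reportCommunication) {
    long totalReadRecords  = CommunicationTool.getTotalReadRecords(communication);
    long totalErrorRecords = CommunicationTool.getTotalErrorRecords(communication);
    long readSucceedBytes  = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES);
    long byteSpeed         = reportCommunication.getLongCounter(CommunicationTool.BYTE_SPEED);
    long recordSpeed       = reportCommunication.getLongCounter(CommunicationTool.RECORD_SPEED);
    System.out.println(totalReadRecords + "|" + totalErrorRecords + "|" + readSucceedBytes
            + "|" + byteSpeed + "|" + recordSpeed);
}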
Starting the modification
Add a new result entity, DataxResult (getters and setters omitted).
public class DataxResult {
    // task start time
    private long startTimeStamp;
    // task end time
    private long endTimeStamp;
    // total task duration
    private long totalCosts;
    // average task throughput
    private long byteSpeedPerSecond;
    // record write speed
    private long recordSpeedPerSecond;
    // total records read
    private long totalReadRecords;
    // total read/write failures
    private long totalErrorRecords;
    // total Transformer succeeded records
    private long transformerSucceedRecords;
    // total Transformer failed records
    private long transformerFailedRecords;
    // total Transformer filtered records
    private long transformerFilterRecords;
    // bytes read successfully
    private long readSucceedBytes;
    // transfer end time
    private long endTransferTimeStamp;
    // transfer start time
    private long startTransferTimeStamp;
    // total transfer duration
    private long transferCosts;
}