java分⽚_⽤于将⼤量的数据进⾏分⽚分页分批处理的通⽤处
理框架
数据处理通⽤框架
数据处理通⽤框架,⽤于将⼤量的数据进⾏分⽚分页分批处理的通⽤处理框架
背景
随着 系统迁移⼯作 的展开,数据迁移的需求越来越多:从SqlServer迁移到Mysql,或者SqlServer/Mysql到ES
这些迁移⼯作都有⼀个共同点,就是量⽐较⼤,耗时⽐较长,属于IO密集型的任务,可以使⽤多线程提升效率。
此框架脱胎于ES数据导⼊任务,该任务需要将 2000 万左右的商机主体数据从数据库查出并调⽤其他接⼝查询到商机的附加字段,然后导⼊到ES中。
其难点在于,数据量较⼤,耗时较长,商机没有⾃增主键⽆法按主键进⾏翻页,⽽且时间长的情况下,如果导到⼀半程序挂掉,需要⽀持断点续传(只重新导⼊失败的数据)
解决⽅案:数据分⽚分页分批处理框架
⽤户导向
使⽤者只需要关注数据从哪来,要做什么处理,其他的事情都交给框架解决。合理的默认值设置,通常情况下不需要关⼼处理的细节,如果有更⾼要求,则可以对⼀些参数进⾏定制化的调节。
实现原理
框架将需要分⽚分页分批处理的数据的处理过程进⾏抽象,使⽤模板模式将业务⽆关的模板代码抽离出来,让使⽤者只需要关注⾃⼰的业务实现,⽽⽆需关⼼实现的技术细节,如线程池的创建、切⽚处理、分批处理、出错记录等。
每个分⽚做为任务条件提交到线程池进⾏处理,每个分⽚线程建⽴⾃⼰的线程池,以分⽚做为分页的依据获取来源资源,将分页数据分解成各个批次,将每批次的数据做为条件创建任务提交到分⽚线程中的线程池进⾏处理。各个批次提交完毕之后,分⽚线程等待任务执⾏完毕后再记录当前切⽚的处理结果。
核⼼接⼝ DataProcessor
本框架的核⼼接⼝ DataProcessor 展⽰了处理所需要的所有功能
process 执⾏处理逻辑
processErrorSlices 导⼊失败的分⽚
resumeProgress 恢复上次未完成的任务(断点续传)
public interface DataProcessor {
/**
* 处理数据
*/
boolean process();
/**
* 导⼊失败的分⽚
*/
boolean processErrorSlices();
/**
* 恢复上次未完成的任务(断点续传)
*/
boolean resumeProgress();
}
使⽤⽅式
引⼊
com.github.dadiyang
dataprocessor
1.0.0
使⽤者唯⼀需要关⼼的就是⾃⼰的业务逻辑(数据从哪来,要做什么处理),主要就是以下三个⽅法:
generateSlices ⽤于⽣成分⽚,以根据分⽚进⾏分批处理(Long和Date类型分⽚规则已有相应的实现类) getResources 根据分⽚和分页条件从来源处获取需要被处理的数据(数据从哪来)
createTask 创建根据给定的分批数据进⾏实际处理的任务(要到哪⾥去)
根据提供以上三个⽅法的不同⽅式,有实现DataProvider接⼝和继承DataProcessorTemplate抽象类两种使⽤⽅式⽅式⼀:实现 DataProvider 接⼝传给 DefaultDataProcessor 类(推荐)
DataProvider接⼝
public interface DataProvider {
/**
* 获取所有分⽚
* @return 分⽚
*/
Set> generateSlices();
/**
* 从数据源获取需要被处理的资源
* @param slice 分⽚
* @param lastPage 上⼀页,即刚刚处理完成的这⼀页,如果是第⼀次获取则为 null
* @return 需要被处理的资源,若hashNext()返回false则认为本批次已处理完成
*/
Page getResources(Slice slice, Page lastPage) throws Exception;
/**
* 创建实际处理逻辑的任务
* @return 实际处理逻辑的任务,注意:Callable调⽤后抛出异常,则认为本批次处理失败
*/
Callable> createTask(List resources);
}
实现DataProvider接⼝,然后⽣成实现类的实例,使⽤该实例创建 DataProcessor 类
// 创建 DataProvider
DataProvider dataProvider = new ...;
DataProcessor processor = new DefaultDataProcessor<>(dataProvider);
// 开始处理
processor.process();
注意: 框架中提供了 DateSliceDataProvider 和 LongSliceDataProvider 两个实现 DataProvider 接⼝的抽象类,如果你是根据 Date 或者 Long 类型进⾏切⽚,可以选择继承⼀个以省去切⽚规则的实现。
⽅式⼆:继承 DataProcessorTemplate 抽象类
继承 DataProcessorTemplate 抽象类并实现 generateSlices、getResources、createTask 三个⽅法
使⽤⽅式与⽅式⼀类似
参数配置
DefaultDataProcessor 可以调整以下⼏个参数,可以调⽤相应的 setter ⽅法进⾏调整
slicesThreadNum 分⽚线程数,即同时进⾏处理的分⽚数,默认 8
numPerBatch 每批处理的数理,默认 1000
launchInterval 多个分⽚同时启动时,每个启动之间的间隔,单位毫秒。有些查询会⽐较耗时,如果同时启动太多个分⽚,会导致数据库压⼒过⼤导致超时,建议在查询会给数据库造成压⼒的时候适当调整此参数,默认 3000
retryTime 失败重试次数,默认 3
retryNullable 被重试的⽅法是否可以接受null值,若不能接受,⽅法返回null值视为失败,会进⾏重试(仅对分⽚任务处理有效),默认 true
注意:这些属性的 setter ⽅法不⽀持运⾏时调⽤,如果你调⽤了process() ⽅法⽽且处理过程还没有结束,不允许修改这⼏个属性,否则会抛出 throw new IllegalStateException("当前有任务正在执⾏");
断点续传
如果处理途中程序挂掉,可以使⽤断点续传功能恢复上次的任务。其粒度为切⽚级别,即恢复上次任务意味着已完成的切⽚不再重复处理,如果是处理⼀半的切⽚会重新处理。
原理是读取上次切分的所有分⽚和已完成的分⽚,取差集进⾏继续处理。
使⽤⽅式
注意事项
断点续传依赖 SliceRecorder 类记录上次的全部切⽚和已完成切⽚,恢复时读取全部切⽚和已完成切⽚进⾏取差值,所以必须保证使⽤的是同⼀个 SliceRecorder。默认的SliceRecorder实现基于⽂件存储,必须保证调⽤ resumeProgress() ⽅法时,其所使⽤的⽂件在指定的位置上。
处理到⼀半的切⽚会重新处理,意味着你的处理逻辑必须⾃⾏解决同⼀条数据会被重复处理的问题
扩展
SliceParser 切⽚解析器
⽤于对切⽚进⾏序列化和反序列化的⼯具
默认使⽤FastJson实现
如果有需要可以⾃⼰实现接⼝传给DataProcessor
SliceRecorder 切⽚记录器
⽤于记录和读取切⽚,包括启动时获取到的全部切⽚,处理完成的切⽚和处理出错的切⽚。
默认使⽤本地⽂件进⾏记录,将记录写到启动⽬录的data⽂件夹下。
你可以实现⾃⼰的记录器,只需要实现该接⼝并传给DataProcessor就可以
抽象类的使用ThreadPoolFactory 线程池⼯⼚
⽤于⽣成线程池,程序会给定⼀个经过计算认为合适的线程池⼤⼩,你可以根据⾃⼰的需要定制⾃⼰的实现
默认使⽤建议的线程池⼤⼩⽣成固定⼤⼩线程池,拒绝策略使⽤阻塞式,即当队列满时再添加任务将会被阻塞并且为每个线程命名
依赖
slf4j-api ⽇志
FastJson (在DefaultSliceParser中⽤于对分⽚的序列化和反序列化)
基本原则
最后,我想尝试以软件设计的五⼤基本原则在本框架中的应⽤,有兴趣的可以了解⼀下,否则可以直接跳过这⼀节
依赖反转原则
程序要依赖于抽象接⼝,不要依赖于具体实现
本框架主要由DataProcessor、DataProvider、SliceParser、SliceRecorder、ThreadPoolFactory五个接⼝组成,由这些接⼝相互配合实现所有的功能。
DataProcessorTemplate抽象类结合使⽤其他SliceParser、SliceRecorder、ThreadPoolFactory接⼝负责实现DataProcessor的通⽤部分功能
DefaultDataProcessor继承DataProcessorTemplate将必须实现的抽象⽅法委托给DataProvider接⼝。
可见类之前的交互都依赖于接⼝⽽不是具体的实现
单⼀职责原则
⼀个类,只有⼀个引起它变化的原因。
每个接⼝及其实现类都有⾃⼰单⼀的职责
DataProcessor: 核⼼接⼝,负责整个框架需要提供的功能
DataProvider: ⽤户必须实现的接⼝,告诉框架数据从哪⾥来,要到哪⾥去,以什么⽅式分⽚
SliceParser: 分⽚解析器,负责分⽚的序列化和反序列化
SliceRecorder: 分⽚记录器,负责记录所有分⽚、已完成分⽚和出错的分⽚
ThreadPoolFactory: 线程池⼯⼚,负责创建线程池
接⼝隔离原则
不应强⾏要求客户端依赖于它们不⽤的接⼝;
类之间的依赖应该建⽴在最⼩的接⼝上⾯。
框架中每个接⼝的职责都很单⼀,⽤户只需要关注⾃⼰需要的功能,该功能对应的接⼝没有要求任何与功能⽆关的⽅法
⾥式替换原则
继承必须确保超类所拥有的性质在⼦类中仍然成⽴。
框架中提供的所有实现类都可以替换其⽗类(接⼝)的位置
开放封闭原则

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。