SpringBoot中并发定时任务的实现、动态定时任务的实现(看这⼀
篇就够了)
原创不易,如需转载,请注明出处,否则将追究法律责任!!!
⼀、在JAVA开发领域,⽬前可以通过以下⼏种⽅式进⾏定时任务1、单机部署模式
Timer:jdk中⾃带的⼀个定时调度类,可以简单的实现按某⼀频度进⾏任务执⾏。提供的功能⽐较单⼀,⽆法实现复杂的调度任务。
ScheduledExecutorService:也是jdk⾃带的⼀个基于线程池设计的定时任务类。其每个调度任务都会分配到线程池中的⼀个线程执⾏,所以其任务是并发执⾏的,互不影响。
Spring Task:Spring提供的⼀个任务调度⼯具,⽀持注解和配置⽂件形式,⽀持Cron表达式,使⽤简单但功能强⼤。
Quartz:⼀款功能强⼤的任务调度器,可以实现较为复杂的调度功能,如每⽉⼀号执⾏、每天凌晨执⾏、每周五执⾏等等,还⽀持分布式调度,就是配置稍显复杂。
2、分布式集模式(不多介绍,简单提⼀下)
问题:
I、如何解决定时任务的多次执⾏?
II、如何解决任务的单点问题,实现任务的故障转移?
问题I的简单思考:
1、固定执⾏定时任务的机器(可以有效避免多次执⾏的情况,缺点就是单点故障问题)。
2、借助Redis的过期机制和分布式锁。
3、借助mysql的锁机制等。
成熟的解决⽅案:
1、Quartz:可以去看看这篇⽂章[Quartz分布式]( wwwblogs/jiafuwei/p/6145280.html)。
2、elastic-job:(github/elasticjob/elastic-job-lite)当当开发的弹性分布式任务调度系统,采⽤zookeeper实现分布式协调,实现任务⾼可⽤以及分⽚。
3、xxl-job:(github/xuxueli/xxl-job)是⼤众点评员发布的分布式任务调度平台,是⼀个轻量级分布式任务调度框架。
4、saturn:(github/vipshop/Saturn) 是唯品会提供⼀个分布式、容错和⾼可⽤的作业调度服务框架。
⼆、SpringTask实现定时任务(这⾥是基于springboot)
1、简单的定时任务实现
使⽤⽅式:
使⽤@EnableScheduling注解开启对定时任务的⽀持。
使⽤@Scheduled 注解即可,基于corn、fixedRate、fixedDelay等⼀些定时策略来实现定时任务。
使⽤缺点:
1、多个定时任务使⽤的是同⼀个调度线程,所以任务是阻塞执⾏的,执⾏效率不⾼。
2、其次如果出现任务阻塞,导致⼀些场景的定时计算没有实际意义,⽐如每天12点的⼀个计算任务被
阻塞到1点去执⾏,会导致结果并⾮我们想要的。
使⽤优点:
1、配置简单
2、适⽤于单个后台线程执⾏周期任务,并且保证顺序⼀致执⾏的场景
源码分析:
//默认使⽤的调度器
if(this.taskScheduler == null) {
this.localExecutor = wSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
//可以看到SingleThreadScheduledExecutor指定的核⼼线程为1,说⽩了就是单线程执⾏
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
//利⽤了DelayedWorkQueue延时队列作为任务的存放队列,这样便可以实现任务延迟执⾏或者定时执⾏
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
2、实现并发的定时任务
使⽤⽅式:
⽅式⼀:由1中我们知道之所以定时任务是阻塞执⾏,是配置的线程池决定的,那就好办了,换⼀个不就⾏了!直接上代码:
@Configuration
public class ScheduledConfig implements SchedulingConfigurer {
@Autowired
private TaskScheduler myThreadPoolTaskScheduler;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
//简单粗暴的⽅式直接指定
//scheduledTaskRegistrar.wScheduledThreadPool(5));
//也可以⾃定义的线程池,⽅便线程的使⽤与维护,这⾥不多说了
scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
}
}
@Bean(name = "myThreadPoolTaskScheduler")
public TaskScheduler getMyThreadPoolTaskScheduler() {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(10);
taskScheduler.setThreadNamePrefix("Haina-Scheduled-");
taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//调度器shutdown被调⽤时等待当前被调度的任务完成
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
//等待时长
taskScheduler.setAwaitTerminationSeconds(60);
return taskScheduler;
}
⽅式⼆:⽅式⼀的本质改变了任务调度器默认使⽤的线程池,接下来这种是不改变调度器的默认线程池,⽽是把当前任务交给⼀个异步线程池去执⾏
⾸先使⽤@EnableAsync 启⽤异步任务
然后在定时任务的⽅法加上@Async即可,默认使⽤的线程池为SimpleAsyncTaskExecutor(该线程池默认来⼀个任务创建⼀个线程,就会不断创建⼤量线程,极有可能压爆服务器内存。当然它有⾃⼰的限流机制,这⾥就不多说了,有兴趣的⾃⼰翻翻源码~)
项⽬中为了更好的控制线程的使⽤,我们可以⾃定义我们⾃⼰的线程池,使⽤⽅式@Async("myThreadPool")
废话太多,直接上代码:
@Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)
@Async("myThreadPoolTaskExecutor")
//@Async
springboot其实就是springpublic void scheduledTest02(){
System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());
}
//⾃定义线程池
@Bean(name = "myThreadPoolTaskExecutor")
public TaskExecutor  getMyThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(20);
taskExecutor.setMaxPoolSize(200);
taskExecutor.setQueueCapacity(25);
taskExecutor.setKeepAliveSeconds(200);
taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");
// 线程池对拒绝任务(⽆线程可⽤)的处理策略,⽬前只⽀持AbortPolicy、CallerRunsPolicy;默认为后者
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//调度器shutdown被调⽤时等待当前被调度的任务完成
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//等待时长
taskExecutor.setAwaitTerminationSeconds(60);
taskExecutor.initialize();
return taskExecutor;
}
线程池的使⽤⼼得(后续有专门⽂章来探讨)
java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,对应与spring中的ThreadPoolTaskExecutor和
ThreadPoolTaskScheduler,但是在原有的基础上增加了新的特性,在spring环境下更容易使⽤和控制。
使⽤⾃定义的线程池能够避免⼀些默认线程池造成的内存溢出、阻塞等等问题,更贴合⾃⼰的服务特性
使⽤⾃定义的线程池便于对项⽬中线程的管理、维护以及监控。
即便在⾮spring环境下也不要使⽤java默认提供的那⼏种线程池,坑很多,阿⾥代码规约不说了吗,得相信⼤⼚!!!
三、动态定时任务的实现
问题:
使⽤@Scheduled注解来完成设置定时任务,但是有时候我们往往需要对周期性的时间的设置会做⼀些
改变,或者要动态的启停⼀个定时任务,那么这个时候使⽤此注解就不太⽅便了,原因在于这个注解中配置的cron表达式必须是常量,那么当我们修改定时参数的时候,就需要停⽌服务,重新部署。
解决办法:
⽅式⼀:实现SchedulingConfigurer接⼝,重写configureTasks⽅法,重新制定Trigger,核⼼⽅法就是addTriggerTask(Runnable task, Trigger trigger) ,不过需要注意的是,此种⽅式修改了配置值后,需要在下⼀次调度结束后,才会更新调度器,并不会在修改配置值时实时更新,实时更新需要在修改配置值时额外增加相关逻辑处理。
@Configuration
public class ScheduledConfig implements SchedulingConfigurer {
@Autowired
private TaskScheduler myThreadPoolTaskScheduler;
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
//scheduledTaskRegistrar.wScheduledThreadPool(5));
scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
//可以实现动态调整定时任务的执⾏频率
scheduledTaskRegistrar.addTriggerTask(
//1.添加任务内容(Runnable)
() -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),
//2.设置执⾏周期(Trigger)
triggerContext -> {
//2.1 从数据库动态获取执⾏周期
String cron = "0/2 * * * * ? ";
//2.2 合法性校验.
/
/                    if (StringUtils.isEmpty(cron)) {
//                        // Omitted Code ..
//                    }
//2.3 返回执⾏周期(Date)
return new CronTrigger(cron).nextExecutionTime(triggerContext);
}
);
}
}
⽅式⼆:使⽤threadPoolTaskScheduler类可实现动态添加删除功能,当然也可实现执⾏频率的调整
⾸先,我们要认识下这个调度类,它其实是对java中ScheduledThreadPoolExecutor的⼀个封装改进后的产物,主要改进有以下⼏点:
1、提供默认配置,因为是ScheduledThreadPoolExecutor,所以只有poolSize这⼀个默认参数。
2、⽀持⾃定义任务,通过传⼊Trigger参数。
3、对任务出错处理进⾏优化,如果是重复性的任务,不抛出异常,通过⽇志记录下来,不影响下次运⾏,如果是只执⾏⼀次的任务,将异常往上抛。
顺便说下ThreadPoolTaskExecutor相对于ThreadPoolExecutor的改进点:
1、提供默认配置,原⽣的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他没有默认配置
2、实现AsyncListenableTaskExecutor接⼝,⽀持对FutureTask添加success和fail的回调,任务成功或失败的时候回执⾏对应回调⽅法。
3、因为是spring的⼯具类,所以抛出的RejectedExecutionException也会被转换为spring框架的TaskRejectedException异常(这个⽆所谓)
4、提供默认ThreadFactory实现,直接通过参数重载配置
扯了这么多,还是直接上代码:
@Component
public class DynamicTimedTask {
private static final Logger logger = Logger(DynamicTimedTask.class);
//利⽤创建好的调度类统⼀管理
//@Autowired
//@Qualifier("myThreadPoolTaskScheduler")
//private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;
//接受任务的返回结果
private ScheduledFuture<?> future;
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
/
/实例化⼀个线程池任务调度类,可以使⽤⾃定义的ThreadPoolTaskScheduler
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
return new ThreadPoolTaskScheduler();
}
/**
* 启动定时任务
* @return
*/
public boolean startCron() {
boolean flag = false;
//从数据库动态获取执⾏周期
String cron = "0/2 * * * * ? ";
future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);
if (future!=null){
flag = true;
logger.info("定时check训练模型⽂件,任务启动成功!!!");
}else {
logger.info("定时check训练模型⽂件,任务启动失败!!!");
}
return flag;
}
/**
* 停⽌定时任务
* @return
*/
public boolean stopCron() {
boolean flag = false;
if (future != null) {
boolean cancel = future.cancel(true);
if (cancel){
flag = true;
logger.info("定时check训练模型⽂件,任务停⽌成功!!!");
}else {
logger.info("定时check训练模型⽂件,任务停⽌失败!!!");
}
}else {
flag = true;
logger.info("定时check训练模型⽂件,任务已经停⽌!!!");
}
return flag;
}
class CheckModelFile implements Runnable{
@Override
public void run() {
//编写你⾃⼰的业务逻辑
System.out.print("模型⽂件检查完毕!!!")
}
}
}
四、总结
到此基于springtask下的定时任务的简单使⽤算是差不多了,其中不免有些错误的地⽅,或者理解有偏颇的地⽅欢迎⼤家提出来!
基于分布式集下的定时任务使⽤,后续有时间再继续!!!
个⼈博客地址:
csdn:
cnblogs:
segmentfault:
github:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。