springboot的schedule分布式集实现
原理:
1.利⽤spring的schedule功能实现定时任务
2.利⽤redis的过期策略实现分布式定时任务的分配(单机版redis,集的redis请考虑redission)
内容概述:
1.多线程调度定时任务
2.增加定时任务管理表(数据库或者redis中持久化)
3.基于反射机制实现动态调⽤不同的⾃定义定时任务
4.⾃动的根据定时任务管理表对定时任务进⾏增删改查
schedule用法及搭配5.通过定时任务管理使定时任务只执⾏⼀次。
1.多线程调度定时任务
默认的schedule使⽤的是单线程,即多个定时任务需要排队执⾏,如果某些定时任务耗时过长,会导致其他任务排队过久,且不利于使⽤redis的过期策略实现分布式定时任务的分配,所以在集情况下,定时任务需要使⽤多线程实现,建议:线程数⼤于同时可执⾏任务数
/**
* @author Bight Chen
* @Date: 2021/9/17 10:27
* 定时任务线程池
*/
@Configuration
public class ScheduleConfig {
@Bean
public TaskScheduler taskScheduler(){
//此bean对象⽀持根据cron表达式创建周期性任务
ThreadPoolTaskScheduler taskScheduler =new ThreadPoolTaskScheduler();
//定时任务执⾏线程池核⼼数
//线程数⼀定⼤于当前可执⾏任务数
taskScheduler.setPoolSize(50);
//此⽅法会使得任务⼀旦被取消将⽴即被移除
taskScheduler.setRemoveOnCancelPolicy(true);
taskScheduler.setThreadNamePrefix("Schedule-");
return taskScheduler;
}
}
2.增加定时任务管理表(数据库或者redis中持久化)
1.需要的原因是利于⾃动增加或者删除所需定时任务
ps:
在mysql中做为数据表
@Data是基于lombok来简化编写 get() set() toString()等⽅法
@Data
public class ScheduleJob  {
//id
private Long scheduleJobId;
//动态bean
private String beanName;
/
/⽅法
private String methodName;
//参数
private String jobParams;
//表达式
private String jobCron;
//任务名
private String jobName;
private String remark;
//0停⽌,1正常,2已完成
private String status;
private String createdBy;
private Date createdTime;
private String lastUpdatedBy;
}
3.基于反射机制实现动态调⽤不同的⾃定义定时任务
1.反射调⽤bean的⼯具类
/**
* @author Bight Chen
* @Date: 2021/9/17 11:54
*/
@SuppressWarnings("unchecked")
@Component
public class SpringToolsConfig implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext)throws BeansException {        SpringToolsConfig.applicationContext = applicationContext;
}
public static Object getBean(String name){
Bean(name);
}
}
2.动态调⽤bean
/**
* @author Bight Chen
* @Date: 2021/9/17 11:40
*/
public class TaskRunable implements Runnable {
private static final Logger logger = Logger(TaskRunable.class);
private ScheduleJob scheduleJob;
public TaskRunable(ScheduleJob scheduleJob){
this.scheduleJob = scheduleJob;
}
private RedisTemplate redisTemplate =(RedisTemplate) Bean("redisTemplate");
@Override
public void run(){
//周期性任务:加锁的过期时间要⼤于不同服务器时间之差,且⼩于同个任务2次执⾏间隔(cron)
//抢到锁的执⾏,没抢到锁的等待下⼀次任务执⾏
try{
Object target = BeanName());
Method method =null;
if(!StringUtils.JobParams())){
method = Class().MethodName(), String.class);
}else{
method = Class().MethodName());
}
//考虑到集部署时,旧版本的查不到新增的定时任务时处理,就需要先到bean再存redis
//保证原⼦性
if(!redisTemplate.opsForValue().setIfAbsent("schdeleJob:"+ScheduleJobId().toString(),"1",15000L,TimeUnit.MILLISECONDS)){                logger.info("{}正在执⾏!!,不能重复执⾏",JobName());
return;
}
logger.info("定时任务开始执⾏ - 参数:{}", String());
long startTime = System.currentTimeMillis();
try{
ReflectionUtils.makeAccessible(method);
if(!StringUtils.JobParams())){
JSONObject jsonObject = JSONObject.JobParams());
jsonObject.put("scheduleJobId", ScheduleJobId());
method.invoke(target, JSONString(jsonObject));
}else{
method.invoke(target);
}
}catch(Exception e){
<("定时任务执⾏异常 -参数:{} ,异常:{}", String(), e);
}finally{
Object oDelete = redisTemplate.opsForValue().get("schdeleJob:"+ ScheduleJobId());
if(oDelete !=null){
/
/锁依旧存在则⾃动删除
redisTemplate.delete("schdeleJob:"+ ScheduleJobId().toString());
}
}
long times = System.currentTimeMillis()- startTime;
logger.info("定时任务执⾏结束 -参数:{},耗时:{} 毫秒", String(), times);
}catch(Exception e){
<("定时任务执⾏异常 -参数:{} ,异常:{}", String(), e);
}
}
}
4.⾃动的根据定时任务管理表对定时任务进⾏增删改查
/**
* @author Bight Chen
* @Date: 2021/9/17 13:59
* 分布式定时任务初始化
*/
@Component
public class ScheduleInitConfig {
private static final Logger logger = Logger(ScheduleInitConfig.class);
//内存中保存定时任务数据
private HashMap<Long, ScheduledFuture> map =new HashMap<>();
@Autowired
private TaskScheduler autoTaskScheduler;
@Autowired
private ScheduleJobService scheduleJobService;
/**
* 定时⾃动查询增加任务数据,注⼊定时任务
* :0/15 * * * * ?
*/
@Scheduled(cron ="${}")
public void autoAddTask(){
Long time = System.currentTimeMillis();
try{
List<ScheduleJob> list = scheduleJobService.selectByExample(new ScheduleJobExample());
for(ScheduleJob scheduleJob : list){
(ScheduleJobId())==null&&"1".Status())){
//存在启动状态的定时任务⾃动增加
TaskRunable taskRunable =new TaskRunable(scheduleJob);
ScheduledFuture future = autoTaskScheduler.schedule(taskRunable,new JobCron()));                    map.ScheduleJobId(), future);
logger.info("autoAddTask,⾃动增加任务,参数:{}", String());
}
}
}catch(Exception e){
<("autoAddTask,error:{}", e);
}
//logger.info("autoAddTask,end:{}", System.currentTimeMillis() - time);
}
/**
* 定时⾃动查询任务数据,删除过期任务列表
* :0/15 * * * * ?
*/
@Scheduled(cron ="${}")
public void autoDeleteTask(){
Long time = System.currentTimeMillis();
try{
List<ScheduleJob> list = scheduleJobService.selectByExample(new ScheduleJobExample());
for(ScheduleJob scheduleJob : list){
(ScheduleJobId())!=null&&!"1".Status())){
//⾮启动中的都删除
ScheduledFuture future = (ScheduleJobId());
future.cancel(true);
logger.info("autoDeleteTask,⾃动删除任务,参数:{}", String());
}
}
}
}catch(Exception e){
<("autoDeleteTask,error:{}", e);
}
//logger.info("autoDeleteTask,end:{}", System.currentTimeMillis() - time);
}
}
5.通过定时任务管理使定时任务只执⾏⼀次。
/**
* @author Bight Chen
* @Date: 2021/9/17 13:53
*/
@Component("testTask")
public class TestTask implements BaseTask {
private static final Logger logger = Logger(TestTask.class);
@Autowired
private ScheduleJobService scheduleJobService;
@Override
public void runTask(){}
@Override
public void runTask(String params){
JSONObject jsonObject = JSON.parseObject(params);
Long scheduleJobId =  Long.parseLong( ("scheduleJobId")+"");
try{
Thread.sleep(16000);
}catch(Exception e){
logger.info("runTask>>>"+e.getMessage());
}finally{
if(scheduleJobId !=null){
//最后关闭当前任务,使任务执⾏⼀次
scheduleJobService.updateStatusById(scheduleJobId,"2");
}
}
logger.info(Thread.currentThread().getName()+":"+ params);
}
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。