Springboot定时任务原理及如何动态创建定时任务
⼀、前⾔
  上周⼯作遇到了⼀个需求,同步多个省份销号数据,解绑粉丝。分省定时将销号数据放到SFTP服务器上,我需要开发定时任务去解析⽂件。因为是多省份,服务器、⽂件名规则、数据规则都不⼀定,所以要做成可配置是有⼀定难度的。数据规则这块必须强烈要求统⼀,服务器、⽂件名规则都可以从配置中⼼去读。每新增⼀个省份的配置,后台感知到后,动态⽣成定时任务。
⼆、Springboot引⼊定时任务核⼼配置
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
  接下来主要看⼀下这个核⼼后置处理器:ScheduledAnnotationBeanPostProcessor 。
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!ains(targetClass)) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledMethods = MergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
});
if (annotatedMethods.isEmpty()) {
if (logger.isTraceEnabled()) {
}
}
else {
/
/ Non-empty set of methods
annotatedMethods.forEach((method, scheduledMethods) ->
scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
"': " + annotatedMethods);
}
}
}
return bean;
}
  1、处理Scheduled注解,通过ScheduledTaskRegistrar注册定时任务。
private void finishRegistration() {
if (this.scheduler != null) {
}
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
}
}
if (istrar.hasTasks() && Scheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
// Search for
springboot 原理解析
}
catch (NoUniqueBeanDefinitionException ex) {
try {
}
catch (NoSuchBeanDefinitionException ex2) {
if (logger.isInfoEnabled()) {
logger.info("More than one TaskScheduler bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
}
}
}
catch (NoSuchBeanDefinitionException ex) {
// Search for ScheduledExecutorService
try {
catch (NoUniqueBeanDefinitionException ex2) {
try {
catch (NoSuchBeanDefinitionException ex3) {
if (logger.isInfoEnabled()) {
logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
}
}
}
catch (NoSuchBeanDefinitionException ex2) {
// Giving up -> falling back to default scheduler within
logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
}
  1、通过⼀系列的SchedulingConfigurer动态配置ScheduledTaskRegistrar。
  2、向ScheduledTaskRegistrar注册⼀个TaskScheduler(⽤于对Runnable的任务进⾏调度,它包含有多种触发规则)。
  3、registrar.afterPropertiesSet(),在这开始安排所有的定时任务开始执⾏了。
protected void scheduleTasks() {
if (this.taskScheduler == null) {
this.localExecutor = wSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
if (iggerTasks != null) {
for (TriggerTask task : iggerTasks) {
addScheduledTask(scheduleTriggerTask(task));
}
}
if (Tasks != null) {
for (CronTask task : Tasks) {
addScheduledTask(scheduleCronTask(task));
}
}
if (this.fixedRateTasks != null) {
for (IntervalTask task : this.fixedRateTasks) {
addScheduledTask(scheduleFixedRateTask(task));
}
}
if (this.fixedDelayTasks != null) {
for (IntervalTask task : this.fixedDelayTasks) {
addScheduledTask(scheduleFixedDelayTask(task));
}
}
}
  1、TriggerTask:动态定时任务。通过Trigger#nextExecutionTime 给定的触发上下⽂确定下⼀个执⾏时间。
  2、CronTask:动态定时任务,TriggerTask⼦类。通过cron表达式确定的时间触发下⼀个任务执⾏。
  3、IntervalTask:⼀定时间延迟之后,周期性执⾏的任务。
  4、taskScheduler 如果为空,默认是ConcurrentTaskScheduler,并使⽤默认单线程的ScheduledExecutor。
三、主要看⼀下CronTask⼯作原理
ScheduledTaskRegistrar.java
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask = ve(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.Runnable(), Trigger());
}
else {
addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
ConcurrentTaskScheduler.java
@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
if (priseConcurrentScheduler) {
return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
}
else {
ErrorHandler errorHandler =
(Handler != null ? Handler : DefaultErrorHandler(true));
return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
}
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);    }
}
ReschedulingRunnable.java
@Nullable
public ScheduledFuture<?> schedule() {
synchronized (iggerContextMonitor) {
this.scheduledExecutionTime = iggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
long initialDelay = Time() - System.currentTimeMillis();
this.currentFuture = utor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
private ScheduledFuture<?> obtainCurrentFuture() {
Assert.state(this.currentFuture != null, "No scheduled future");
return this.currentFuture;
}
@Override
public void run() {
Date actualExecutionTime = new Date();
super.run();
Date completionTime = new Date();
synchronized (iggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
if (!obtainCurrentFuture().isCancelled()) {
schedule();
}
}
}
  1、最终将task和trigger都封装到了ReschedulingRunnable中。
  2、ReschedulingRunnable实现了任务重复调度(schedule⽅法中调⽤调度器executor并传⼊⾃⾝对象,executor会调⽤run⽅法,run⽅法⼜调⽤了schedule⽅法)。
  3、ReschedulingRunnable schedule⽅法加了同步锁,只能有⼀个线程拿到下次执⾏时间并加⼊执⾏器的调度。
  4、不同的ReschedulingRunnable对象之间在线程池够⽤的情况下是不会相互影响的,也就是说满⾜线程池的条件下,TaskScheduler的schedule⽅法的多次调⽤是可以交叉执⾏的。
ScheduledThreadPoolExecutor.java
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
  ScheduledFutureTask ⼯作原理如下图所⽰【太懒了,不想画图了,盗图⼀张】。
  1、ScheduledFutureTask 会放⼊优先阻塞队列:ScheduledThreadPoolExecutor.DelayedWorkQueue (⼆叉最⼩堆实现)
  2、上图中的Thread 对象即ThreadPoolExecutor.Worker ,实现了Runnable 接⼝
  1、Worker 中维护了Thread 对象,Thread 对象的Runnable 实例即Worker ⾃⾝
  2、ThreadPoolExecutor#addWorker ⽅法中会创建Worker 对象,然后拿到Worker 中的thread 实例并start ,这样就创建了线程池中的⼀个线程实例
  3、Worker 的run ⽅法会调⽤ThreadPoolExecutor#runWorker ⽅法,这才是任务最终被执⾏的地⽅,该⽅法⽰意如下
  (1)⾸先取传⼊的task 执⾏,如果task 是null ,只要该线程池处于运⾏状态,就会通过getTask ⽅法从workQueue 中取任务。ThreadPoolExecutor 的execute ⽅法会在⽆法产⽣core 线程的时候向  workQueue 队列中offer 任务。
getTask ⽅法从队列中取task 的时候会根据相关配置决定是否阻塞和阻塞多久。如果getTask ⽅法结束,返回的是null ,runWorker 循环结束,执⾏processWorkerExit ⽅法。⾄此,该线程结束⾃⼰的使命,从线程池中“消失”。
  (2)在开始执⾏任务之前,会调⽤Worker 的lock ⽅法,⽬的是阻⽌task 正在被执⾏的时候被interrupt ,通过调⽤clearInterruptsForTaskRun ⽅法来保证的(后⾯可以看⼀下这个⽅法),该线程没有⾃⼰的interrupt set 了。
  (3)beforeExecute 和afterExecute ⽅法⽤于在执⾏任务前后执⾏⼀些⾃定义的操作,这两个⽅法是空的,留给继承类去填充功能。
我们可以在beforeExecute ⽅法中抛出异常,这样task 不会被执⾏,⽽且在跳出该循环的时候completedAbruptly 的值是true ,表⽰the worker died due to user exception ,会⽤decrementWorkerCount 调整wc 。
  (4)因为Runnable 的run ⽅法不能抛出Throwables 异常,所以这⾥重新包装异常然后抛出,抛出的异常会使当当前线程死掉,可以在afterExecute 中对异常做⼀些处理。  (5)afterExecute ⽅法也可能抛出异常,也可能使当前线程死掉。
四、动态创建定时任务
#   TaskConfiguration 配置类
/**
* Creates with given first task and thread from ThreadFactory.
* @param  firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this .firstTask = firstTask;
this .thread = getThreadFactory().newThread(this );
}
/** Delegates main run loop to outer runWorker  */
public  void  run() {
runWorker(this );
}
@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public  class  TaskConfiguration {
@Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public  ScheduledExecutorService scheduledAnnotationProcessor() {
return  wScheduledThreadPool(5, new  DefaultThreadFactory());
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。