SpringBoot中@Scheduled实现多线程并发定时任务SpringBoot中@Scheduled实现多线程并发定时任务
1.背景
Spring Boot实现定时任务⾮常容易,只需要使⽤Spring⾃带的Schedule注解
@Scheduled(cron = "0 */1 * * * ?")
public void cancleOrderTask() {
//实现业务
}
记得在启动类中开启定时任务
@EnableScheduling //开启定时任务
定时任务开启成功,但所有的任务都是在同⼀个线程池中的同⼀个线程来完成的。在实际开发过程中,我们当然不希望所有的任务都运⾏在⼀个线程中
2.⽅案解决
⾸选:
st;
import t.annotation.Bean;
import t.annotation.Configuration;
import t.annotation.Primary;
import org.springframework.scheduling.annotation.EnableAsync;
import org.urrent.ThreadPoolTaskExecutor;
import urrent.ThreadPoolExecutor;
@Configuration
public class ThreadPoolConfig {
/** 获取当前系统的CPU 数⽬*/
static int cpuNums = Runtime().availableProcessors();
/** 线程池核⼼池的⼤⼩*/
private static int corePoolSize = cpuNums*2+1;
/** 线程池的最⼤线程数*/
private static int maximumPoolSize = cpuNums * 5;
/**
* @Primary 优先使⽤该全局配置线程池
* 如果不加@primary @async注解默认采⽤SimpleAsyncTaskExecutor
* 不加@primary 可使⽤@async("threadPoolTaskExecutor")指定线程池
*/
@Primary
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
/** 核⼼线程数,默认为1 **/
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
threadPoolTaskExecutor.setCorePoolSize(corePoolSize);
/** 最⼤线程数,默认为Integer.MAX_VALUE **/
threadPoolTaskExecutor.setMaxPoolSize(maximumPoolSize);
/
** 队列最⼤长度,⼀般需要设置值: ⼤于等于notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE **/
threadPoolTaskExecutor.setQueueCapacity(50);
/** 线程池维护线程所允许的空闲时间,默认为60s **/
threadPoolTaskExecutor.setKeepAliveSeconds(60);
/**
* 线程池对拒绝任务(⽆线程可⽤)的处理策略,⽬前只⽀持AbortPolicy、CallerRunsPolicy;默认为后者
*
* AbortPolicy:直接抛出urrent.RejectedExecutionException异常
* CallerRunsPolicy:主线程直接执⾏该任务,执⾏完之后尝试添加下⼀个任务到线程池中,可以有效降低向线程池内添加任务的速度
* DiscardOldestPolicy:抛弃旧的任务、暂不⽀持;会导致被丢弃的任务⽆法再次被执⾏
* DiscardPolicy:抛弃当前任务、暂不⽀持;会导致被丢弃的任务⽆法再次被执⾏
*/
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.setThreadNamePrefix("task--thread");
//执⾏初始化会⾃动执⾏afterPropertiesSet()初始化
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
⽅案⼀:
1:通过ScheduleConfig配置⽂件实现SchedulingConfigurer接⼝,并重写setSchedulerfang⽅法package;
import Bean;
import Configuration;
import TaskScheduler;
import SchedulingConfigurer;
import ThreadPoolTaskScheduler;
import ScheduledTaskRegistrar;
import Executor;
import Executors;
@Configuration
public class ScheduledConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar){
scheduledTaskRegistrar.setScheduler(setTaskExecutors());
scheduledTaskRegistrar.setScheduler(setTaskExecutors());
}
@Bean(destroyMethod="shutdown")
public Executor setTaskExecutors(){
// 10个线程来处理。
wScheduledThreadPool(10);
}
}
2:创建Bean
package;
import Bean;
import Configuration;
import TaskScheduler;
import ThreadPoolTaskScheduler;
@Configuration
public class TaskSchedulerConfig {
//线程池应该交给容器管理
@Bean
public TaskScheduler taskScheduler(){
ThreadPoolTaskScheduler scheduler =new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
return scheduler;
}
}
⽅案⼆:
1.@Async异步+线程池的两种⽅式
1. 在启动类加上@EnableAsync(不⼀定是启动类,可以是controller、service等启动时加载)
ample.worktest.async;
@SpringBootApplication
@EnableAsync
public class AsyncApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncApplication.class, args);
}
}
2. @Async注解,可以在类,⽅法,controller,service
ingrundata.task;
slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 定时查询学⽣晨跑记录
* @author Administrator
*/
@Component
@Slf4j
@EnableScheduling
@Async
public class TimerProcessTaskTest {
@Scheduled(cron = "0/2 * * * * ?")
public void doTask() throws InterruptedException {
log.info(Thread.currentThread().getName()+"===task run");
Thread.sleep(5);
}
@Scheduled(cron = "0/2 * * * * ?")
public void doTask1() throws InterruptedException {
log.info(Thread.currentThread().getName()+"===task end");
}
}
3. 解释
@Async异步⽅法默认使⽤Spring创建ThreadPoolTaskExecutor(参考TaskExecutionAutoConfiguration),
其中默认核⼼线程数为8, 默认最⼤队列和默认最⼤线程数都是Integer.MAX_VALUE. 创建新线程的条件是队列填满时, ⽽这样的配置队列永远不会填满, 如果有@Async注解标注的⽅法长期占⽤线程(⽐如HTTP长连接等待获取结果),
在核⼼8个线程数占⽤满了之后, 新的调⽤就会进⼊队列, 外部表现为没有执⾏.
解决:
⼿动配置相应属性即可. ⽐如
ution.pool.queueCapacity=4
Size=20
备注:
此处没有配置maxSize, 仍是默认的Integer.MAX_VALUE. 如果配置的话, 请考虑达到最⼤线程数时的
处理策略(JUC包查RejectedExecu tionHandler的实现类)
(默认为拒绝执⾏AbortPolicy, 即抛出异常)
AbortPolicy: 直接抛出urrent.RejectedExecutionException异常
CallerRunsPolicy: 主线程直接执⾏该任务,执⾏完之后尝试添加下⼀个任务到线程池中,可以有效降低向线程池内添加任务的速度
DiscardOldestPolicy: 抛弃旧的任务
DiscardPolicy: 抛弃当前任务
//更好的解释
AbortPolicy:直接抛出 RejectedExecutionException 异常并阻⽌系统正常运⾏。
CallerRunsPolicy:“调⽤者运⾏”机制,该策略既不会抛弃任务,也不会抛出异常,⽽是将某些任务回退到调⽤者,由调⽤者来完成任务。
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加⼊队列中尝试再次提交当前任
务。
DiscarePolicy:直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的⼀种⽅案。
fig;
import t.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.urrent.ThreadPoolTaskExecutor;
import urrent.Executor;java线程池创建的四种
import urrent.ThreadPoolExecutor;
@Configuration
public class TaskExecutorConfig implements AsyncConfigurer {
/**
* Set the ThreadPoolExecutor's core pool size.
*/
private static final int CORE_POOL_SIZE = 5;
/**
* Set the ThreadPoolExecutor's maximum pool size.
*/
private static final int MAX_POOL_SIZE = 5;
/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
*/
private static final int QUEUE_CAPACITY = 1000;
/
**
* 通过重写getAsyncExecutor⽅法,制定默认的任务执⾏由该⽅法产⽣
* <p>
* 配置类实现AsyncConfigurer接⼝并重写getAsyncExcutor⽅法,并返回⼀个ThreadPoolTaskExevutor
* 这样我们就获得了⼀个基于线程池的TaskExecutor
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//cpu核数*2+1
taskExecutor.setCorePoolSize(CORE_POOL_SIZE);
taskExecutor.setMaxPoolSize(MAX_POOL_SIZE);
taskExecutor.setQueueCapacity(QUEUE_CAPACITY);
taskExecutor.setThreadNamePrefix("test-");
taskExecutor.setKeepAliveSeconds(3);
taskExecutor.initialize();
//设置线程池拒绝策略,四种线程池拒绝策略,具体使⽤哪种策略,还得根据实际业务场景才能做出抉择
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return taskExecutor;

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。