SpringBoot中如何配置线程池拒绝策略,妥善处理好溢出的任务
通过之前三篇关于Spring Boot异步任务实现的博⽂,我们分别学会了、、。今天这篇,我们继续对上⾯的知识进⾏完善和优化!
如果你已经看过上⾯⼏篇内容并已经掌握之后,⼀起来思考下⾯这个问题:
假设,线程池配置为核⼼线程数2、最⼤线程数2、缓冲队列长度2。此时,有5个异步任务同时开始,会发⽣什么?
场景重现
我们先来把上⾯的假设⽤代码实现⼀下:
第⼀步:创建Spring Boot应⽤,根据上⾯的假设写好线程池配置。
@EnableAsync
@SpringBootApplication
public class Chapter78Application {
public static void main(String[] args) {
SpringApplication.run(Chapter78Application.class, args);
}
@EnableAsync
@Configuration
class TaskPoolConfig {
@Bean
public Executor taskExecutor1() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(2);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("executor-1-");
return executor;
}
}
}
第⼆步:⽤@Async注解实现⼀个部分任务
@Slf4j
@Component
public class AsyncTasks {
public static Random random = new Random();
@Async("taskExecutor1")
public CompletableFuture<String> doTaskOne(String taskNo) throws Exception {
log.info("开始任务:{}", taskNo);
long start = System.currentTimeMillis();
Thread.Int(10000));
long end = System.currentTimeMillis();
log.info("完成任务:{},耗时:{} 毫秒", taskNo, end - start);
return CompletableFuturepletedFuture("任务完成");
}
}
第三步:编写测试⽤例
@Slf4j
@SpringBootTest
public class Chapter78ApplicationTests {
@Autowired
private AsyncTasks asyncTasks;
@Test
public void test2() throws Exception {
// 线程池配置:core-2,max-2,queue=2,同时有5个任务,出现下⾯异常:
// ask.TaskRejectedException: Executor [urrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2,
/
/ active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: urrent.CompletableFuture$AsyncSupply@408e96d9
long start = System.currentTimeMillis();
// 线程池1
CompletableFuture<String> task1 = asyncTasks.doTaskOne("1");
CompletableFuture<String> task2 = asyncTasks.doTaskOne("2");
CompletableFuture<String> task3 = asyncTasks.doTaskOne("3");
CompletableFuture<String> task4 = asyncTasks.doTaskOne("4");
CompletableFuture<String> task5 = asyncTasks.doTaskOne("5");
// ⼀起执⾏
CompletableFuture.allOf(task1, task2, task3, task4, task5).join();
long end = System.currentTimeMillis();
log.info("任务全部完成,总耗时:" + (end - start) + "毫秒");
}
}
执⾏⼀下,可以类似下⾯这样的⽇志信息:
2021-09-22 17:33:08.159  INFO 21119 --- [  executor-1-2] com.didispace.chapter78.AsyncTasks      : 开始任务:2
2021-09-22 17:33:08.159  INFO 21119 --- [  executor-1-1] com.didispace.chapter78.AsyncTasks      : 开始任务:1
ask.TaskRejectedException: Executor [urrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: urrent. at org.ute(ThreadPoolTaskExecutor.java:324)
at urrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at urrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
at com.didispace.st2(Chapter78ApplicationTests.java:51)
flect.NativeMethodAccessorImpl.invoke0(Native Method)
flect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
flect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at flect.Method.invoke(Method.java:498)
at org.junit.platformmons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.ution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.ution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.sion.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.sion.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.sion.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.ution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.ution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.ution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.ution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.ution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.ution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.ution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.ution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.ine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.ine.support.ute(ThrowableCollector.java:73)
at org.ine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.ine.ute(TestMethodTestDescriptor.java:131)
at org.ine.ute(TestMethodTestDescriptor.java:65)
at org.ine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.ine.support.ute(ThrowableCollector.java:73)
at org.ine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.ine.support.hierarchical.Node.around(Node.java:137)
at org.ine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.ine.support.ute(ThrowableCollector.java:73)
at org.ine.support.uteRecursively(NodeTestTask.java:126)
at org.ine.support.ute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.ine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.ine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.ine.support.ute(ThrowableCollector.java:73)
at org.ine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.ine.support.hierarchical.Node.around(Node.java:137)
at org.ine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.ine.support.ute(ThrowableCollector.java:73)
at org.ine.support.uteRecursively(NodeTestTask.java:126)
at org.ine.support.ute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.ine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.ine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.ine.support.ute(ThrowableCollector.java:73)
at org.ine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.ine.support.hierarchical.Node.around(Node.java:137)
at org.ine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.ine.support.ute(ThrowableCollector.java:73)
at org.ine.support.uteRecursively(NodeTestTask.java:126)
at org.ine.support.ute(NodeTestTask.java:84)
at org.ine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.ine.support.ute(HierarchicalTestExecutor.java:57)
at org.ine.support.ute(HierarchicalTestEngine.java:51)
at org.junit.ute(EngineExecutionOrchestrator.java:108)
at org.junit.ute(EngineExecutionOrchestrator.java:88)
at org.junit.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.ute(EngineExecutionOrchestrator.java:52)
at org.junit.ute(DefaultLauncher.java:96)
spring aop应用场景at org.junit.ute(DefaultLauncher.java:75)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
at junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: urrent.RejectedExecutionException: Task urrent.CompletableFuture$AsyncSupply@64968732 rejected from urrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, q  at urrent.jectedExecution(ThreadPoolExecutor.java:2063)
at ject(ThreadPoolExecutor.java:830)
at ute(ThreadPoolExecutor.java:1379)
at org.ute(ThreadPoolTaskExecutor.java:321)
... 74 more
从异常信息ask.TaskRejectedException: Executor [urrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks =
0]] did not accept task: 中,可以很明确的知道,第5个任务因为超过了执⾏线程+缓冲队列长度,⽽被拒绝了。
所有,默认情况下,线程池的拒绝策略是:当线程池队列满了,会丢弃这个任务,并抛出异常。
配置拒绝策略
虽然线程池有默认的拒绝策略,但实际开发过程中,有些业务场景,直接拒绝的策略往往并不适⽤,有时候我们可能会选择舍弃最早开始执⾏⽽未完成的任务、也可能会选择舍
弃刚开始执⾏⽽未完成的任务等更贴近业务需要的策略。所以,为线程池配置其他拒绝策略或⾃定义拒绝策略是很常见的需求,那么这个要怎么实现呢?
下⾯就来具体说说今天的正题,如何为线程池配置拒绝策略、如何⾃定义拒绝策略。
看下⾯这段代码的最后⼀⾏,setRejectedExecutionHandler⽅法就是为线程池设置拒绝策略的⽅法:
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//...其他线程池配置
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
在ThreadPoolExecutor中提供了4种线程的策略可以供开发者直接使⽤,你只需要像下⾯这样设置即可:
// AbortPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// DiscardPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
// DiscardOldestPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
// CallerRunsPolicy策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
这四个策略对应的含义分别是:
AbortPolicy策略:默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
DiscardPolicy策略:如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常。
DiscardOldestPolicy策略:如果队列满了,会将最早进⼊队列的任务删掉腾出空间,再尝试加⼊队列。
CallerRunsPolicy策略:如果添加到线程池失败,那么主线程会⾃⼰去执⾏该任务,不会等待线程池中的线程去执⾏。
⽽如果你要⾃定义⼀个拒绝策略,那么可以这样写:
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 拒绝策略的逻辑
}
});
当然如果你喜欢⽤Lamba表达式,也可以这样写:
executor.setRejectedExecutionHandler((r, executor1) -> {
// 拒绝策略的逻辑
});
好了,今天的学习就到这⾥!
如果您学习过程中如遇困难?可以加⼊我们超⾼质量的,参与交流与讨论,更好的学习与进步!更多,欢迎收藏与转发⽀持!代码⽰例
本⽂的完整⼯程可以查看下⾯仓库中2.x⽬录下的chapter7-8⼯程:
Github:
Gitee:
如果您觉得本⽂不错,欢迎Star⽀持,您的关注是我坚持的动⼒!
欢迎关注我的:程序猿DD,分享外⾯看不到的⼲货与思考!

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。