springboot使⽤@Async异步注解,原理+源码
1、java的⼤部分接⼝的⽅法都是串⾏执⾏的,但是有些业务场景是不需要同步返回结果的,可以把结果直接返回,具体业务异步执⾏,也有些业务接⼝是需要并⾏获取数据,最后把数据聚合在统⼀返回给前端。
通常我们都是采⽤多线程的⽅式来实现上述业务功能,但spring 提供更优雅的⽅式来实现上述功能,就是@Async 异步注解,在⽅法上添加
@Async,spring就会借助AOP,异步执⾏⽅法。
1、如何启⽤@Async
spring boot通过@EnableAsync 注解启⽤@Async异步注解
实现AsyncConfigurer接⼝,getAsyncExecutor是默认⾃定义的线程池
/**
* 线程池配置(@Async)
*/
@Slf4j
@EnableAsync
@Configuration
public class SimpleExecutorConfig implements AsyncConfigurer {
/** 线程池维护线程的最少数量 */
@Value("${PoolSize}")
private Integer corePoolSize;
/** 线程池维护线程的最⼤数量 */
@Value("${executor.maxPoolSize}")
private Integer maxPoolSize;
/** 缓冲队列的⼤⼩ */
@Value("${executor.queueCapacity}")
private Integer queueCapacity;
/** 为每个线程名设置⼀个前缀(1) */
@Value("${executor.threadNamePrefix}")
private String threadNamePrefix;
/** 为每个线程名设置⼀个前缀(2) */
@Value("${executor.threadNamePrefix_2}")
private String threadNamePrefix_2;
@Bean(ExecutorConstant.simpleExecutor_1)
@Override
public Executor getAsyncExecutor() {
/
/线程池
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.setQueueCapacity(queueCapacity);
taskExecutor.setThreadNamePrefix(threadNamePrefix);
taskExecutor.initialize();
return taskExecutor;
}
@Bean(ExecutorConstant.simpleExecutor_2)
public Executor asyncExecutor2() {
/
/线程池
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);
taskExecutor.setMaxPoolSize(maxPoolSize);
taskExecutor.setQueueCapacity(queueCapacity);
taskExecutor.setThreadNamePrefix(threadNamePrefix_2);
taskExecutor.initialize();
return taskExecutor;
}
}
2、如何使⽤@Async
下⾯是代码:
TestAsyncService类:
@Slf4j
@Service
public class TestAsyncService implements ITestAsyncService {
/**
* 异步⽅法,⽆返回值
* @return
*/
@Async
@Override
public void asyncFunction_1(){
handleBusinessTime();
log.info("asyncFunction_1 当前线程名称是:{}",Thread.currentThread().getName());
};
/////////////////异步⽅法,⽆返回值(指定线程池) start
/**
* 异步⽅法,⽆返回值(指定线程池)
* @return
*/
@Async(value = ExecutorConstant.simpleExecutor_2)
@Override
public void asyncFunction_2(){
handleBusinessTime();
log.info("asyncFunction_2 当前线程名称是:{}",Thread.currentThread().getName()); };
@Async(ExecutorConstant.simpleExecutor_2)
@Override
public void asyncFunction_3(){
handleBusinessTime();
log.info("asyncFunction_3 当前线程名称是:{}",Thread.currentThread().getName()); };
/////////////////异步⽅法,⽆返回值(指定线程池) end
/**
* 异步⽅法,有返回值
* @return
*/
@Async
@Override
public Future<Integer> asyncReturnDta_1(){
handleBusinessTime();
log.info("asyncReturnDta_1 当前线程名称是:{}",Thread.currentThread().getName()); return new AsyncResult<Integer>(1);
};
/**
* 异步⽅法,有返回值(指定线程池)
* @return
*/
@Async(ExecutorConstant.simpleExecutor_2)
@Override
public Future<Integer> asyncReturnDta_2(){
handleBusinessTime();
springboot原理图解log.info("asyncReturnDta_2 当前线程名称是:{}",Thread.currentThread().getName()); return new AsyncResult<Integer>(1);
};
/**
* 异步⽅法,有返回值-超时
* @return
*/
@Async
@Override
public Future<Integer> asyncReturnDtaTimeOut(){
handleBusinessTime();
handleBusinessTime();
handleBusinessTime();
handleBusinessTime();
log.info("asyncReturnDta_3 当前线程名称是:{}",Thread.currentThread().getName()); return new AsyncResult<Integer>(1);
};
/**
* 这⽅法模拟处理业务或者去操作数据库消耗的时间
*/
public static void handleBusinessTime(){
//去数据库查询数据耗时 start
int[] sleepTime = ateRandomNumber(2000,5000,1);
try {
//Thread.sleep 休眠的时候相当于业务操作,或者请求数据库的需要消耗的时间 Thread.sleep(sleepTime[0]);
} catch (InterruptedException e) {
e.printStackTrace();
}
//去数据库查询数据耗时 end
}
}
TestAsyncController
@Slf4j
@RestController
@RequestMapping(value = "/v1/async")
public class TestAsyncController {
@Autowired
ITestAsyncService testAsyncService;
@ApiOperation(value = "调⽤接⼝")
@RequestMapping(value = "/test", method = RequestMethod.GET)
public Resp<Integer> test() throws ExecutionException, InterruptedException {
log.info("asyncFunction_1 start");
testAsyncService.asyncFunction_1();
log.info("asyncFunction_1 start");
log.info("asyncFunction_2 start");
testAsyncService.asyncFunction_2();
log.info("asyncFunction_2 end");
log.info("asyncFunction_3 start");
testAsyncService.asyncFunction_3();
log.info("asyncFunction_3 end");
log.info("asyncReturnDta_1 & asyncReturnDta_2 start");
Future<Integer> future = testAsyncService.asyncReturnDta_1();
testAsyncService.asyncReturnDta_2();
log.info("asyncReturnDta_1 & asyncReturnDta_2 end");
Integer resp = ();
log.info("() resp:{}",resp);
return Resp.buildDataSuccess(resp);
}
@ApiOperation(value = "调⽤接⼝-超时")
@RequestMapping(value = "/async_timeOut", method = RequestMethod.GET)
public Resp<Integer> async_timeOut() throws ExecutionException, InterruptedException {
TimeInterval timeInterval = DateUtil.timer();
log.info("asyncReturnDtaTimeOut start");
Future<Integer> future = testAsyncService.asyncReturnDtaTimeOut();
log.info("asyncReturnDtaTimeOut end");
Integer resp = null;
try {
//⼀秒内返回不了数据就报错
resp = (1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
resp = -1;//请求超时了,相当于熔断,服务降级
<("(1, TimeUnit.SECONDS) timeout:",e);
}
log.info("() resp:{} 耗时:{}毫秒",resp,timeInterval.intervalRestart());
return Resp.buildDataSuccess(resp);
}
}
/v1/async/test 接⼝:
2021-06-20 21:09:30.490 INFO 14207 --- [nio-8666-exec-7] roller.TestAsyncController : asyncFunction_1 start
2021-06-20 21:09:30.490 INFO 14207 --- [nio-8666-exec-7] roller.TestAsyncController : asyncFunction_1 start
2021-06-20 21:09:30.490 INFO 14207 --- [nio-8666-exec-7] roller.TestAsyncController : asyncFunction_2 start
2021-06-20 21:09:30.491 INFO 14207 --- [nio-8666-exec-7] roller.TestAsyncController : asyncFunction_2 end
2021-06-20 21:09:30.491 INFO 14207 --- [nio-8666-exec-7] roller.TestAsyncController : asyncFunction_3 start
2021-06-20 21:09:30.491 INFO 14207 --- [nio-8666-exec-7] roller.TestAsyncController : asyncFunction_3 end
2021-06-20 21:09:30.491 INFO 14207 --- [nio-8666-exec-7] roller.TestAsyncController : asyncReturnDta_1 & asyncReturnDta_2 start
2021-06-20 21:09:30.492 INFO 14207 --- [nio-8666-exec-7] roller.TestAsyncController : asyncReturnDta_1 & asyncReturnDta_2 end
2021-06-20 21:09:32.679 INFO 14207 --- [le-1-executor-9] c.e.multi.service.impl.TestAsyncService : asyncFunction_1 当前线程名称是:my-simple-1-executor-9
2021-06-20 21:09:33.454 INFO 14207 --- [le-2-executor-8] c.e.multi.service.impl.TestAsyncService : asyncFunction_3 当前线程名称是:my-simple-2-executor-8
2021-06-20 21:09:33.578 INFO 14207 --- [le-2-executor-9] c.e.multi.service.impl.TestAsyncService : asyncReturnDta_2 当前线程名称是:my-simple-2-executor-9
2021-06-20 21:09:34.101 INFO 14207 --- [e-1-executor-10] c.e.multi.service.impl.TestAsyncService : asyncReturnDta_1 当前线程名称是:my-simple-1-executor-10
2021-06-20 21:09:34.102 INFO 14207 --- [nio-8666-exec-7] roller.TestAsyncController : () resp:1
2021-06-20 21:09:34.357 INFO 14207 --- [le-2-executor-7] c.e.multi.service.impl.TestAsyncService : asyncFunction_2 当前线程名称是:my-simple-2-executor-7
从⽇志上可以看出,都是异步执⾏的
/v1/async/async_timeOut 接⼝:
2021-06-20 21:20:58.886 INFO 14427 --- [nio-8666-exec-1] roller.TestAsyncController : asyncReturnDtaTimeOut start
2021-06-20 21:20:58.890 INFO 14427 --- [nio-8666-exec-1] roller.TestAsyncController : asyncReturnDtaTimeOut end
2021-06-20 21:20:59.899 ERROR 14427 --- [nio-8666-exec-1] roller.TestAsyncController : (1, TimeUnit.SECONDS) timeout:
urrent.TimeoutException: null
at (FutureTask.java:205) ~[na:1.8.0_231]
ller.TestAsyncController.async_timeOut(TestAsyncController.java:69) ~[classes/:na]
flect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_231]
flect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_231]
flect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_231]
at flect.Method.invoke(Method.java:498) ~[na:1.8.0_231]
at org.hod.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) [spring-web-5.3.8.jar:5.3.8]
at org.hod.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) [spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.hod.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.hod.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) [spring-webmvc-5.3.8.jar:5.3.8] at org.springframework.web.hod.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.hod.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1063) [sprin
g-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) [spring-webmvc-5.3.8.jar:5.3.8]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) [spring-webmvc-5.3.8.jar:5.3.8]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) [tomcat-embed-core-9.0.46.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) [spring-webmvc-5.3.8.jar:5.3.8]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) [tomcat-embed-core-9.0.46.jar:4.0.FR]
at org.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at at.websocket.server.WsFilter.doFilter(WsFilter.java:53) [tomcat-embed-websocket-9.0.46.jar:9.0.46]
at org.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) [spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.3.8.jar:5.3.8]
at org.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) [spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [spring-web-5.3.8.jar:5.3.8]
at org.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) [spring-web-5.3.8.jar:5.3.8]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) [sprin
g-web-5.3.8.jar:5.3.8]
at org.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.StandardWrapperValve.invoke(StandardWrapperValve.java:202) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.46.jar:9.0.46]
at org.tor.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.46.jar:9.0.46]
at http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.46.jar:9.0.46]
at AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.46.jar:9.0.46]
at AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.46.jar:9.0.46]
at at.util.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.46.jar:9.0.46]
at at.util.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.46.jar:9.0.46]
at urrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_231]
at urrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_231]
at at.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.46.jar:9.0.46]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_231]
2021-06-20 21:20:59.900 INFO 14427 --- [nio-8666-exec-1] roller.TestAsyncController : () resp:-1 耗时:1014毫秒
2021-06-20 21:21:12.105 INFO 14427 --- [le-1-executor-1] c.e.multi.service.impl.TestAsyncService : asyncReturnDta_3 当前线程名称是:my-simple-1-executor-1
从⽇志上看出,如果(1, TimeUnit.SECONDS) 到了超时时间,直接抛出超时异常,⾛主线程后续代码。
⽐较适合规定时间范围内要返回数据(超时可以根据业务场景,返回⼀个默认值,或者返回值值就是空的)的业务场景
@Async原理+源码
原理:是通过spring aop + 线程池的⽅式来实现的
源码:
源码的⽅法位置是:AsyncExecutionInterceptor.invoke
1. 107⾏:是获取⼀个线程池
2. 108⾏:如果没有设置线程池抛出异常
3. 113⾏:是创建⼀个线程对象他的run⽅法执⾏invocation.proceed()【⾛实际业务代码】
4. 121和124⾏:⾛的是统⼀的异常处理主要是调⽤handleUncaughtException⽅法,SimpleExecutorConfig 实现了AsyncConfigurer接⼝它就有
getAsyncUncaughtExceptionHandler⽅法,可以重写这个⽅法,实现⾃定义的异常处理
下图是AsyncConfigurer接⼝可以实现的⽅法:
doSubmit⽅法:
实际上就是调⽤线程池的submit⽅法:
具体代码,我上传到gitee,⼤家感兴趣可以clone
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论