【SpringBoot】多线程以及⾃定义拒绝策略(基于@Async)
⼀、使⽤@Async在SpringBoot项⽬中实现多线程
1. 多线程Configuration
启动类:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import t.annotation.ComponentScan;
@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan("com.asiainfo.*")
public class Application extends SpringBootServletInitializer {
public static void main(String[] args){
SpringApplication.run(Application.class, args);
}
protected SpringApplicationBuilder configure(SpringApplicationBuilder application){
return application.sources(Application.class);
}
}
**@EnableAutoConfiguration:**帮助SpringBoot应⽤将所有符合条件的@Configuration配置都加载到当前SpringBoot创建并使⽤的IoC容器。
多线程配置类:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import t.annotation.Bean;
import t.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.urrent.ThreadPoolTaskExecutor;
import java.util.Hashtable;
import java.util.List;
import urrent.Executor;
import urrent.FutureTask;
@Configuration
@EnableAsync
public class ExecutorConfig {
private static final Logger logger = Logger(ExecutorConfig.class); private static Hashtable<String, List<FutureTask>> rejectTaskMap;
@Value("${thread.CORE_POOL_SIZE}")
private int corePoolSize;
@Value("${thread.MAX_POOL_SIZE}")
private int maxPoolSize;
@Value("${thread.QUEUE_CAPACITY}")
private int queueCapacity;
@Bean
public Executor asyncServiceExecutor(){
logger.info("start asyncServiceExecutor");
ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();
//配置核⼼线程数
executor.setCorePoolSize(corePoolSize);
//配置最⼤线程数
executor.setMaxPoolSize(maxPoolSize);
//配置队列最⼤长度
executor.setQueueCapacity(queueCapacity);
java线程池创建的四种// rejection-policy:当pool已经达到max size的时候,如何处理新任务
/
/ CALLER_RUNS:不在新线程中执⾏任务,⽽是有调⽤者所在的线程来执⾏
//⾃定义拒绝策略
executor.setRejectedExecutionHandler(new MyRejectedPolicyHandler());
rejectTaskMap =new Hashtable<>();
//执⾏初始化
executor.initialize();
return executor;
}
public static Hashtable<String, List<FutureTask>>getRejectTaskMap(){
return rejectTaskMap;
}
}
@Configuration:
@Configuration⽤于定义配置类,可替换xml配置⽂件,被注解的类内部包含有⼀个或多个被@Bean注解的⽅法,这些⽅法将会被AnnotationConfigApplicationContext或AnnotationConfigWebApplicationContext类进⾏扫描,并⽤于构建bean定义,初始化Spring容器。
注意:@Configuration注解的配置类有如下要求:
@Configuration不可以是final类型;
@Configuration不可以是匿名类;
嵌套的configuration必须是静态类;
@EnableAsync:
以异步执⾏,允许开启多线程。
executor.setRejectedExecutionHandler(new MyRejectedPolicyHandler());
设置拒绝策略,当任务源源不断的过来,⽽我们的系统⼜处理不过来的时候,我们要采取的策略是拒绝服务。RejectedExecutionHandler 接⼝提供了拒绝任务处理的⾃定义⽅法的机会。在ThreadPoolExecutor中已经包含四种处理策略。
CallerRunsPolicy:线程调⽤运⾏该任务的 execute 本⾝。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if
(!e.isShutdown()) { r.run(); }}
这个策略显然不想放弃执⾏任务。但是由于池中已经没有任何资源了,那么就直接使⽤调⽤该execute的线程本⾝来执⾏。(开始我总不想丢弃任务的执⾏,但是对某些应⽤场景来讲,很有可能造成当前线程也被阻塞。如果所有线程都是不能执⾏的,很可能导致程序没法继续跑了。需要视业务情景⽽定吧。)
AbortPolicy:处理程序遭到拒绝将抛出运⾏时 RejectedExecutionException public void
rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new
RejectedExecutionException();}
这种策略直接抛出异常,丢弃任务。(jdk默认策略,队列满并线程满时直接拒绝添加新任务,并抛出异常,所以说有时候放弃也是⼀种勇⽓,为了保证后续任务的正常进⾏,丢弃⼀些也是可以接收的,记得做好记录)
DiscardPolicy:不能执⾏的任务将被删除 public void rejectedExecution(Runnable r,
ThreadPoolExecutor e) {} 这种策略和AbortPolicy⼏乎⼀样,也是丢弃任务,只不过他不抛出异常。
DiscardOldestPolicy:如果执⾏程序尚未关闭,则位于⼯作队列头部的任务将被删除,然后重试执⾏程序(如果再次失败,则重复此过程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if
(!e.isShutdown()) {e.getQueue().poll();e.execute®; }}
该策略就稍微复杂⼀些,在pool没有关闭的前提下⾸先丢掉缓存在队列中的最早的任务,然后重新尝试运⾏该任务。这个策略需要适当⼩⼼。
除上述四种策略外,还可以添加⾃定义拒绝策略, MyRejectedPolicyHandler就是⼀个⾃定义决绝策略,下⽂中会着重讲⼀下该策略的实现⽅式。
设置线程池:
#多线程配置
thread:
CORE_POOL_SIZE:10
MAX_POOL_SIZE:100
QUEUE_CAPACITY:1000
@Value("${thread.CORE_POOL_SIZE}")
private int corePoolSize;
设置核⼼线程数量。
@Value("${thread.MAX_POOL_SIZE}")
private int maxPoolSize;
设置最⼤线程数量。
@Value("${thread.QUEUE_CAPACITY}")
private int queueCapacity;
设置缓冲队列⼤⼩。
2. 使⽤Runner启动项⽬
SpringBoot给我们提供了两个接⼝来帮助我们实现容器启动完成后⽴即执⾏。这两个接⼝分别为CommandLineRunner和ApplicationRunner。
定义⼀个类SimosApplicationRunner实现ApplicationRunner接⼝,然后Override这个ApplicationRunner接⼝的run⽅法即可。Runner:
import com.asiainfo.fig.MyRejectedPolicy;
import com.asiainfo.processor_other.task.TestTask;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import annotation.Order;
import org.springframework.stereotype.Component;
@Component
@Order(1)
public class TestRunner implements ApplicationRunner {
@Autowired
TestRunner testRunner;
@Autowired
TestTask testTask;
@Override
public void run(ApplicationArguments args)throws Exception {
}
@MyRejectedPolicy("runTest")
private void test(){
for(int i =0; i <100; i++){
testTask.runTest(i);
}
}
}
Task:
使⽤ @Async注解,每调⽤⼀次TestTask的runTest⽅法都会开启⼀个新的线程;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class TestTask {
@Async("asyncServiceExecutor")
public void runTest(int i){
System.out.printf("Test:"+ i);
}
}
⼆、⾃定义拒绝策略
⾃定义拒绝策略思路:
若线程池配置不合理,或者任务添加的速度⼤于处理的速度,会执⾏线程池拒绝策略,四个系统默认的拒绝策略,或者阻塞主进程,或者抛出异常,或者丢弃任务,在某些情况都不适⽤的情况下需要⾃定义拒绝策略经⾏容灾。
改造的思路是⾃定义拒绝策略,将线程池拒绝的任务缓存到内存中,再在合适的时机重新放⼊线程池中处理,从⽽达到了线程池防阻塞、容灾的⽬的(⽅案使⽤的内存对象为线程安全对象,效率较低,只可作为特定情况下的容灾机制使⽤)。
改造点:
修改默认缓存队列⼤⼩(默认:2147483647),改为1000
初始化⼀个全局的,线程安全的MAP<String,List>,⽤来代替原有的线程池缓存队列。其中,String 为⽅法名称, List 为
task(FutureTask)线程队列。
⾃定义拒绝策略。 当任务拒绝时,将拒绝的任务添加到Map中。
⾃定义注解类,注解到添加任务到线程池的⽅法上。原有的⽅法需要重新抽取。
通过AOP,Around⽅式截取注解的⽅法。判断全局Map 中是否有对应此⽅法的线程队列,如果有,则先执⾏在MAP缓存队列中的task,返回null,不再往线程池中添加新的task,如果MAP缓存队列中没有此⽅法对应的task,则正常添加task到线程池中。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论