Springboot之多线程多任务并⾏+线程池处理
最近项⽬中做到⼀个关于批量发短信的业务,如果⽤户量特别⼤的话,不能使⽤单线程去发短信,只能尝试着使⽤多任务来完成!我们的项⽬使⽤到了⽅式⼆,即Future的⽅案
Java 线程池
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建⼀个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若⽆可回收,则新建线程。
newFixedThreadPool 创建⼀个定长线程池,可控制线程最⼤并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建⼀个定长线程池,⽀持定时及周期性任务执⾏。
newSingleThreadExecutor 创建⼀个单线程化的线程池,它只会⽤唯⼀的⼯作线程来执⾏任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执⾏。
优点
重⽤存在的线程,减少对象创建、消亡的开销,性能佳。
可有效控制最⼤并发线程数,提⾼系统资源的使⽤率,同时避免过多资源竞争,避免堵塞。
提供定时执⾏、定期执⾏、单线程、并发数控制等功能。
⽅式⼀(CountDownLatch)
public class StatsDemo {
final static SimpleDateFormat sdf = new SimpleDateFormat(
"yyyy-MM-dd HH:mm:ss");
final static String startTime = sdf.format(new Date());
/**
* IO密集型任务 = ⼀般为2*CPU核⼼数(常出现于线程中:数据库数据交互、⽂件上传下载、⽹络数据传输等等)
* CPU密集型任务 = ⼀般为CPU核⼼数+1(常出现于线程中:复杂算法)
* 混合型任务 = 视机器配置和复杂度⾃测⽽定
*/
private static int corePoolSize = Runtime().availableProcessors();
/**
* public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
* TimeUnit unit,BlockingQueue<Runnable> workQueue)
* corePoolSize⽤于指定核⼼线程数量
* maximumPoolSize指定最⼤线程数
* keepAliveTime和TimeUnit指定线程空闲后的最⼤存活时间
* workQueue则是线程池的缓冲队列,还未执⾏的线程会在队列中等待
* 监控队列长度,确保队列有界
* 不当的线程池⼤⼩会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变⼤,消耗过多内存。
* ⽽过多的线程⼜会由于频繁的上下⽂切换导致整个系统的速度变缓——殊途⽽同归。队列的长度⾄关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。
* ExecutorService 默认的实现是⼀个⽆界的 LinkedBlockingQueue。
*/
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000));
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
//使⽤execute⽅法
latch.await();// 等待所有⼈任务结束
System.out.println("所有的统计任务执⾏完成:" + sdf.format(new Date()));
}
static class Stats implements Runnable {
String statsName;
int runTime;
CountDownLatch latch;
public Stats(String statsName, int runTime, CountDownLatch latch) {
this.statsName = statsName;
this.runTime = runTime;
this.latch = latch;
}
public void run() {
try {
System.out.println(statsName+ " do stats begin at "+ startTime);
//模拟任务执⾏时间
Thread.sleep(runTime);
System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
⽅式⼆(Future)
重点是和springboot整合,采⽤注解bean⽅式⽣成ThreadPoolTaskExecutor
@Bean
//spring依赖包
import org.urrent.ThreadPoolTaskExecutor;
@Configuration
public class GlobalConfig {
/
**
* 默认线程池线程池
*
* @return Executor
*/
@Bean
public ThreadPoolTaskExecutor defaultThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核⼼线程数⽬
executor.setCorePoolSize(16);
//指定最⼤线程数
executor.setMaxPoolSize(64);
//队列中最⼤的数⽬
executor.setQueueCapacity(16);
//线程名称前缀
executor.setThreadNamePrefix("defaultThreadPool_");
//rejection-policy:当pool已经达到max size的时候,如何处理新任务
//CALLER_RUNS:不在新线程中执⾏任务,⽽是由调⽤者所在的线程来执⾏
//对拒绝task的处理策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//线程空闲后的最⼤存活时间
executor.setKeepAliveSeconds(60);
/
/加载
executor.initialize();
return executor;springboot推荐算法
}
}
使⽤
//通过注解引⼊配置
@Resource(name = "defaultThreadPool")
private ThreadPoolTaskExecutor executor;
//使⽤Future⽅式执⾏多任务
//⽣成⼀个集合
List<Future> futures = new ArrayList<>();
//获取后台全部有效运营⼈员的集合
List<AdminUserMsgResponse> adminUserDOList = adminManagerService.GetUserToSentMsg(null);
for (AdminUserMsgResponse response : adminUserDOList) {
//并发处理
if (Mobile() != null) {
Future<?> future = executor.submit(() -> {
//
mobileMessageFacade.Mobile(), Content());
});
futures.add(future);
}
}
//查询任务执⾏的结果
for (Future<?> future : futureList) {
while (true) {//CPU⾼速轮询:每个future都并发轮循,判断完成状态然后获取结果,这⼀⾏,是本实现⽅案的精髓所在。即有10个future在⾼速轮询,完成⼀个future的获取结果,就关闭⼀个轮询if (future.isDone()&& !future.isCancelled()) {//获取future成功完成状态,如果想要限制每个任务的超时时间,取消本⾏的状态判断+(1000*1, TimeUnit.MILLISECONDS)+catch超时异常使⽤即可。 Integer i = ();//获取结果
System.out.println("任务i="+i+"获取完成!"+new Date());
list.add(i);
break;//当前future获取结果完毕,跳出while
} else {
Thread.sleep(1);//每次轮询休息1毫秒(CPU纳秒级),避免CPU⾼速轮循耗空CPU---》新⼿别忘记这个
}
}
}
完毕
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论