java线程池实践
线程池⼤家都很熟悉,⽆论是平时的业务开发还是框架中间件都会⽤到,⼤部分都是基于JDK线程池ThreadPoolExecutor做的封装,
都会牵涉到这⼏个核⼼参数的设置:核⼼线程数,等待(任务)队列,最⼤线程数,拒绝策略等。
但如果线程池设置不当就会引起⼀系列问题, 下⾯就说下我最近碰到的问题。
案件还原
优惠券⽹站 m.cps3
⽐如你有⼀个项⽬中有个接⼝部分功能使⽤了线程池,这个功能会去调⽤多个第三⽅接⼝,都有⼀定的耗时,为了不影响主流程的性能,不增加整体响应时间,所以放在线程池⾥和主线程并⾏执⾏,等线程池⾥的任务执⾏完通过的⽅式获取线程池⾥的线程执⾏结果,然后合并到主流程的结果⾥返回,⼤致流程如下:
线程池参数为:
coresize:50
max:200
queuesize:1
keepalivetime:60s
拒绝策略为reject
假设每次请求提交5个task到线程池,平均每个task是耗时50ms
没过⼀会就收到了线程池满了⾛了拒绝策略的报错
结合你对线程池的了解,先思考下为什么
线程池的⼯作流程如下:
根据这个我们来列⼀个时间线
1. 项⽬刚启动第1次请求(每次5个task提交到线程池),创建5个核⼼线程
2. 第2次请求继续创建5个(共10个核⼼线程了)
3. 直到第10次核⼼线程数会达满50个
4. 核⼼线程处理完之后核⼼线程会⼲嘛呢
根据 jdk1.8的线程池的源码:
线程池的线程处理处理了交给它的task之后,它会去getTask()
源码如下:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//注意这段
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
请注意上⾯代码中的bool类型的timed的赋值逻辑,
由于allowCoreThreadTimeOut默认为false,也就是说:
只要创建的线程数量超过了核⼼线程数,那么⼲完⼿上活后的线程(不管是核⼼线程,还是超过队列后新开的线程)就会⾛进
//线程状态为 timedwaiting
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
由于我们上⾯步骤⾥⾯还没有超过coresize所以会⾛进
//线程状态为 waiting
workQueue.take()
所以答案是:上⾯步骤⼲活的核⼼线程处理完之后核⼼线程会进⼊waiting状态,
只要队列⼀有活就会被唤醒去⼲活。
5. 到第11次的时候
好家伙,到这步骤的时候 ,核⼼线程数已满,那么就往队列⾥⾯塞,但是设置的queuesize=1,
每次有5个task,那就是说往队列⾥⾯塞1个,剩下4个(别较真我懂你意思)要创建新的max线程了。
结果:
核⼼线程数:50
队列:1
max线程:4个
因为50个核⼼线程在waiting中,所以队列只要⼀add,就会⽴马被消费,假设消费的这个核⼼线程名字是⼩A。
这⾥要细品⼀下:
这⾥已经总线程数⼤于核⼼线程数了,那么getTask()⾥⾯
// timed=true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
那么⼩A⼲完活就会⾛进
//线程状态为 timedwaiting
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
此处核⼼线程⼩A就会变成timedwaiting的状态(keepalive设置的是60s)
6. 到第12次的时候
继续往队列塞1个,创建4个max线程,max线程已经有8个了
这⾥ ⼜会有⼀个新的核⼼线程⼩B ,会变成timedwaiting状态了
max线程们⼲完⼿上的活后,也会去调⽤getTask() 也会进⼊timedwaiting状态
因为queuesize=1,狼多⾁少
7. 继续下去,那么最终会变成
max满了,线程们都在timedwaiting(keepalive设置的是60s)
新的提交就会⾛拒绝策略了
问题总结
其实核⼼与⾮核⼼对于线程池来说都是⼀样的,只要⼀旦线程数超过了核⼼线程数,那么线程就会⾛进timewaiting
把queuesize调⼤就好了?
这⾥⼜有⼀个新的注意点:
上⾯举例的是I/O密集型业务,queuesize不是越⼤越好的,
因为:
线程池新创建的线程会优先处理新请求进来的任务,⽽不是去处理队列⾥的任务,队列⾥的任务只能等核⼼线程数忙完了才能被执⾏,这样可能造成队列⾥的任务长时间等待,导致队列积压,尤其是I/O密集场景
慎⽤CallRunnerPolicy这个拒绝策略
⼀定得理解这个策略会带来什么影响,
先看下这个拒绝策略的源码
如果你提交线程池的任务即时失败也没有关系的话,⽤这个拒绝策略是致命的,
因为⼀旦超过线程池的负载后开始吞噬tomcat线程。
⽤的⽅式慎⽤DiscardPolicy这个拒绝策略
如果需要得到线程池⾥的线程执⾏结果,使⽤future的⽅式,拒绝策略不建议使⽤DiscardPolicy,这种丢弃策略虽然不执⾏⼦线程的任务,
但是还是会返回future对象(其实在这种情况下我们已经不需要线程池返回的结果了),然后后续代码即使判断了future!=null也没⽤,
这样的话还是会⾛到()⽅法,如果get⽅法没有设置超时时间会导致⼀直阻塞下去
类似下⾯的伪代码:
// 如果线程池已满,新的请求会被直接执⾏拒绝策略,此时如果拒绝策略设置的是DiscardPolicy丢弃任务,
// 则还是会返回future对象, 这样的话后续流程还是可能会⾛到get获取结果的逻辑
Future<String> future = executor.submit(() -> {
// 业务逻辑,⽐如调⽤第三⽅接⼝等操作
return result;
});
// 主流程调⽤逻辑
if(future != null) // 如果拒绝策略是DiscardPolicy还是会⾛到下⾯代码
<(超时时间); // 调⽤⽅阻塞等待结果返回,直到超时
推荐解决⽅案
1. ⽤动态线程池,可以动态修改coresize,maxsize,queuesize,keepalivetime
对线程池的核⼼指标进⾏埋点监控,可以通过继承 ThreadPoolExecutor 然后Override掉
beforeExecute,afterExecute,shutdown,shutdownNow⽅法,进⾏埋点记录到es
可以埋点的数据有:
包括线程池运⾏状态、核⼼线程数、最⼤线程数、任务等待数、已完成任务数、线程池异常关闭等信息
名称含义
core_pool_size定义的核⼼线程总数java线程池创建的四种
max_pool_size定义的maxpoolsize
keep_alive_time定义的keepalivetime
current_pool_size当前线程池总线程数
queue_wait_size当前队列中等待处理的个数
active_count当前run状态的线程数
completed_count当前线程池中的每个线程处理的task数的叠加值
task_count等于completed_count加上queue_wait_size
shutdown当前线程池的状态是否关闭
useRate当前线程池利⽤率:((active_count * 1.0 / max_pool_size) * 100)
基于以上数据,我们可以实时监控和排查定位问题

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。