java线程池深⼊三-Worker
⼀.⽬的
Worker⽤于执⾏任务。
顺便了解下线程池状态流:
running可以通过shutdown⽅法到shutdown状态,然后之后会变成tidying状态,最后变成terminate状态。
通过shutdownNow⽅法,则状态直接变成:stop,然后变成tidying状态,最后变成terminate状态。
这⾥需要注意shutdown,shutdownNow两个⽅法的区别
。
shutdown⽅法,不会接收新任务,但会等待处理完任务队列中的任务再结束线程池。
shutdownNow⽐较粗暴,不会接收新任务,如果有任务在执⾏会强制中断它,然后把任务队列中的任务返回给调⽤⽅,这⾥就要根据业务是否允许强制中断来分析风险点。
worker为了不消耗⼤量cpu,会阻塞,底层是blockingQueue的阻塞获取机制。
⼆.基础知识
Worker在第⼀次创建时,会先执⾏传⼊的task,之后会从任务队列中取。
多个worker在ThreadPoolExecutor中存储是⼀个hashset。
Worker数据结构:
final Thread thread;//执⾏线程
Runnable firstTask;//第⼀个任务
volatile long completedTasks;//当前worker已经完成的任务数
Worker继承于AQS。
核⼼⽅法是:【runWorker(Worker w) 】
基本逻辑分为:
1.如果是创建时执⾏,有firstTask则执⾏这个任务。否则从任务队列中获取
while (task != null || (task = getTask()) != null)
getTask())⽤于获取任务。后⾯会详细说明。
2.检查当前线程池状态,如果是stop(shutdownNOW⽅法触发),并且线程状态中断标记未设置,则设置中断位
3.执⾏beforeExecute⽅法,默认实现为空,主要是ThreadPoolExecutor交给⼦类去实现⼀些个性化逻辑
4.执⾏任务run⽅法,这⾥业务任务发⽣异常,这个worker线程会被释放。并且会如果当前线程数已经⼩于核⼼线程数则会再创建⼀个worker
5.执⾏afterExecute对错误信息可以进⾏特殊处理
⽅法代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock(); //这⾥的lock很重要,因为在shutdown线程池时会中断相应worker,这⾥加lock可以保证任务执⾏的原⼦性,防⽌任务执⾏⼀半,导致业务数据的不 //如果线程已经处于stop并且线程中断位未设置,则设置线程中断位。中断位需要业务任务⾃⼰去检测使⽤。
if (((), STOP) ||
(Thread.interrupted() &&
(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//执⾏任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
wpletedTasks++;
w.unlock();
}
}
completedAbruptly = false;//这个标记⽤于区分是异常或者中断,还是正常因为线程要回收等情况。前者异常需要在执⾏后⾯processWorkerExit⽅法,减少wo } finally {
processWorkerExit(w, completedAbruptly);//出现异常,或者当前线程池被关闭都会⾛到这个⽅法
//processWorkerExit主要是⽤于worker退出后做资源释放及相应是否需要新增worker的逻辑。
}
}
processWorkerExit⽅法逻辑:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//移除这个worker,之后这个对象会被垃圾回收,线程也会被回收
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += wpletedTasks;
} finally {
mainLock.unlock();
}
//尝试将线程池状态设置成terminate,能够设置的条件是:当前线程池状态是shutdown并且任务队列⽆任务或stop状态//如果当前可设置成terminate状态,并且还有worker线程存在,则中断它
//然后设置线程池状态为TIDYING:到达这个状态则说明当前线程数为0了,并且整个线程池已⽆任务。
//最后加锁执⾏terminate⽅法。这⾥是空⽅法,⼦类可以实现特列逻辑。
tryTerminate();
int c = ();
//下⾯的代码主要是看是否需要创建新的worker⽤于替换这个释放掉的worker
//两种情况会需要创建新的worker。
//1.当前被释放的worker由于异常代码
//2.当前是正常退出(由于之前线程超过核⼼线程数或者由于超过最⼤线程数,或者是检测到了shutdown信号)
//然后发现这时发现还有任务待执⾏,则创建⼀个worker
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
//当前线程池未关闭。处于running或shutdown状态。并且是正常因为
java线程池创建的四种int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
//worker线程已经超过核⼼线程数就不需要再创建worker了
return; // replacement not needed
}
addWorker(null, false);//创建worker,底层还是会根据线程池及是否有任务等情况看是否需要创建
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论