Java并发编程:Java线程池核⼼ThreadPoolExecutor的使⽤
和原理分析
引出线程池
线程是并发编程的基础,前⾯的⽂章⾥,我们的实例基本都是基于线程开发作为实例,并且都是使⽤的时候就创建⼀个线程。这种⽅式⽐较简单,但是存在⼀个问题,那就是线程的数量问题。
假设有⼀个系统⽐较复杂,需要的线程数很多,如果都是采⽤这种⽅式来创建线程的话,那么就会极⼤的消耗系统资源。⾸先是因为线程本⾝的创建和销毁需要时间,如果每个⼩任务都创建⼀个线程,那么就会⼤⼤降低系统的效率。其次是线程本⾝也是占⽤内存空间的,⼤量的线程运⾏会抢占内存资源,处理不当很可能会内存溢出,这显然不是我们想看到的。
那么有什么办法解决呢?有⼀个好的思路就是对线程进⾏复⽤,因为所有的线程并不都是同⼀时间⼀起运⾏的,有些线程在某个时刻可能是空闲状态,如果这部分空闲线程能有效利⽤起来,那么就能让线程的运⾏被充分的利⽤,这样就不需要创建那么多的线程了。我们可以把特定数量的线程放在⼀个容器⾥,需要使⽤线程时,从容器⾥拿出空闲线程使⽤,线程⼯作完后不急着关闭,⽽是退回到线程池等待使⽤。这样的容器⼀般被称为线程池。⽤线程池来管理线程是⾮常有效的⽅法,⽤⼀张图⽚可以简单的展⽰出线程池的管理流程:
Executor框架
Java中也有⼀套框架来控制管理线程,那就是Executor框架。Executor框架是JDK1.5之后才引⼊的,位于urrent 包下,可以通过该框架来控制线程的启动、执⾏和关闭,从⽽简化并发编程的操作,这是它的核⼼成员类图:
Executor:最上层的接⼝,定义了⼀个基本⽅法execute,接受⼀个Runnable参数,⽤来替代通常创建或启动线程的⽅法。
ExecutorService:继承⾃Executor接⼝,提供了处理多线程的⽅法。
ScheduledExecutorService:定时调度接⼝,继承⾃ExecutorService。
AbstractExecutorService:执⾏框架的抽象类。
ThreadPoolExecutor:线程池中最核⼼的⼀个类,提供了线程池操作的基本⽅法。
Executors:线程池⼯⼚类,可⽤于创建⼀系列有特定功能的线程池。
ThreadPoolExecutor详解
以上Executor框架中的基本成员,其中最核⼼的的成员⽆疑就是ThreadPoolExecutor,想了解Java中线程池的运⾏机制,就必须先了解这个类,⽽最好的了解⽅式⽆疑就是看源码。
构造函数
打开ThreadPoolExecutor的源码,发现类中提供了四个构造⽅法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = Nanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
可以看出,ThreadPoolExecutor的构造函数中的参数还是⽐较多的,并且最核⼼的是第四个构造函数,其中完成了底层的初始化⼯作。
下⾯解释⼀下构造函数参数的含义:
corePoolSize:线程池的基本⼤⼩。当提交⼀个任务到线程池后,线程池会创建⼀个线程执⾏任务,重复这种操作,直到线程池中的数⽬达到corePoolSize后不再创建新线程,⽽是把任务放到缓存队列中。
maximumPoolSize:线程池允许创建的最⼤线程数。
workQueue:阻塞队列,⽤于存储等待执⾏的任务,并且只能存储调⽤execute ⽅法提交的任务。常⽤的有三种队
列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。
keepAliveTime:线程池中线程的最⼤空闲时间,这种情况⼀般是线程数⽬⼤于任务的数量导致。
unit:keepAliveTime的时间单位,TimeUnit是⼀个枚举类型,位于urrent包下。
threadFactory:线程⼯⼚,⽤于创建线程。
handler:拒绝策略,当任务太多来不及处理时所采⽤的处理策略。
重要的变量
看完了构造函数,我们来看下ThreadPoolExecutor类中⼏个重要的成员变量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY  = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN  =  0 << COUNT_BITS;
private static final int STOP      =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c)    { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
ctl:控制线程运⾏状态的⼀个字段。同时,根据下⾯的⼏个⽅法runStateOf,workerCountOf,ctlOf可以看出,该字段还包含了两部分的信息:线程池的运⾏状态 (runState) 和线程池内有效线程的数量 (workerCount),并且使⽤的是Integar类型,⾼3位保存runState,低29位保存workerCount。
COUNT_BITS:值为29的常量,在字段CAPACITY被引⽤计算。
CAPACITY:表⽰有效线程数量(workerCount)的上限,⼤⼩为 (1<<29) - 1。
下⾯5个变量表⽰的是线程的运⾏状态,分别是:
RUNNING :接受新提交的任务,并且能处理阻塞队列中的任务;
SHUTDOWN:不接受新的任务,但会执⾏队列中的任务。
STOP:不接受新任务,也不处理队列中的任务,同时中断正在处理任务的线程。
TIDYING:如果所有的任务都已终⽌了,workerCount (有效线程数) 为0,线程池进⼊该状态后会调⽤ terminated() ⽅法进⼊TERMINATED 状态。
TERMINATED:terminated( ) ⽅法执⾏完毕。
⽤⼀个状态转换图表⽰⼤概如下 (图⽚来源于):
构造函数和基本参数都了解后,接下来就是对类中重要⽅法的研究了。
线程池执⾏流程
execute⽅法
ThreadPoolExecutor类的核⼼调度⽅法是execute(),通过调⽤这个⽅法可以向线程池提交⼀个任务,交由线程池去执⾏。⽽
ThreadPoolExecutor的⼯作逻辑也可以藉由这个⽅法来⼀步步理清。这是⽅法的源码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取ctl的值,前⾯说了,该值记录着runState和workerCount
int c = ();
/*
* 调⽤workerCountOf得到当前活动的线程数;
* 当前活动线程数⼩于corePoolSize,新建⼀个线程放⼊线程池中;
* addWorker(): 把任务添加到该线程中。
*/
if (workerCountOf(c) < corePoolSize) {
java线程池创建的四种
if (addWorker(command, true))
return;
//如果上⾯的添加线程操作失败,重新获取ctl值
c = ();
}
//如果当前线程池是运⾏状态,并且往⼯作队列中添加该任务
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ();
/
*
* 如果当前线程不是运⾏状态,把任务从队列中移除
* 调⽤reject(内部调⽤handler)拒绝接受任务
*/
if (! isRunning(recheck) && remove(command))
reject(command);
//获取线程池中的有效线程数,如果为0,则执⾏addWorker创建⼀个新线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/*
* 如果执⾏到这⾥,有两种情况:
* 1. 线程池已经不是RUNNING状态;
* 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
* 这时,再次调⽤addWorker⽅法,但第⼆个参数传⼊为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
* 如果失败则拒绝该任务
*/
else if (!addWorker(command, false))
reject(command);
}
简单概括⼀下代码的逻辑,⼤概是这样:
1、判断当前运⾏中的线程数是否⼩于corePoolSize,是的话则调⽤addWorker创建线程执⾏任务。
2、不满⾜1的条件,就把任务放⼊⼯作队列workQueue中。
3、如果任务成功加⼊workQueue,判断线程池是否是运⾏状态,不是的话先把任务移出⼯作队列,并调⽤reject⽅法,使⽤拒绝策略拒绝该任务。线程如果是⾮运⾏中,调⽤addWorker创建⼀个新线程。
4、如果放⼊workQueue失败 (队列已满),则调⽤addWorker创建线程执⾏任务,如果这时创建线程失败 (addWorker传进去的第⼆个参数值是false,说明这种情况是当前线程数不⼩于maximumPoolSize),就会调⽤reject(内部调⽤handler)拒绝接受任务。
整个执⾏流程⽤⼀张图⽚表⽰⼤致如下:

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。