java线程池源码详解
为什么要使⽤线程池,有哪些好处?
在开发过程中,如果并发的请求数量⾮常多,但每个线程执⾏的时间很短,这样就会频繁的创建和销毁 线程,如此⼀来会⼤⼤降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要⽐处理实际的⽤户请求的时间和资源更多。 为了解决线程在创建和销毁上所花费的时间,线程池的使⽤是必须的。
什么时候使⽤线程池?
单个任务处理时间⽐较短
需要处理的任务数量很⼤
线程池优势
重⽤存在的线程,减少线程创建,消亡的开销,提⾼性能
提⾼响应速度。当任务到达时,任务可以不需要的等到线程创建就能⽴即执⾏。
提⾼线程的可管理性。线程是稀缺资源,如果⽆限制的创建,不仅会消耗系统资
源,还会降低系统的稳定性,使⽤线程池可以进⾏统⼀的分配,调优和监控。
Executor框架
Executor接⼝是线程池框架中最基础的部分,定义了⼀个⽤于执⾏Runnable的execute⽅ 法。
如图所⽰,线程池中最主要的接⼝是ExecutorService,最主要的⽅式是:
execute(Runnable command):履⾏Ruannable类型的任务
submit(task):可⽤来提交Callable或Runnable任务,并返回代表此任务的Future 对象
shutdown():在完成已提交的任务后封闭办事,不再接管新任务
shutdownNow():停⽌所有正在履⾏的任务并封闭办事
isTerminated():测试是否所有任务都履⾏完毕了
isShutdown():测试是否该ExecutorService已被关闭
ThreadPoolExecutor
ThreadPoolExecutor重点属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
ctl 是对线程池的运⾏状态和线程池中有效线程的数量进⾏控制的⼀个字段,它包含两部分的信息: 线程池的运⾏状态(runState) 和线程池内有效线程的数量 ( workerCount),这⾥可以看到,使⽤了Integer类型来保存,⾼3位保存
runState,低29位保存 workerunt。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量
表⽰workerCount的上限值,⼤约是5亿。
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 获取运⾏状态;
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取活动线程数;
private static int workerCountOf(int c) { return c & CAPACITY; }
// 获取运⾏状态和活动线程数的值。
private static int ctlOf(int rs, int wc) { return rs | wc; }
线程池的5种状态说明
private static final int RUNNING = -1 << COUNT_BITS; //⾼3位为111
private static final int SHUTDOWN = 0 << COUNT_BITS; //⾼3位为000
private static final int STOP = 1 << COUNT_BITS; //⾼3位为001
private static final int TIDYING = 2 << COUNT_BITS; //⾼3位为010
private static final int TERMINATED = 3 << COUNT_BITS; //⾼3位为011
1、RUNNING
(1) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进⾏ 处理。 (02)
状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被⼀旦被创建,就处于RUNNING状态,并且线程池中的任务数为0!
2、 SHUTDOWN
(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。
(2) 状态切换:调⽤线程池的shutdown()接⼝时,线程池由RUNNING -> SHUTDOWN。
3、STOP
(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中 断正在处理的任务。
(2) 状态切换:调⽤线程池的shutdownNow()接⼝时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
4、TIDYING
(1) 状态说明:当所有的任务已终⽌,ctl记录的任务数量为0,线程池会变为TIDYING 状态。当线程池
变为TIDYING状态时,会执⾏钩⼦函数terminated()。terminated()在 ThreadPoolExecutor类中是空的,若⽤户想在线程池变为TIDYING时,进⾏相应的处理; 可以通过重载terminated()函数来实现。
(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执⾏的任务也 为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执⾏的 任务为空时,就会由STOP -> TIDYING。
5、 TERMINATED
(1) 状态说明:线程池彻底终⽌,就变成TERMINATED状态。
(2) 状态切换:线程池处在TIDYING状态时,执⾏完terminated()之后,就会由 TIDYING - > TERMINATED。 进⼊TERMINATED的条件如下:
线程池不是RUNNING状态;
线程池状态不是TIDYING状态或TERMINATED状态;
如果线程池状态是SHUTDOWN并且workerQueue为空;
workerCount为0;
设置TIDYING状态成功
ThreadPoolExecutor与ScheduledThreadPoolExecutor
在java中线程池的具体实现类有()ThreadPoolExecutor) 默认线程池 和(ScheduledThreadPoolExecutor) 定时线程池
线程池的构建⽅法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = SecurityManager() == null ?
null :
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = Nanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
核⼼参数
**corePoolSize:**核⼼线程数,当提交⼀个任务时,线程池创建⼀个新线程执⾏任务,直到当 前线程数等corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执⾏;如果执⾏了线程池的prestartAllCoreThreads()⽅法,线程池会提前创建并启动所有核⼼线程
maximumPoolSize: 最⼤线程数,线程池中允许的最⼤线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线 程执⾏任务,前提是当前线程数⼩于maximumPoolSize;
keepAliveTime + 时间单位: 空闲线程的存活时间,线程池维护线程所允许的空闲时间。当线程池中
的线程数量⼤于corePoolSize的时候,如果这时没有新的任务提交,核⼼线程外的线程不会⽴即销毁,⽽是会等待,直到等待 的时间超过了keepAliveTime;
workQueue: 存放任务的队列,且任务必须实现Runable接⼝,JDK中的4种实现⽅式如下:
ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞
吐量通常要⾼于ArrayBlockingQuene;
SynchronousQuene:⼀个不存储元素的阻塞队列,每个插⼊操作必须等到
另⼀个线程调⽤移除操作,否则插⼊操作⼀直处于阻塞状态,吞吐量通常要⾼于 LinkedBlockingQuene;
priorityBlockingQuene:具有优先级的⽆界阻塞队列;
**threadFactory:**线程⼯⼚, ⽤于创建新的线程,默认使⽤ Executors.defaultThreadFactory() 来创建线程,线程初始化名称和优先级(5)。
handler:拒绝策略处理,线程池的饱和策略,当阻塞队列满了,且没有空闲的⼯作线程,如果继续提交任务,必须采取⼀种策略处理该任务,线程池提供了4种策略:
AbortPolicy:直接抛出异常,默认策略;
CallerRunsPolicy:⽤调⽤者所在的线程来执⾏任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执⾏当前任务;
DiscardPolicy:直接丢弃任务;
四种队列详解
ArrayBlockingQueue
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime().availableProcessors() * 2, 0, TimeUnit.MILLISECON DS, new ArrayBlockingQueue<>(32), r -> new Thread(r, "ThreadTest"));
ArrayBlockingQueue是⼀个有界缓存等待队列,可以指定缓存队列的⼤⼩,当正在执⾏的线程数等于c
orePoolSize时,多余的元素缓存在ArrayBlockingQueue队列中等待有空闲的线程时继续执⾏,当ArrayBlockingQueue已满时,加⼊ArrayBlockingQueue失败,会开启新的线程去执⾏,当线程数已经达到最⼤的maximumPoolSizes时,再有新的元素尝试加⼊ArrayBlockingQueue时会报错。
LinkedBlockingQueue
private static ExecutorService cachedThreadPool = new ThreadPoolExecutor(4, Runtime().availableProcessors() * 2, 0, TimeUnit.MILLISECON DS, new LinkedBlockingQueue<>(), r -> new Thread(r, "ThreadTest"));
最常⽤的队列,在Executors构建线程池的时候主要的队列模型,效率⽐ArrayBlockingQueue⾼。
LinkedBlockingQueue是⼀个⽆界缓存等待队列。当前执⾏的线程数量达到corePoolSize的数量时,剩余的元素会在阻塞队列⾥等待。(所以在使⽤此阻塞队列时maximumPoolSizes就相当于⽆效了),每个线程完全独⽴于其他线程。⽣产者和消费者使⽤独⽴的锁来控制数据的同步,即在⾼并发的情况下可以并⾏操作队列中的数据。
PS:在使⽤的时候必须设置队列的⼤⼩,如果不设置,默认的⼤⼩会是Integer.MAX_VALUE,这也是为什么阿⾥⼿册中不建议使⽤Executors构建线程池的主要原因。
SynchronousQueue
SynchronousQueue没有容量,是⽆缓冲等待队列,是⼀个不存储元素的阻塞队列,会直接将任务交给消费者,必须等队列中的添加元素被消费后才能继续添加新的元素。
这种队列的特点是不缓存数据,⽽是缓存线程,线程分为⽣产者线程和消费者线程,⼀个⽣产者线程和⼀个消费者线程是互补的,当⼀个⽣产者线程遇到⼀个消费者线程的时候就会直接进⾏数据交换,所以这种队列的技术点⽐较⾼,理解起来难度较⼤。⼀个线程只能缓存⼀个数据,当⼀个线程插⼊数据之后就会被阻塞,直到另外⼀个线程消费了其中的数据。
使⽤SynchronousQueue阻塞队列⼀般要考虑具体的业务逻辑,设置线程池的⼤⼩,避免线程拒绝执⾏操作,但是线程池的⼤⼩也不可以⽆线设置。
priorityBlockingQuene
priorityBlockingQueue是⼀个⽆界队列,它没有限制,在内存允许的情况下可以⽆限添加元素;它⼜是具有优先级的队列,是通过构造函数传⼊的对象来判断,传⼊的对象必须实现comparable接⼝。
DelayedWorkQueue
DelayedWorkQueue优先队列
该队列是定制的优先级队列,只能⽤来存储RunnableScheduledFutures任务。堆是实现优先级队列的最佳选择,⽽该队列正好是基于堆数据结构的实现。
任务提交
1、public void execute() //提交任务⽆返回值
2、public Future<?> submit() //任务执⾏完成后有返回值
线程池监控常⽤函数
1 public long getTaskCount() //线程池已执⾏与未执⾏的任务总数
2 public long getCompletedTaskCount() //已完成的任务数
3 public int getPoolSize() //线程池当前的线程数
4 public int getActiveCount() //线程池中正在执⾏任务的线程数量
线程池执⾏原理
源码分析
execute⽅法
execute⽅法在线程池中相对重要,execute⽅法的主要功能是如何添加任务到队列和worker中,其中包括核⼼线程与⾮核⼼线程的创建条件,任务队列添加任务的条件。源码分析如下:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// clt记录着runState和workerCount
int c = ();
//判断当前⼯作活动线程数是否⼩于核⼼线程数,如果⼩于则新建线程放⼊线程池
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ();
}
//如果⼤于则:判断当前线程池是运⾏状态,添加到队列成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ();
//⼆次判断当前线程池的运⾏状态,如果不是运⾏状态,则移除刚刚添加的任务
if (! isRunning(recheck) && remove(command))
reject(command);
//获取线程池中的有效线程数,如果数量是0,所有核⼼线程都在⼯作,则执⾏addWorker⽅法
//第⼀个参数为null,表⽰在线程池中创建⼀个线程,但不去启动;
// 第⼆个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
// 如果判断workerCount⼤于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执⾏
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果队列满了,则开启⾮核⼼线程,添加任务
else if (!addWorker(command, false))
reject(command);
}
java线程池创建的四种execute⽅法的执⾏流程:
addWorker⽅法
addWorker中需要注意的是Worker的创建以及Worker对应线程的启动,在线程启动的时候如何对应worker中run⽅法。addWorker()源码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论