Java—线程池ThreadPoolExecutor案例详解,⾼薪必备
引导
要求:线程资源必须通过线程池提供,不允许在应⽤⾃⾏显式创建线程;说明:使⽤线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不⾜的问题。如果不使⽤线程池,有可能造成系统创建⼤量同类线程⽽导致消耗内存或者“过度切换”的问题。
特别要注意:光理论是不够的,记住:Java架构项⽬经验永远是核⼼,如果你没有最新JAVA架构实战教程及⼤⼚30k+⾯试宝
典,可以去⼩编的Java架构学习.裙:七吧伞吧零⽽⾐零伞(数字的谐⾳)转换下可以到了,⾥⾯很多新JAVA架构项⽬教
程,还可以跟⽼司机交流讨教!
线程池介绍
线程池概述
线程池,顾名思义是⼀个放着线程的池⼦,这个池⼦的线程主要是⽤来执⾏任务的。当⽤户提交任务时,线程池会创建线程去执⾏任务,若任务超过了核⼼线程数的时候,会在⼀个任务队列⾥进⾏排队等待,这个详细流程,我们会后⾯细讲。 任务,通常是⼀些抽象的且离散的⼯作单元,我们会把应⽤程序的⼯作分解到多个任务中去执⾏。⼀般我们需要使⽤多线程执⾏任务的时候,这些任务最好都是相互独⽴的,这样有⼀定的任务边界供程序把控。 多线程,当使⽤多线程的时候,任务处理过程就可以从主线程中剥离出来,任务可以并⾏处理,同时处理多个请求。当然了,任务处理代码必须是线程安全的。
为何要使⽤线程池?
1. 降低开销:在创建和销毁线程的时候会产⽣很⼤的系统开销,频繁创建/销毁意味着CPU资源的频繁切换和占⽤,线程是属于稀缺资
源,不可以频繁的创建。假设创建线程的时长记为t1,线程执⾏任务的时长记为t2,销毁线程的时长记为t3,如果我们执⾏任务t2<t1+t3,那么这样的开销是不划算的,不使⽤线程池去避免创建和销毁的开销,将是极⼤的资源浪费。
2. 易复⽤和管理:将线程都放在⼀个池⼦⾥,便于统⼀管理(可以延时执⾏,可以统⼀命名线程名称等),同时,也便于任务进⾏复
⽤。
3. 解耦:将线程的创建和销毁与执⾏任务完全分离出来,这样⽅便于我们进⾏维护,也让我们更专注于业务开发。
线程池的优势
1. 提⾼资源的利⽤性:通过池化可以重复利⽤已创建的线程,空闲线程可以处理新提交的任务,从⽽降低了创建和销毁线程的资源开
销。
2. 提⾼线程的管理性:在⼀个线程池中管理执⾏任务的线程,对线程可以进⾏统⼀的创建、销毁以及监控等,对线程数做控制,防⽌线
程的⽆限制创建,避免线程数量的急剧上升⽽导致CPU过度调度等问题,从⽽更合理的分配和使⽤内核资源。
3. 提⾼程序的响应性:提交任务后,有空闲线程可以直接去执⾏任务,⽆需新建。
4. 提⾼系统的可扩展性:利⽤线程池可以更好的扩展⼀些功能,⽐如定时线程池可以实现系统的定时任务。
线程池原理
线程池的参数类型
⼀共有7个:corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler,(5+2,前5个重要)
int corePoolSize:该线程池中核⼼线程数最⼤值
这边我们区分两个概念:
核⼼线程:线程池新建线程的时候,当前线程总数< corePoolSize,新建的线程即为核⼼线程。
⾮核⼼线程:线程池新建线程的时候,当前线程总数< corePoolSize,新建的线程即为核⼼线程。
核⼼线程默认情况下会⼀直存活在线程池中,即使这个核⼼线程不⼯作(空闲状态),除⾮ThreadPoolExecutor 的 allowCoreThreadTimeOut这个属性为true,那么核⼼线程如果空闲状态下,超过⼀定时间后就被销毁。
int maximumPoolSize:线程总数最⼤值
线程总数 = 核⼼线程数 + ⾮核⼼线程数
long keepAliveTime:⾮核⼼线程空闲超时时间
keepAliveTime即为空闲线程允许的最⼤的存活时间。如果⼀个⾮核⼼线程空闲状态的时长超过keepAliveTime了,就会被销毁掉。注意:如果设置allowCoreThreadTimeOut = true,就变成核⼼线程超时销毁了。
TimeUnit unit:是keepAliveTime 的单位
TimeUnit 是⼀个枚举类型,列举如下:
单位说明
NANOSECONDS1微毫秒 = 1微秒 / 1000
MICROSECONDS1微秒 = 1毫秒 / 1000
MILLISECONDS1毫秒 = 1秒 /1000
SECONDS秒
MINUTES分
HOURS⼩时
DAYS天
BlockingQueue workQueue:存放任务的阻塞队列
当核⼼线程都在⼯作的时候,新提交的任务就会被添加到这个⼯作阻塞队列中进⾏排队等待;如果阻塞队列也满了,线程池就新建⾮核⼼线程去执⾏任务。workQueue维护的是等待执⾏的Runnable对象。常⽤的 workQueue 类型:(⽆界队列、有界队列、同步移交队列)
1. SynchronousQueue:同步移交队列,适⽤于⾮常⼤的或者⽆界的线程池,可以避免任务排队,SynchronousQueue队列接收到任务后,会直接将
任务从⽣产者移交给⼯作者线程,这种移交机制⾼效。它是⼀种不存储元素的队列,任务不会先放到队列中去等线程来取,⽽是直接移交给执⾏的线程。只有当线程池是⽆界的或可以拒绝任务的时候,SynchronousQueue队列的使⽤才有意义,maximumPoolSize ⼀般指定成 Integer.MAX_VALUE,即
⽆限⼤。要将⼀个元素放⼊SynchronousQueue,就需要有另⼀个线程在等待接收这个元素。若没有线程在等待,并且线程池的当前线程数⼩于最⼤值,则ThreadPoolExecutor就会新建⼀个线程;否则,根据饱和策略,拒绝任
务。newCachedThreadPool默认使⽤的就是这种同步移交队列。吞吐量⾼于LinkedBlockingQueue。
2. LinkedBlockingQueue:基于链表结构的阻塞队列,FIFO原则排序。当任务提交过来,若当前线程数⼩于corePoolSize核⼼线程数,则线
程池新建核⼼线程去执⾏任务;若当前线程数等于corePoolSize核⼼线程数,则进⼊⼯作队列进⾏等待。LinkedBlockingQueue队列没有最⼤值限制,只要任务数超过核⼼线程数,都会被添加到队列中,这就会导致总线程数永远不会超过 corePoolSize,所以
maximumPoolSize 是⼀个⽆效设定。newFixedThreadPool和newSingleThreadPool默认是使⽤的是⽆界LinkedBlockingQueue队列。吞吐量⾼于ArrayBlockingQueue。
3. ArrayBlockingQueue:基于数组结构的有界阻塞队列,可以设置队列上限值,FIFO原则排序。当任务提交时,若当前线程⼩于
corePoolSize核⼼线程数,则新建核⼼线程执⾏任务;若当先线程数等于corePoolSize核⼼线程数,
则进⼊队列排队等候;若队列的任务数也排满了,则新建⾮核⼼线程执⾏任务;若队列满了且总线程数达到了maximumPoolSize最⼤线程数,则根据饱和策略进⾏任务的拒绝。
4. DelayQueue:延迟队列,队列内的元素必须实现 Delayed 接⼝。当任务提交时,⼊队列后只有达到指定的延时时间,才会执⾏任务
5. PriorityBlockingQueue:优先级阻塞队列,根据优先级执⾏任务,优先级是通过⾃然排序或者是Comparator定义实现。
注意:只有当任务相互独⽴没有任何依赖的时候,线程池或⼯作队列设置有界是合理的;若任务之间存在依赖性,需要使⽤⽆界的线程池,如newCachedThreadPool,否则有可能会导致死锁问题。
ThreadFactory threadFactory
创建线程的⽅式,这是⼀个接⼝,你 new 他的时候需要实现他的 Thread newThread(Runnable r) ⽅法,⼀般⽤不上,RejectedExecutionHandler handler:饱和策略
抛出异常专⽤,当队列和最⼤线程池都满了之后的饱和策略。
线程池⼯作流程
⼀般流程即为:创建worker线程;添加任务⼊workQueue队列;worker线程执⾏任务。
当⼀个任务被添加进线程池时:
1. 当前线程数量未达到 corePoolSize,则新建⼀个线程(核⼼线程)执⾏任务
2. 当前线程数量达到了 corePoolSize,则将任务移⼊阻塞队列等待,让空闲线程处理;
3. 当阻塞队列已满,新建线程(⾮核⼼线程)执⾏任务
4. 当阻塞队列已满,总线程数⼜达到了 maximumPoolSize,就会按照拒绝策略处理⽆法执⾏的任务,⽐如RejectedExecutionHandler抛出异
常。
这边,为了⼤家能够更好的去理解这块的流程,我们举⼀个例⼦。⽣活中我们经常会去打⼀些公司的或者是⼀些特定机构的,⽽那个公司或者机构的客服中⼼就是⼀个线程池,正式员⼯的客服⼩就好⽐是核⼼线程,⽐如有6个客服⼩。 5. 当⽤户的电话打进到公司的客服中⼼的时候(提交任务); 6. 客服中⼼会调度客服⼩去接听电话(创建线程执⾏任务),如果接听的电话超过了6个,6个客服⼩都在接听的⼯作状态了(核⼼线程池满了),这时客服中⼼会有⼀个
电话接听等待通道(进⼊任务队列等待),就是我们经常听到的“您的通话在排队,前⾯排队n⼈。” 7. 当然,这个电话接听等待通道也是有上限的,当超过这个上限的时候(任务队列满了),客服中⼼就会⽴即安排外协员⼯(⾮核⼼线程),也就是⾮正式员⼯去接听额外的电话(任务队列满了,正式和⾮正式员⼯数量>总任务数,线程池创建⾮核⼼线程去执⾏任务)。
8. 当⽤户电话数激增,客服中⼼控制台发现这个时候正式员⼯和外协员⼯的总和已经满⾜不了这些⽤户电话接⼊了(总线程池满),就开始根据⼀些接听规则去拒绝这些电话(按照拒绝策略处理⽆法执⾏的任务)
线程池状态
RUNNING:运⾏状态,指可以接受任务并执⾏队列⾥的任务。
SHUTDOWN:调⽤了 shutdown() ⽅法,不再接受新任务,但队列⾥的任务会执⾏完毕。
STOP:指调⽤了 shutdownNow() ⽅法,不再接受新任务,所有任务都变成STOP状态,不管是否正在执⾏。该操作会抛弃阻塞队列⾥的所有任务并中断所有正在执⾏任务。
java线程池创建的四种TIDYING:所有任务都执⾏完毕,程序调⽤ shutdown()/shutdownNow() ⽅法都会将线程更新为此状态,若调⽤shutdown(),则等执⾏任务全部结束,队列即为空,变成TIDYING状态;调⽤shutdownNow()⽅法后,队列任务清空且正在执⾏的任务中断后,更新为TIDYING状态。
TERMINATED:终⽌状态,当线程执⾏terminated()后会更新为这个状态。
线程池源码
线程池核⼼接⼝
ThreadPoolExecutor,在urrent下。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize, //核⼼线程数
int maximumPoolSize, //最⼤线程数
long keepAliveTime, //空闲线程存活时间
TimeUnit unit, //存活时间单位
BlockingQueue<Runnable> workQueue, //任务的阻塞队列
ThreadFactory threadFactory, //新线程的产⽣⽅式
RejectedExecutionHandler handler //拒绝策略) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = SecurityManager() == null ?
null :
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = Nanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
复制代码
ThreadPoolExecutor 继承 AbstractExecutorService;AbstractExecutorService 实现 ExecutorService, ExecutorService 继承 Executor
public class ThreadPoolExecutor extends AbstractExecutorService {}
public abstract class AbstractExecutorService implements ExecutorService {}
public interface ExecutorService extends Executor {}
复制代码
线程池构造⽅法
1)5参数构造器
// 5参数构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
复制代码
2)6参数构造器-1
// 6参数构造器-1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
复制代码
3)6参数构造器-2
// 6参数构造器-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
复制代码
4)7参数构造器
// 7参数构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
复制代码
四种线程池
常规⽤法
//创建固定数⽬线程的线程池
//创建⼀个⽆限线程的线程池,⽆需等待队列,任务提交即执⾏
//创建有且仅有⼀个线程的线程池
复制代码
newCachedThreadPool():可缓存线程池
介绍
newCachedThreadPool将创建⼀个可缓存的线程,如果当前线程数超过处理任务时,回收空闲线程;当需求增加时,可以添加新线程去处
理任务。
1. 线程数⽆限制,corePoolSize数值为0, maximumPoolSize 的数值都是为 Integer.MAX_VALUE。
2. 若线程未回收,任务到达时,会复⽤空闲线程;若⽆空闲线程,则新建线程执⾏任务。
3. 因为复⽤性,⼀定程序减少频繁创建/销毁线程,减少系统开销。
4. ⼯作队列可以选⽤SynchronousQueue。
创建⽅法
ExecutorService cachedThreadPool = wCachedThreadPool();
源码
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
复制代码
newFixedThreadPool():定长线程池
介绍
newFixedThreadPool创建⼀个固定长度的线程池,每次提交⼀个任务的时候就会创建⼀个新的线程,直到达到线程池的最⼤数量限制。
1. 定长,可以控制线程最⼤并发数, corePoolSize 和 maximumPoolSize 的数值都是nThreads。
2. 超出的线程会在队列中等待。
3. ⼯作队列可以选⽤LinkedBlockingQueue。
创建⽅法
ExecutorService fixedThreadPool = wFixedThreadPool(int nThreads);
源码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
复制代码
newScheduledThreadPool():定时线程池
介绍
newScheduledThreadPool创建⼀个固定长度的线程池,并且以延迟或者定时的⽅式去执⾏任务。
创建⽅法:
ExecutorService scheduledThreadPool = wScheduledThreadPool(int corePoolSize);
源码
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
复制代码
newSingleThreadExecutor():单线程化的线程池
介绍
newSingleThreadExecutor顾名思义,是⼀个单线程的Executor,只创建⼀个⼯作线程执⾏任务,若这个唯⼀的线程异常故障了,会新建另⼀个线程来替代,newSingleThreadExecutor可以保证任务依照在⼯作队列的排队顺序来串⾏执⾏。
1. 有且仅有⼀个⼯作线程执⾏任务;
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论