java线程池与五种常⽤线程池策略使⽤与解析
java线程池与五种常⽤线程池策略使⽤与解析
⼀.线程池
关于为什么要使⽤线程池久不赘述了,⾸先看⼀下java中作为线程池Executor底层实现类的ThredPoolExecutor的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
其中各个参数含义如下:
corePoolSize- 池中所保存的线程数,包括空闲线程。需要注意的是在初创建线程池时线程不会⽴即启动,直到有任务提交才开始启动线程并逐渐时线程数⽬达到corePoolSize。若想⼀开始就创建所有核⼼线程需调⽤prestartAllCoreThreads⽅法。
maximumPoolSize-池中允许的最⼤线程数。需要注意的是当核⼼线程满且阻塞队列也满时才会判断当前线程数是否⼩于最⼤线程数,并决定是否创建新线程。
keepAliveTime - 当线程数⼤于核⼼时,多于的空闲线程最多存活时间
unit - keepAliveTime 参数的时间单位。
workQueue - 当线程数⽬超过核⼼线程数时⽤于保存任务的队列。主要有3种类型的BlockingQueue可供选择:⽆界队列,有界队列和同步移交。将在下⽂中详细阐述。从参数中可以看到,此队列仅保存实现Runnable接⼝的任务。
threadFactory - 执⾏程序创建新线程时使⽤的⼯⼚。
handler - 阻塞队列已满且线程数达到最⼤值时所采取的饱和策略。java默认提供了4种饱和策略的实现⽅式:中⽌、抛弃、抛弃最旧的、调⽤者运⾏。将在下⽂中详细阐述。
⼆.可选择的阻塞队列BlockingQueue详解
⾸先看⼀下新任务进⼊时线程池的执⾏策略:
如果运⾏的线程少于corePoolSize,则 Executor始终⾸选添加新的线程,⽽不进⾏排队。(如果当前运⾏的线程⼩于corePoolSize,则任务根本不会存⼊queue中,⽽是直接运⾏)
如果运⾏的线程⼤于等于 corePoolSize,则 Executor始终⾸选将请求加⼊队列,⽽不添加新的线程。
如果⽆法将请求加⼊队列,则创建新的线程,除⾮创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
主要有3种类型的BlockingQueue:
2.1 ⽆界队列
队列⼤⼩⽆限制,常⽤的为⽆界的LinkedBlockingQueue,使⽤该队列做为阻塞队列时要尤其当⼼,当任务耗时较长时可能会导致⼤量新任务在队列中堆积最终导致OOM。最近⼯作中就遇到因为采⽤LinkedBlockingQueue作为阻塞队列,部分任务耗时80s+且不停有新任务进来,导致cpu和内存飙升服务器挂掉。
2.2 有界队列
常⽤的有两类,⼀类是遵循FIFO原则的队列如ArrayBlockingQueue与有界的LinkedBlockingQueue,另⼀类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。
使⽤有界队列时队列⼤⼩需和线程池⼤⼩互相配合,线程池较⼩有界队列较⼤时可减少内存消耗,降低cpu使⽤率和上下⽂切换,但是可能会限制系统吞吐量。
2.3 同步移交
如果不希望任务在队列中等待⽽是希望将任务直接移交给⼯作线程,可使⽤SynchronousQueue作为等待队列。SynchronousQueue不是⼀个真正的队列,⽽是⼀种线程之间移交的机制。要将⼀个元素放⼊SynchronousQueue中,必须有另⼀个线程正在等待接收这个元素。只有在使⽤⽆界线程池或者有饱和策略时才建议使⽤该队列。
2.4 ⼏种BlockingQueue的具体实现原理
关于上述⼏种BlockingQueue的具体实现原理与分析将在下篇博⽂中详细阐述。
三.可选择的饱和策略RejectedExecutionHandler详解
JDK主要提供了4种饱和策略供选择。4种策略都做为静态内部类在ThreadPoolExcutor中进⾏实现。
3.1 AbortPolicy中⽌策略
该策略是默认饱和策略。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
}
使⽤该策略时在饱和时会抛出RejectedExecutionException(继承⾃RuntimeException),调⽤者可捕获该异常⾃⾏处理。
3.2 DiscardPolicy抛弃策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
如代码所⽰,不做任何处理直接抛弃任务
3.3 DiscardOldestPolicy抛弃旧任务策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
}
}
如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使⽤PriorityBlockingQueue优先级队列,将会导致优先级最⾼的任务被抛弃,因此不建议将该种策略配合优先级队列使⽤。
3.4 CallerRunsPolicy调⽤者运⾏
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
既不抛弃任务也不抛出异常,直接运⾏任务的run⽅法,换⾔之将任务回退给调⽤者来直接运⾏。使⽤该策略时线程池饱和后将由调⽤线程池的主线程⾃⼰来执⾏任务,因此在执⾏任务的这段时间⾥主线程⽆法再提交新任务,从⽽使线程池中⼯作线程有时间将正在处理的任务处理完成。
四.java提供的四种常⽤线程池解析
在JDK帮助⽂档中,有如此⼀段话:
强烈建议程序员使⽤较为⽅便的Executors⼯⼚⽅法wCachedThreadPool()(⽆界线程池,可以进⾏⾃动线程回收)、wFixedThreadPool(int)(固定⼤⼩线程池)wSingleThreadExecutor()(单个后台线程)它们均为⼤多数使⽤场景预定义了设置。
详细介绍⼀下上述四种线程池。
4.1 newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若⽆可回收,则新建线程。
初看该构造函数时我有这样的疑惑:核⼼线程池为0,那按照前⾯所讲的线程池策略新任务来临时⽆法进⼊核⼼线程池,只能进⼊SynchronousQueue中进⾏等待,⽽SynchronousQueue的⼤⼩为1,那岂不是第⼀个任务到达时只能等待在队列中,直到第⼆个任务到达发现⽆法进⼊队列才能创建第⼀个线程?
这个问题的答案在上⾯讲SynchronousQueue时其实已经给出了,要将⼀个元素放⼊SynchronousQueue中,必须有另⼀个线程正在等待接收这个元素。因此即便SynchronousQueue⼀开始为空且⼤⼩为1,第⼀个任务也⽆法放⼊其中,因为没有线程在等待从SynchronousQueue中取⾛元素。因此第⼀个任务到达时便会创建⼀个新线程执⾏该任务。
这⾥引申出⼀个⼩技巧:有时我们可能希望线程池在没有任务的情况下销毁所有的线程,既设置线程池核⼼⼤⼩为0,但⼜不想使⽤SynchronousQueue⽽是想使⽤有界的等待队列。显然,不进⾏任何特殊设置的话这样的⽤法会发⽣奇怪的⾏为:直到等待队列被填满才会有新线程被创建,任务才开始执⾏。这并不是我们希望看到的,此时可通过allowCoreThreadTimeOut使等待队列中的元素出队被调⽤执⾏,详细原理和使⽤将会在后续博客中阐述。
4.2 newFixedThreadPool 创建⼀个定长线程池,可控制线程最⼤并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
看代码⼀⽬了然了,使⽤固定⼤⼩的线程池并使⽤⽆限⼤的队列
4.3 newScheduledThreadPool 创建⼀个定长线程池,⽀持定时及周期性任务执⾏。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
java线程池创建的四种}
在来看看ScheduledThreadPoolExecutor()的构造函数
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor的⽗类即ThreadPoolExecutor,因此这⾥各参数含义和上⾯⼀样。值得关⼼的是DelayedWorkQueue 这个阻塞对列,在上⾯没有介绍,它作为静态内部类就在ScheduledThreadPoolExecutor中进⾏了实现。具体分析讲会在后续博客中给出,在这⾥只进⾏简单说明:DelayedWorkQueue是⼀个⽆界队列,它能按⼀定的顺序对⼯作队列中的元素进⾏排列。因此这⾥设置的最⼤线程数 Integer.MAX_VALUE没有任何意义。关于ScheduledThreadPoolExecutor的具体使⽤将会在后续quartz的周期性任务实现原理中进⾏进⼀步分析。
4.4 newSingleThreadExecutor 创建⼀个单线程化的线程池,它只会⽤唯⼀的⼯作线程来执⾏任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执⾏。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
⾸先new了⼀个线程数⽬为1的ScheduledThreadPoolExecutor,再把该对象传⼊DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的实现代码:
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
在看看它的⽗类
DelegatedExecutorService(ExecutorService executor) { e = executor; }
其实就是使⽤装饰模式增强了ScheduledExecutorService(1)的功能,不仅确保只有⼀个线程顺序执⾏任务,也保证线程意外终⽌后会重新创建⼀个线程继续执⾏任务。具体实现原理会在后续博客中讲解。
4.5 newWorkStealingPool创建⼀个拥有多个任务队列(以便减少连接数)的线程池。
这是jdk1.8中新增加的⼀种线程池实现,先看⼀下它的⽆参实现
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
返回的ForkJoinPool从jdk1.7开始引进,个⼈感觉类似于mapreduce的思想。这个线程池较为特殊,将
在后续博客中给出详细的使⽤说明和原理。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论