ExecutorService线程池详解
1、什么是ExecutorService,为什么要使⽤线程池?
许多服务器应⽤程序都⾯向处理来⾃某些远程来源的⼤量短⼩的任务,每当⼀个请求到达就创建⼀个新线程,然后在新线程中为请求服务,但是频繁创建新线程、销毁新线程、线程切换既花费较多的时间,影响相应速度,⼜消耗⼤量的系统资源,且有时服务器⽆法处理过多请求导致崩溃。⼀种情形:假设⼀个服务器完成⼀项任务所需时间为:T1 创建线程时间,T2 在线程中执⾏任务的时间,T3 销毁线程时间。如果:T1 + T3 远⼤于 T2,则可以采⽤线程池,以提⾼服务器性能。ExecutorService是⼀个线程池,请求到达时,线程已经存在,响应延迟低,多个任务复⽤线程,避免了线程的重复创建和销毁,并且可以规定线程数⽬,请求数⽬超过阈值时强制其等待直到有空闲线程。
当我们有任务需要多线程来完成时,将任务(实现Runnable、callable接⼝、继承Thread类的对象)提交给ExecutorService。
创建⽅式如下:
1public ThreadPoolExecutor(int corePoolSize,
2int maximumPoolSize,
3long keepAliveTime,
4 TimeUnit unit,
5 BlockingQueue<Runnable> workQueue,
6 ThreadFactory threadFactory,
7 RejectedExecutionHandler handler)
corePoolSize : 核⼼线程数,⼀旦创建将不会再释放。如果创建的线程数还没有达到指定的核⼼线程数量,将会继续创建新的核⼼线程,直到达到最⼤核⼼线程数后,核⼼线程数将不在增加;如果没有空闲的核⼼线程,同时⼜未达到最⼤线程数,则将继续创建⾮核⼼线程;如果核⼼线程数等于最⼤线程数,则当核⼼线程都处于激活状态时,任务将被挂起,等待有空闲线程时再执⾏。
maximumPoolSize : 最⼤线程数,允许创建的最⼤线程数量。如果最⼤线程数等于核⼼线程数,则⽆法创建⾮核⼼线程;如果⾮核⼼线程处于空闲时,超过设置的空闲时间,则将被回收,释放占⽤的资源。
keepAliveTime : 也就是当线程空闲时,所允许保存的最⼤时间,超过这个时间,线程将被释放销毁,但只针对于⾮核⼼线程。
unit : 时间单位,TimeUnit.SECONDS等。
workQueue : 任务队列,⽤于保存等待执⾏的任务的阻塞队列。可以选择以下⼏个阻塞队列。
1. ArrayBlockingQueue:是⼀个基于数组结构的有界阻塞队列,必须设置容量。此队列按 FIFO(先进先出)原则对元素进⾏排序。
2. LinkedBlockingQueue:⼀个基于链表结构的阻塞队列,可以设置容量,此队列按FIFO (先进先出)排序元素,吞吐量通常要⾼于
ArrayBlockingQueue。
3. SynchronousQueue:⼀个不存储元素的阻塞队列。每个插⼊offer操作必须等到另⼀个线程调⽤移除poll操作,否则插⼊操作⼀直处于
阻塞状态,吞吐量通常要⾼于LinkedBlockingQueue。
4. PriorityBlockingQueue:⼀个具有优先级的⽆限阻塞队列。
threadFactory : 线程⼯⼚,⽤于创建线程。
handler : 当线程边界和队列容量已经达到最⼤时,⽤于处理阻塞时的程序。
2、ExecutorService的类型
⼀共有四种线程池:
CachedThreadPool可缓存线程池、SingleThreadExecutor单线程池、FixedThreadPool固定线程数线程池、ScheduledThreadPool 固定线程数,⽀持定时和周期性任务线程池。
ThreadPoolExecutor(该类就是线程池类)继承AbstractExecutorService类,该抽象类实现ExecutorService接⼝,Executors是⼀个⼯⼚类,包含很多静态⽅法,包括newCachedThreadPool、newSingleThreadExecutor、newFixedThreadPool等,这些静态⽅法中调⽤了ThreadPoolExecutor的构造函数,并且不同的线程池调⽤构造⽅法时传⼊不同的参数。
2.1 CachedThreadPool可缓存线程池(⽆界线程池)
1public static ExecutorService newCachedThreadPool() {
2return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3 60L, TimeUnit.SECONDS,
4new SynchronousQueue<Runnable>());
5 }
通过它的创建⽅式可以知道,创建的都是⾮核⼼线程,⽽且最⼤线程数为Interge的最⼤值,空闲线程存活时间是1分钟。SynchronousQueue队列,⼀个不存储元素的阻塞队列。每个插⼊操作必须等到另⼀个线程调⽤移除操作。所以,当我们提交第⼀个任务的时候,是加⼊不了队列的,这就满⾜了,⼀个线程池条件“当⽆法加⼊队列的时候,且任务没有达到maxsize时,我们将新开启⼀个线程任务”。即当线程不够⽤的时候会不断创建新线程,如果线程⽆限增长,会导致内存溢出。所以我们的maxsize是big big。时间是60s,当⼀个线程没有任务执⾏会暂时保存60s超时时间,如果没有的新的任务的话,会从cache中remove掉。因此长时间不提交任务的CachedThreadPool不会占⽤系统资源。就是缓冲区为1的⽣产者消费者模式。
2.2 SingleThreadExecutor单线程池
1public static ExecutorService newSingleThreadExecutor() {
2return new FinalizableDelegatedExecutorService
3 (new ThreadPoolExecutor(1, 1,
4 0L, TimeUnit.MILLISECONDS,
5new LinkedBlockingQueue<Runnable>()));
6 }
只⽤⼀个线程来执⾏任务,保证任务按FIFO顺序⼀个个执⾏。
2.3 FixedThreadPool固定线程数线程池
1public static ExecutorService newFixedThreadPool(int nThreads) {
2return new ThreadPoolExecutor(nThreads, nThreads,
3 0L, TimeUnit.MILLISECONDS,
4new LinkedBlockingQueue<Runnable>());
5 }
coresize和maxmumsize相同,超时时间为0,队列⽤的LinkedBlockingQueue⽆界的FIFO队列,如果
队列⾥⾯有线程任务的话就从队列⾥⾯取出线程,然后开启⼀个新的线程开始执⾏。很明显,这个线程池始终只有size的线程在运⾏,⼤⼩固定,难以扩展。
2.4 ScheduledThreadPool 固定线程数,⽀持定时和周期性任务线程池
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
3、创建线程的时机(线程池的⼯作策略)
1. 如果运⾏的线程少于 corePoolSize,则 Executor 始终⾸选添加新的线程,⽽不进⾏排队。(即如果当前运⾏的线程⼩于
corePoolSize,则任务根本不会添加到workQueue中)
2. 如果运⾏的线程等于或多于 corePoolSize,则 Executor 始终⾸选将请求加⼊⼯作队列,⽽不添加新的线程。
3. 如果⽆法将请求加⼊workQueue(但是队列已满),则创建新的线程,除⾮创建此线程超出 maximumPoolSize,如果超过,在这种情
况下,新的任务将被拒绝。
3、代码例⼦
1public class ExecutorServicepool {
2public static void main(String[] args) throws InterruptedException {
3int[] a = new int[1];
4//创建⼀个容量为5的线程池
5 ExecutorService executorService = wFixedThreadPool(5);
6for(int i = 0;i<15;i++){
7//向线程池提交⼀个任务(其实就是通过线程池来启动⼀个线程)
8 ute(new TestRunnable(a));
9 System.out.println("============ "+i);
10 Thread.sleep(millis:1000);
System.out.printlin("主线程休眠了1秒钟")
11 }
12 }
java线程池创建的四种13 }
14
15class TestRunnable extends Thread{
16public int[] count;
17 TestRunnable(int[] a){
19 }
20 @Override
21public void run(){
22try {
23if(Thread.currentThread().getName().equals("pool-1-thread-1"))
24 Thread.sleep(2000);
25 } catch (Exception e) {
26 e.printStackTrace();
27 }
28 System.out.println(Thread.currentThread().getName()+"-线程被调⽤了");
29 System.out.println("count值为:"+(++count[0]));
30 }
31 }
得到的输出结果如下:可知,固定线程数线程池,线程数为5个,提交15个任务给线程池,也就是使⽤5个线程完成对a[0]++的⼯作,5个线程之间是异步的,线程池中线程与主线程也是异步的。
============ 0
============ 1
============ 2
============ 3
============ 4
============ 5
============ 6
============ 7
============ 8
============ 9
============ 10
============ 11
============ 12
============ 13
============ 14
pool-1-thread-3-线程被调⽤了
pool-1-thread-2-线程被调⽤了
pool-1-thread-5-线程被调⽤了
pool-1-thread-4-线程被调⽤了
count值为:2
count值为:3
count值为:1
pool-1-thread-2-线程被调⽤了
count值为:4
count值为:5
pool-1-thread-3-线程被调⽤了
pool-1-thread-2-线程被调⽤了
pool-1-thread-4-线程被调⽤了
count值为:7
count值为:6
pool-1-thread-2-线程被调⽤了
count值为:9
count值为:8
pool-1-thread-4-线程被调⽤了
count值为:10
pool-1-thread-4-线程被调⽤了
count值为:11
pool-1-thread-2-线程被调⽤了
count值为:12
pool-1-thread-5-线程被调⽤了
count值为:13
pool-1-thread-3-线程被调⽤了
count值为:14
主线程休眠了1秒钟
pool-1-thread-1-线程被调⽤了
count值为:15
4、怎么关闭线程池?
执⾏程序时发现,所有线程执⾏完毕后,JVM并未结束运⾏,也就说明线程池没有正常结束。怎样正确关闭线程池呢?
调⽤ Executor 的 shutdown() ⽅法会等待线程都执⾏完毕之后再关闭,但是如果调⽤的是 shutdownNow() ⽅法,则相当于调⽤每个线程的interrupt() ⽅法。
让线程池在指定时间内⽴即关闭:
public static void main(String[] args) {
ExecutorService pool = wFixedThreadPool(5);
final long waitTime = 8 * 1000;
final long awaitTime = 2 * 1000;
Runnable task1 = new Runnable(){
public void run(){
try {
System.out.println("task1 start");
Thread.sleep(waitTime);
System.out.println("task1 end");
} catch (InterruptedException e) {
System.out.println("task1 interrupted: " + e);
}
}
};
Runnable task2 = new Runnable(){
public void run(){
try {
System.out.println("task2 start");
Thread.sleep(1000);
System.out.println("task2 end");
} catch (InterruptedException e) {
System.out.println("task2 interrupted: " + e);
}
}
};
//消耗时间很长的任务 8秒
//消耗时间1秒
for(int i=0; i<1000; ++i){
}
try {
// 告诉线程池,如果所有任务执⾏完毕则关闭线程池
pool.shutdown();
// 判断线程池是否在限定时间内,或者线程池内线程全部结束
if(!pool.awaitTermination(awaitTime, TimeUnit.MILLISECONDS)){
// 超时的时候向线程池中所有的线程发出中断(interrupted)。
pool.shutdownNow();
}
} catch (InterruptedException e) {
System.out.println("awaitTermination interrupted: " + e);
}
System.out.println("end");
}
task1 start
task2 start
task2 start
task2 start
task2 start
task2 end
task2 end
task2 end
task2 start
task2 end
task2 start
task2 start
task2 start
task2 end
task2 end
task2 end
task2 end
end
task2 interrupted: java.lang.InterruptedException: sleep interrupted
task2 interrupted: java.lang.InterruptedException: sleep interrupted
task1 interrupted: java.lang.InterruptedException: sleep interrupted
task2 interrupted: java.lang.InterruptedException: sleep interrupted
task2 interrupted: java.lang.InterruptedException: sleep interrupted
Process finished with exit code 0
如果只执⾏shutdown,线程池会等待所有线程全部结束才终⽌线程池。
且!执⾏shutdown()后,就不能再继续使⽤ExecutorService来追加新的任务了,如果继续调⽤execute/submit⽅法执⾏新的任务的话,就会抛出RejectedExecutionException异常。
所以⼀般的调⽤顺序为:
shutdown ⽅法,停⽌接收新的任务
awaitTermination ⽅法,判断任务是否执⾏完毕或者是否在指定时间内
shutdownNow⽅法
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论