Java并发包线程池之Executors、ExecutorCompletionService⼯
具类
前⾔
前⾯介绍了Java并发包提供的三种线程池,它们⽤处各不相同,接下来介绍⼀些⼯具类,对这三种线程池的使⽤。
Executors
Executors是JDK1.5就开始存在是⼀个线程池⼯具类,它定义了⽤于Executor、ExecutorService、ScheduledExecutorService、ThreadFactory和Callable的⼯⼚和⼯具⽅法。在开始之前,先了解⼀下它提供的⼀些内部类:
DelegatedExecutorService、DelegatedScheduledExecutorService、FinalizableDelegatedExecutorService
1//只暴露实现ExecutorService接⼝的⽅法的包装器类。Delegated 是代理,委托的意思
2static class DelegatedExecutorService extends AbstractExecutorService {
3private final ExecutorService e;
4
5//构造器传⼊⼀个ExecutorService实例
6    DelegatedExecutorService(ExecutorService executor) { e = executor; }
7
8    .....
9 }
10
11//可⾃动终结的包装线程池,FinalizableDelegatedExecutorService的实例即使不⼿动调⽤shutdown⽅法关闭现称池,虚拟机也会帮你完成此任务
12static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
13
14    FinalizableDelegatedExecutorService(ExecutorService executor) {
15super(executor);
16    }
17
18//finalize⽅法会在虚拟机gc清理对象时被调⽤
19protected void finalize() {
20super.shutdown();
21    }
22 }
23
24
25//只暴露实现ScheduledExecutorService的接⼝⽅法的⼀个包装器类。
26static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService {
27private final ScheduledExecutorService e;
28
29//构造器传⼊⼀个ScheduledExecutorService实例
30    DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
31super(executor);
32        e = executor;
33    }
34//.....
35 }
View Code
这三个类都是包装类,DelegatedExecutorService是对ExecutorService的⼀种包装,仅仅只给使⽤者暴露 ExecutorService的接⼝⽅法,屏蔽掉具体实现类的独有⽅法。DelegatedScheduledExecutorService是对ScheduledExecutorService的包装,仅仅只给使⽤者暴露ScheduledExecutorService的接⼝⽅法,⽽FinalizableDelegatedExecutorService是在对ExecutorService的包装基础上,增加了⾃动线程池回收的功能,其finalize⽅法会在虚拟机gc清理对象时被调⽤,从⽽将⽤户忘记关闭的⽆⽤线程池关闭并回收。PrivilegedCallableUsingCurrentClassLoader、PrivilegedCallable则是对Callable任务的运⾏上下⽂和类加载的控制,RunnableAdapter则是⽤于将Runnable包装成Callable的包装类,DefaultThreadFactory是默认的线程⼯⼚,创建的线程名称都具有:pool-池编号-thread-线程编号,这样的前缀。PrivilegedThreadFactory继承了DefaultThreadFactory,在默认的线程⼯⼚上,扩展了捕获访问控制的上下⽂和类加载器。
⼯具⽅法
⼀、下⾯是⼀些创建线程池的⼯具⽅法:
public static ExecutorService newFixedThreadPool(int nThreads)
返回可以执⾏Runnable、Callable任务的⼀个固定线程池的ThreadPoolExecutor实例,其corePoolSize和maximumPoolSize都是指定的⼤⼩,keepAliveTime为0,任务队列是⽆界的LinkedBlockingQueue队列。⾸先根据 ThreadPoolExecutor的特性,corePoolSize和maximumPoolSize都相等时,意味着不会创建⾮核⼼线程,在keepAliveTime默认没有应⽤于核⼼线程时,其keepAliveTime⽆论是什么值
都⽆意义,因此这⾥的keepAliveTime没有实际意义。然后由于是⽆界队列,maximumPoolSize参数其实也是⽆意义的,是所有来不及处理的任务都会⽆条件的丢进该⽆界队列中,直到系统资源耗尽,因此使⽤此线程池要注意任务的提交频率,不然会有内存耗尽,服务器宕机的风险。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
该⽅法只是⽐第⼀个⽅法newFixedThreadPool(int nThreads) 多传递了⼀个线程⼯⼚参数,其他都⼀样。
public static ExecutorService newSingleThreadExecutor()
利⽤FinalizableDelegatedExecutorService返回包装过得可以执⾏Runnable、Callable任务的只有单个线程的ThreadPoolExecutor实例。其corePoolSize和maximumPoolSize都是1,任务队列是⽆界的Lin
kedBlockingQueue。⾸先,这是⼀个单线程按顺序执⾏任务的线程池,但如果在线程池关闭之前某个任务执⾏失败异常结束,那么如果需要执⾏后续任务,将可能会创建⼀个新的线程替代这个唯⼀的线程。其次该⽅法看似与执⾏newFixedThreadPool(1)的效果⼀样,但由于该⽅法放回的线程池经过FinalizableDelegatedExecutorService包装过,屏蔽了更改线程池配置的⽅法,因此该线程池⽆法被重新配置。最后经过FinalizableDelegatedExecutorService包装之后,该线程池具有了⾃动被JVM垃圾回收时终结回收的特性,即使⽤户使⽤完之后没有调⽤shutdown。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
该⽅法就是⽐newSingleThreadExecutor()⽅法多传了⼀个线程⼯⼚,其他都⼀样。
public static ExecutorService newCachedThreadPool()
返回可以⽴即执⾏Runnable、Callable任务的ThreadPoolExecutor线程池,其corePoolSize为0,maximumPoolSize为
Integer.MAX_VALUE,但是任务队列是SynchronousQueue的0容量队列。因此,⼀旦任务被提交将⽴即被执⾏,如果线程不够将⽴即创建线程,该线程池理论上为了满⾜任务需要可以创建Integer.MAX_VALUE多个线程(当然该⽅法设置了keepAliveTime为60秒,在线程空闲下来之后所有
线程都可能会被销毁),当任务的提交频率超过了任务的平均处理速率,将导致创建越来越多的线程以处理到达的任务,因此也有资源耗尽的潜在风险,必须要有效的控制任务的提交频率。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
指定了线程⼯⼚的newCachedThreadPool(),其他都⼀样。
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
返回⼀个可以执⾏Runnable、Callable类型的周期/延迟任务的的ScheduledThreadPoolExecutor线程池实例,其corePoolSize为1。由于其内部也是⼀个⽆界队列,因此maximumPoolSize、keepAliveTime是⽆效的。所以它也是⼀个单线程执⾏延迟/周期任务的线程池,所有来不及处理的任务都会⽆条件的丢进该⽆界队列中,直到系统资源耗尽,因此使⽤此线程池要注意任务的提交频率,不然会有内存耗尽,服务器宕机的风险。其返回经过了DelegatedScheduledExecutorService包装,不可再被转换成ScheduledThreadPoolExecutor实例。
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
指定了线程⼯⼚的newSingleThreadScheduledExecutor(),其他都⼀样。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建⼀个可以执⾏Runnable、Callable类型的周期/延迟任务的具有指定线程数量的ScheduledThreadPoolExecutor线程池实例,由于其内部是⽆界队列,所有来不及处理的任务都会⽆条件的丢进该⽆界队列中,直到系统资源耗尽,因此使⽤此线程池要注意任务的提交频率,不然会有内存耗尽,服务器宕机的风险。其返回结果没有经过包装,可以转换成ScheduledThreadPoolExecutor实例。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
指定了线程⼯⼚的newScheduledThreadPool(int corePoolSize),其他都⼀样。
public static ExecutorService newWorkStealingPool(int parallelism)
返回⼀个可执⾏ForkJoinTask任务的具有给定并发度,⼯作线程按FIFO先进先出的异步⼯作模式的ForkJoinPool,由于ForkJoinPool是⼯作窃取机制实现的,所以该⽅法名有WorkStealing的字样。注意ForkJoinPool⽤于任务可递归分解执⾏的ForkJoinTask。由于该⽅法创建ForkJoinTask指定的asyncMode是ture,不能保证所提交任务的执⾏顺序,因此⼀般只适⽤于不需要合并递归执⾏结果(即不需要返回结果)的场景,⼀般⽤于事件消息传递的场景。
public static ExecutorService newWorkStealingPool()
⽆参的创建⼀个可执⾏ForkJoinTask任务的,⼯作线程按FIFO先进先出的异步⼯作模式的ForkJoinPool。该⽅法与newWorkStealingPool(int parallelism)唯⼀的区别就是,该⽅法的并⾏度是根据当前计算机的CPU核⼼数确定的,其他都⼀样,asyncMode 也是true,⼀般来说这个⽅法⽐newWorkStealingPool(int parallelism)更好⽤。
对于以上提供的线程池的⼯具⽅法可以看出,Executors并没有返回基于有界队列的ThreadPoolExecutor线程池⼯具类,这⼤概是因为有界队列有任务被拒绝的潜在可能,这在⼤多数情况下⽤户都是不可接受的。在实际使⽤中,还是应该视情况⽽定,不要仅仅限于使⽤Executors提供的这⼏个⼯具⽅法返回的线程池,毕竟ThreadPoolExecutor线程池是⼀个可在多个参数上调节其⼯作机制的线程池。
⼆、下⾯是包装线程池的⼯具⽅法:
public static ExecutorService unconfigurableExecutorService(ExecutorService executor),包装指定的ExecutorService线程池为DelegatedExecutorService类型,即只暴露ExecutorService 的接⼝⽅法,其返回结果不能强转成原来的线程池实现类。按字⾯意思就是⽆法对该线程池的配置进⾏更改,因为已经将那些更改配置参数的setter⽅法都屏蔽了。
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) ,包装指定的执⾏延迟/周期任务的线程池为DelegatedScheduledExecutorService类型,即只暴露ScheduledExecutorService的接⼝⽅法,其返回结果不能强转成原来的线程池实现类。按字⾯意思就是⽆法对该线程池的配置进⾏更改。
三、返回⼀个默认的线程⼯⼚的⼯具⽅法:
public static ThreadFactory defaultThreadFactory(),返回默认的线程⼯⼚实现DefaultThreadFactory,创建的线程名称都具有:pool-池编号-thread-线程编号,这样的前缀。创建出的线程都是⾮守护线程,因此使⽤该线程⼯⼚的线程池在⽤完之后最好⼿动shutdown,避免JVM 挂起。
public static ThreadFactory privilegedThreadFactory(),返回捕获了访问控制的上下⽂和类加载器的默认线程⼯⼚DefaultThreadFactory的扩展类PrivilegedThreadFactory。创建的线程名称也都具有:pool-池编号-thread-线程编号,这样的前缀。
四,Callable相关的⼯具⽅法:
public static <T> Callable<T> callable(Runnable task, T result),将⼀个返回指定结果的Runnable 任务转换成Callable任务。
public static Callable<Object> callable(Runnable task) ,将⼀个返回结果为null的Runnable 任务转换成Callable任务。
public static Callable<Object> callable(final PrivilegedAction<?> action) ,返回⼀个在调⽤时执⾏指定的PrivilegedAction任务并返回其执⾏结果的Callable对象。
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) ,返回⼀个在调⽤时执⾏指定的PrivilegedExceptionAction任务并返回其执⾏结果的Callable对象。
public static <T> Callable<T> privilegedCallable(Callable<T> callable) ,返回⼀个在调⽤它时可在当前的访问控制上下⽂中执⾏给定的Callable任务的Callable对象。
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable),返回⼀个在调⽤它时可在当前的访问控制上下⽂中,使⽤当前上下⽂类加载器作为上下⽂类加载器来执⾏给定的Callable任务的Callable对象。CompletionService
这是⼀个接⼝,该接⼝是为了将任务的提交与获取任务的执⾏结果解耦⽽设计的,当然这⾥的任务⼀
般是指多个。这通常应⽤于那种不关⼼执⾏顺序的多个耗时操作。例如异步I/O,或者类⽐⼀个页⾯的不同模块的异步加载。通常,CompletionService依赖于⼀个单独的Executor来实际执⾏任务,在这种情况下,CompletionService只管理⼀个内部完成队列。ExecutorCompletionService类提供了此⽅法的⼀个实现,它虽然是⼀个需要创建实例的类,但其实也可以看作是⼀种⼯具类。
内存⼀致性效果:线程向 CompletionService 提交任务之前的操作 happen-before 该任务实际的执⾏操作,后者依次 happen-before 紧跟后⾯从完成队列中成功取⾛其返回结果的操作。
ExecutorCompletionService
⾸先看起其内部类QueueingFuture:
1public class ExecutorCompletionService<V> implements CompletionService<V> {
2private final Executor executor;
3private final AbstractExecutorService aes;
4private final BlockingQueue<Future<V>> completionQueue; //已完成任务的Future阻塞队列
5
6
7//将以完成的任务的Future排队的实现
8private class QueueingFuture extends FutureTask<Void> {
9
10        QueueingFuture(RunnableFuture<V> task) {
11super(task, null);
12this.task = task;
13        }
14//实现回调接⼝,当任务被完成,将Future放进阻塞队列
15protected void done() { completionQueue.add(task); }
16private final Future<V> task;
17    }
18
19    ......
20 }
View Code
QueueingFuture就是对FutureTask的扩展,利⽤了FutureTask任务在被执⾏完成之后会回调done()⽅法的特性,将已经完成的任务⽴即放到⼀个阻塞队列。
ExecutorCompletionService有两个构造⽅法,都需要传⼊⼀个Executor线程池实现类,⽤来实际执⾏任务,另⼀个可选的参数就是内部使⽤的阻塞队列,阻塞队列的选取可以决定存取已完成任务的Future的顺序。
下⾯是它实现CompletionService 接⼝的⽅法:
1//提交Callable任务
2public Future<V> submit(Callable<V> task) {
3if (task == null) throw new NullPointerException();
4    RunnableFuture<V> f = newTaskFor(task);
5    ute(new QueueingFuture(f));
6return f;
7 }
8
9//提交Runnable任务,返回固定的结果
10public Future<V> submit(Runnable task, V result) {
提交更改是内存条吗11if (task == null) throw new NullPointerException();
12    RunnableFuture<V> f = newTaskFor(task, result);
13    ute(new QueueingFuture(f));
14return f;
15 }
16
17//获取⼀个已经完成的任务的Future,直到成功或者被中断。
18public Future<V> take() throws InterruptedException {
19return completionQueue.take();
20 }
21
22//获取并移除表⽰下⼀个已完成任务的 Future,如果不存在这样的任务,则返回 null。
23public Future<V> poll() {
24return completionQueue.poll();
25 }
26
27//超时版本的poll
28public Future<V> poll(long timeout, TimeUnit unit)
29throws InterruptedException {
30return completionQueue.poll(timeout, unit);
31 }
View Code
从这些代码可见,都很简单,提交的任务被封装成QueueingFuture任务,从⽽可以使那些已经完成的任务被⽴即丢进准备好的阻塞队
列,take/poll则是对阻塞队列中的已经完成的任务的Future的提取了。
以下是Java Doc中的两个⽰例,⽰例⼀:
1void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
2      CompletionService ecs = new ExecutorCompletionService(e); //根据指定的线程池实例与任务集合创建ExecutorCompletionService实例
3for (Callable s : solvers)
4          ecs.submit(s); //提交所有任务
5
6int n = solvers.size();
7for (int i = 0; i < n; ++i) {
8          Result r = ecs.take().get(); //提取已经完成的任务结果
9if (r != null)
10              use(r);  //对于⾮空返回结果执⾏相应的操作
11      }
12 }
⽰例⼀⼀次性提交⼀组任务,并且对每⼀个任务返回的⾮空结果执⾏给定的操作。
⽰例⼆:
1void solve(Executor e, Collection<Callable<Result>> solvers)
2throws InterruptedException {
3      CompletionService ecs = new ExecutorCompletionService(e);
4int n = solvers.size();
5      List<Future<Result>> futures = new ArrayList>(n);
6      Result result = null;
7try {
8for (Callable s : solvers)
9              futures.add(ecs.submit(s)); //提交所有任务,并记录Future
10
11for (int i = 0; i < n; ++i) {
12try {
13                  Result r = ecs.take().get();
14if (r != null) { //发现⼀个任务完成就退出
15                      result = r;
16break;
17                  }
18              } catch (ExecutionException ignore) {}
19          }
20      }
21finally {
22for (Future f : futures) //取消其余所有任务
23              f.cancel(true);
24      }
25
26if (result != null) //执⾏相应的操作
27          use(result);
28  }
View Code
⽰例⼆虽然也提交了⼀组任务,但只要有⼀个任务返回⾮空结果就取消其他所有任务,并那种该结果执⾏指定的操作。ExecutorCompletionService利⽤了FutureTask任务不论是异常结束,还是正常结束还是被取消,都会回调其done⽅法的特点,扩展FutureTask并实现了该⽅法将不论以任何⽅式结束的任务的⽤于获取异步任务结果的Future放⼊⼀个阻塞队列中,因此可以通过读取队列的⽅法顺序获取那些任务的执⾏结果,由于任务可能是异常结束,因此在使⽤的时候,需要对()拿到的执⾏结果进⾏空值判断。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。