JavaForkJoinPool⽤法及原理
1. 概述
fork/join 框架在 Java 7 中引⼊。它基于分⽽治之的思想,通过尝试利⽤所有可⽤处理器内核来帮助加速并⾏计算。
什么是分⽽治之?它分为任务分解,和结果合并两个阶段。
⾸先 fork 通过递归⽅式将⼀个复杂任务分解为更⼩的独⽴⼦任务,直⾄⼦任务简单到⽆需再分。
分完之后, “join” 部分开始,将所有⼦任务结果递归地合并为⼀个结果。如果任务的返回值为void类型,那么程序只需等待所有⼦任务执⾏完毕。
为了提⾼并⾏计算效率,fork/join 框架使⽤⼀个名为ForkJoinPool的线程池。该线程池负责管理类型为ForkJoinWorkerThread的⼯作线程。
ForkJoinPool
ForkJoinPool是整个框架的核⼼,它实现了接⼝。
我们知道⼀个⼯作线程(Worker Thread)同⼀时间只能执⾏⼀个任务,ForkJoinPool不会为每个⼦任务创建⼀个独⽴的线程,⽽是每个线程都维护了⼀个双端队列(),⽤来存储需要执⾏的任务。
这种架构对于借助⼯作窃取算法平衡线程的⼯作负载⾄关重要。
2.1. ⼯作窃取算法
何为⼯作窃取算法?
简单来说 – 空闲的线程尝试从其他繁忙线程的deque双端队列中窃取⼀个任务来执⾏。
默认情况下,⼀个⼯作线程从 deque 头部读取任务。如果队列为空,则该线程会从其他繁忙线程的 deque 尾部或全局队列中获取⼀个任务。
这种算法最⼤限度地避免发⽣线程竞争任务,同时减少线程寻任务的次数。
2.2. 实例化 ForkJoinPool
在 Java 8 中,获取ForkJoinPool实例最便捷的⽅法是使⽤其静态⽅法commonPool()。顾名思义,它返回公共池的引⽤,公共池是每个ForkJoinTask 的默认线程池。
根据,建议使⽤预定义的公共线程池以减少资源消耗,避免每个任务都创建⼀个单独的线程池。
Java 7 中,需要我们⾃⼰实现单例模式,例如⽤饿汉式:
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
获取实例:
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
通过ForkJoinPool构造函数我们可以创建⾃定义线程池,⾃定义并⾏级别(parallelism),线程创建⼯⼚(ThreadFactory),异常处理器(ExceptionHandler)。上⾯例⼦中,parallelism参数为2,表⽰该线程池将使⽤2个处理器内核。
3. ForkJoinTask <V>
ForkJoinTask是我们任务的基类。实际中,我们应该继承它的两个⼦类:⽆返回值的RecursiveAction和带返回值的 RecursiveTask<V>。两者都有⼀个抽象⽅法compute(),在⾥⾯实现我们的任务执⾏逻辑。
3.1. RecursiveAction ⽰例
下⾯例⼦中,我们将变量workload的所有字母转为⼤写并打印⽇志。本例仅仅是⽤于演⽰⽬的,这个任务没有实际意义。
为了演⽰框架任务分解⾏为,本例使⽤createSubtask()⽅法在workload.length()⼤于设定阈值时分解任务。
workload被递归地分解为⼦串,并创建基于这些⼦串的CustomRecursiveTask实例。
结果返回⼀个⼦任务集合 List<CustomRecursiveAction>。
使⽤invokeAll()将集合中的任务提交到ForkJoinPool。
public class CustomRecursiveAction extends RecursiveAction {
private String workload = "";
private static final int THRESHOLD = 4;
private static Logger logger =
public CustomRecursiveAction(String workload) {
this.workload = workload;
}
@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}
private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();
String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());
subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));
return subtasks;
}
private void processing(String work) {
String result = UpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}
可以套⽤此模版开发我们⾃⼰的RecursiveAction类。创建⼀个对象表⽰我们的总任务,选择⼀个合适的阈值,定义⼀个⽤于分解任务的⽅法,以及实际处理任务的⽅法。
3.2. RecursiveTask<V>
对于带返回值的任务,实现逻辑类似,只是需要把每个⼦任务的结果合并到⼀个结果中:
public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;
private static final int THRESHOLD = 20;
public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}
@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}
private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
dividedTasks.add(new CustomRecursiveTask(
return dividedTasks;
}
private Integer processing(int[] arr) {
return Arrays.stream(arr)java arraylist用法
.
filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
}
本例中,变量arr表⽰我们的任务。createSubtasks()⽅法递归地将⼀个⼤任务分解为⼩任务,直到⼩于阈值时不再分解。然后,invokeAll()⽅法将⼦任务提交到公共池,并返回⼀个Future集合。
为了触发执⾏,为每个⼦任务调⽤join()⽅法。
这⾥使⽤了 Java 8 中的 Stream API 实现。sum()⽅法将⼦结果合并为最终结果。
4. 提交任务到 ForkJoinPool 中
要将任务提交到ForkJoinPool线程池中,可以使⽤:
submit() 或 execute() ⽅法:
int result = customRecursiveTask.join();
invoke()⽅法 fork 任务并等待返回结果,不需要⼿动 join 操作。
int result = forkJoinPool.invoke(customRecursiveTask);
invokeAll()⽅法将ForkJoinTask任务批量提交到ForkjoinPool。将任务作为参数传⼊(该⽅法有多个重载⽅法,可以传2个任务,或变长参数,或集合形式),fork然后按顺序返回⼀个Future集合。
或者,你也可以单独使⽤ fork() 和 join() ⽅法。fork()提交⼀个任务到线程池中,但不触发执⾏。这种情况下,必须⼿动调⽤join⽅法。如果是RecursiveAction类型的任务,join()返回null,如果是RecursiveTask<V>类型,则返回任务执⾏结果。
customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();
在上⾯ RecursiveTask<V> 例⼦中我们使⽤invokeAll()批量提交⼦任务到线程池中。同样的⼯作也可以通过fork()和join()来完成,不过这会影响结果的排序。
为了避免混淆,通常最好使⽤invokeAll()⽅法将多个任务提交到ForkJoinPool。
5. 总结
在处理⼤型任务时,使⽤ fork/join 框架能加快处理速度。但前提是遵守以下⼏个原则:
尽可能少使⽤线程池 – ⼤多数场景下,⼀个应⽤最好使⽤⼀个线程池
使⽤默认的公共线程池l, 如果不需要特殊调优
使⽤⼀个合理的阈值将 ForkJoinTask 拆分为⼦任务
避免在ForkJoinTask中编写阻塞代码
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论