ForkJoin框架与Java8StreamAPI之并⾏流的速度⽐较
Fork/Join 框架有特定的ExecutorService和线程池构成。ExecutorService可以运⾏任务,并且这个任务会被分解成较⼩的任务,它们从线程池中被fork(被不同的线程执⾏)出来,在join(即它的所有的⼦任务都完成了)之前会⼀直等待。
Fork/Join 使⽤了任务窃取来最⼩化线程的征⽤和开销。线程池中的每条⼯作线程都有⾃⼰的双端⼯作队列并且会将新任务放到这个队列中去。它从队列的头部读取任务。如果队列是空的,⼯作线程就尝试从另外⼀个队列的末尾获取⼀个任务。窃取操作不会很频繁,因为⼯作线程会采⽤后进先出的顺序将任务放⼊它们的队列中,同时⼯作项的规模会随着问题分割成⼦问题⽽变⼩。你⼀开始把任务交给⼀个中⼼的⼯作线程,之后它会继续将这个任务分解成更⼩的任务。最终所有的⼯作线程都只会设计很少量的同步操作。
Stream介绍(引)
Stream 作为 Java 8 的⼀⼤亮点,它与 java.io 包⾥的 InputStream 和 OutputStream 是完全不同的概念。它也不同于 StAX 对 XML 解析的 Stream,也不是 Amazon Kinesis 对⼤数据实时处理的 Stream。Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进⾏各种⾮常便利、⾼效的聚合操作(aggregate operation),或者⼤批量数据操作 (bulk data operation)。Stream API 借助于同样新出现的 Lambda 表达式,极⼤的提⾼编程效率和程序可读性。同时它提供串⾏和并⾏两种模式进⾏
汇聚操作,并发模式能够充分利⽤多核处理器的优势,使⽤ fork/join 并⾏⽅式来拆分任务和加速处理过程。通常编写并⾏代码很难⽽且容易出错, 但使⽤ Stream API ⽆需编写⼀⾏多线程的代码,就可以很⽅便地写出⾼性能的并发程序。所以说,Java 8 中⾸次出现的 java.util.stream 是⼀个函数式语⾔+多核时代综合影响的产物。
Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像⼀个⾼级版本的 Iterator。原始版本的 Iterator,⽤户只能显式地⼀个⼀个遍历元素并对其执⾏某些操作;⾼级版本的 Stream,⽤户只要给出需要对其包含的元素执⾏什么操作,⽐如 “过滤掉长度⼤于 10 的字符串”、“获取每个字符串的⾸字母”等,Stream 会隐式地在内部进⾏遍历,做出相应的数据转换。
Stream 就如同⼀个迭代器(Iterator),单向,不可往复,数据只能遍历⼀次,遍历过⼀次后即⽤尽了,就好⽐流⽔从⾯前流过,⼀去不复返。
⽽和迭代器⼜不同的是,Stream 可以并⾏化操作,迭代器只能命令式地、串⾏化操作。顾名思义,当使⽤串⾏⽅式去遍历时,每个item 读完后再读下⼀个 item。⽽使⽤并⾏去遍历时,数据会被分成多个段,其中每⼀个都在不同的线程中处理,然后将结果⼀起输出。Stream 的并⾏操作依赖于 Java7 中引⼊的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。
所以说,实际上Stream并⾏流实际上就是⼀个帮你fork/join 后的API,为了验证效率,我编写了⼀个对
1000_000个数进⾏排序的程序
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import urrent.ForkJoinPool;
import urrent.RecursiveAction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class ParallelMergeSort {
public static void main(String[] args) {
final int SIZE = 10000000;
int[] list1 = new int[SIZE];
int[] list2 = new int[SIZE];
Integer[] list3 = new Integer[SIZE];
for (int i = 0; i < list1.length; i++) {
list1[i] = list2[i] = (int)(Math.random() * 10000000);
list3[i] = list1[i];
}
long startTime = System.currentTimeMillis();
parallelMergeSort(list1);
long endTime = System.currentTimeMillis();
System.out.println("Parallel time with " + Runtime().availableProcessors() + " processors is " + (endTime - startTime) + " milliseconds");
startTime = System.currentTimeMillis();
endTime = System.currentTimeMillis();
System.out.println("Sequent time is " + (endTime - startTime) + " milliseconds");
List<Integer> tmp = new ArrayList<Integer>();
Collections.addAll(tmp, list3);
startTime = System.currentTimeMillis();
IntStream tmp1 = tmp.stream().parallel().mapToInt(Integer::intValue).sorted();
endTime = System.currentTimeMillis();
System.out.println("ParallelStream time is " + (endTime - startTime) + " milliseconds");
tmp1.limit(100).forEachOrdered(System.out::println);
/*
for(int i = 0; i < 100; i++) {
System.out.(i));
}*/
}
public static void parallelMergeSort(int[] list) {
RecursiveAction mainTask = new SortTask(list);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);
}
public static class SortTask extends RecursiveAction{
/**
*
*/
private static final long serialVersionUID = 1L;
private final int THRESHOLD = 500;
private int[] list;
SortTask(int[] list){
this.list = list;
}
@Override
protected void compute() {
if (list.length < THRESHOLD)
java.util.Arrays.sort(list);
else {
//Obtain the first half
int[] firstHalf = new int[list.length / 2];
System.arraycopy(list, 0, firstHalf, 0, list.length / 2);
//Obtain the second half
int secondHalfLength = list.length - list.length / 2;
int[] secondHalf = new int[secondHalfLength];
System.arraycopy(list, list.length /2, secondHalf, 0, secondHalfLength);
/
/Recursively sort the two halves
invokeAll(new SortTask(firstHalf), new SortTask(secondHalf));
//Merge firstHalf with second
<(firstHalf, secondHalf, list);
}
}
}
public static class MergeSort {
/** The method for sorting the numbers */
public static void mergeSort(int[] list) {
if (list.length > 1) {
/
/ Merge sort the first half
int[] firstHalf = new int[list.length / 2];
System.arraycopy(list, 0, firstHalf, 0, list.length / 2);
mergeSort(firstHalf);
// Merge sort the second half
int secondHalfLength = list.length - list.length / 2;
int[] secondHalf = new int[secondHalfLength];
System.arraycopy(list, list.length / 2,
secondHalf, 0, secondHalfLength);
mergeSort(secondHalf);
// Merge firstHalf with secondHalf into list
merge(firstHalf, secondHalf, list);
}
}
/** Merge two sorted lists */
public static void merge(int[] list1, int[] list2, int[] temp) {
int current1 = 0; // Current index in list1
int current2 = 0; // Current index in list2
int current3 = 0; // Current index in temp
while (current1 < list1.length && current2 < list2.length) {
if (list1[current1] < list2[current2])
temp[current3++] = list1[current1++];
sortedlist
else
temp[current3++] = list2[current2++];
}
while (current1 < list1.length)
temp[current3++] = list1[current1++];
while (current2 < list2.length)
temp[current3++] = list2[current2++];
}
}
}
代码可以看到,利⽤三种⽅法,对随机⽣成的 int 数据排序
第⼀种是⾃⼰编写的fork/join利⽤⼆分法排序
第⼆种是单线程下的⼆分法排序
第三种是并⾏流的排序
为了验证并⾏流是否排序正确,输出流前100个数
结果如图:
但是这是为没有收集器的情况,并⾏流很快的完成并且得到IntStream,加上收集器后:
可以看出,排序很快完成,在最后的类型转换上花费了⼤量的时间,
⽽根据Stream 的介绍,实验fork/join⽅法完成的时间应该不会与并⾏流差距太⼤,实际上,实验中编写的代码在fork分解阶段和join阶段花费了⼤量时间,远不如直接使⽤API快速
但是如果正确使⽤fork/join框架的话也不会很慢
但是相⽐单线程已经远远提升了效率
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论