parallel循环java_Java8parallelStream并发安全原理讲解Java8 parallelStream并发安全
背景
Java8的stream接⼝极⼤地减少了for循环写法的复杂性,stream提供了map/reduce/collect等⼀系列聚合接⼝,还⽀持并发操作:parallelStream。
在爬⾍开发过程中,经常会遇到遍历⼀个很⼤的集合做重复的操作,这时候如果使⽤串⾏执⾏会相当耗时,因此⼀般会采⽤多线程来提速。Java8的paralleStream⽤fork/join框架提供了并发执⾏能⼒。但是如果使⽤不当,很容易陷⼊误区。
Java8的paralleStream是线程安全的吗
⼀个简单的例⼦,在下⾯的代码中采⽤stream的forEach接⼝对1-10000进⾏遍历,分别插⼊到3个ArrayList中。其中对第⼀个list的插⼊采⽤串⾏遍历,第⼆个使⽤paralleStream,第三个使⽤paralleStream的同时⽤ReentryLock对插⼊列表操作进⾏同步:
private static List list1 = new ArrayList<>();
private static List list2 = new ArrayList<>();
private static List list3 = new ArrayList<>();
private static Lock lock = new ReentrantLock();
public static void main(String[] args) {
IntStream.range(0, 10000).forEach(list1::add);
IntStream.range(0, 10000).parallel().forEach(list2::add);
IntStream.range(0, 10000).forEach(i -> {
lock.lock();
try {
list3.add(i);
}finally {
lock.unlock();
}
});
System.out.println("串⾏执⾏的⼤⼩:" + list1.size());
System.out.println("并⾏执⾏的⼤⼩:" + list2.size());
System.out.println("加锁并⾏执⾏的⼤⼩:" + list3.size());
}
执⾏结果:
串⾏执⾏的⼤⼩:10000
并⾏执⾏的⼤⼩:9595
加锁并⾏执⾏的⼤⼩:10000
并且每次的结果中并⾏执⾏的⼤⼩不⼀致,⽽串⾏和加锁后的结果⼀直都是正确结果。显⽽易见,stream.parallel.forEach()中执⾏的操作并⾮线程安全。
那么既然paralleStream不是线程安全的,是不是在其中的进⾏的⾮原⼦操作都要加锁呢?我在stackOverflow上到了答案:
在上⾯两个问题的解答中,证实paralleStream的forEach接⼝确实不能保证同步,同时也提出了解决⽅案:使⽤collect和reduce接⼝。
在Javadoc中也对stream的并发操作进⾏了相关介绍:
The Collections Framework provides synchronization wrappers, which add automatic synchronization to an arbitrary collection, making it thread-safe.
Collections框架提供了同步的包装,使得其中的操作线程安全。
所以下⼀步,来看看collect接⼝如何使⽤。
stream的collect接⼝
闲话不多说直接上源码吧,Stream.java中的collect⽅法句柄:
R collect(Collector super T, A, R> collector);
在该实现⽅法中,参数是⼀个Collector对象,可以使⽤Collectors类的静态⽅法构造Collector对象,⽐如
除此之外,我们如果要在collect接⼝中做更多的事,就需要⾃定义实现Collector接⼝,需要实现以下⽅法:
Supplier supplier();
BiConsumer accumulator();
BinaryOperator combiner();
Function finisher();
Set characteristics();
要轻松理解这三个参数,要先知道fork/join是怎么运转的,⼀图以蔽之:
简单地说就是⼤任务拆分成⼩任务,分别⽤不同线程去完成,然后把结果合并后返回。所以第⼀步是拆分,第⼆步是分开运算,第三步是合并。这三个步骤分别对应的就是Collector的supplier,accumulator和combiner。talk is cheap show me the code,下⾯⽤⼀个例⼦来说明:
输⼊是⼀个10个整型数字的ArrayList,通过计算转换成double类型的Set,⾸先定义⼀个计算组件:
Compute.java:
public class Compute {
public Double compute(int num) {
return (double) (2 * num);
}
}
接下来在Main.java中定义输⼊的类型为ArrayList的nums和类型为Set的输出结果result:
private List nums = new ArrayList<>();
private Set result = new HashSet<>();
定义转换list的run⽅法,实现Collector接⼝,调⽤内部类Container中的⽅法,其中characteristics()⽅法返回空set即可:
public void run() {
// 填充原始数据,nums中填充0-9 10个数
IntStream.range(0, 10).forEach(nums::add);
//实现Collector接⼝
result = nums.stream().parallel().collect(new Collector>() {
@Override
public Supplier supplier() {
return Container::new;
}
@Override
public BiConsumer accumulator() {
return Container::accumulate;
}
@Override
public BinaryOperator combiner() {
return Container::combine;
}
@Override
public Function> finisher() {
return Container::getResult;
}
@Override
public Set characteristics() {
// 固定写法
ptySet();
}
});
}
构造内部类Container,该类的作⽤是⼀个存放输⼊的容器,定义了三个⽅法:accumulate⽅法对输⼊数据进⾏处理并存⼊本地的结果
combine⽅法将其他容器的结果合并到本地的结果中
getResult⽅法返回本地的结果
Container.java:
class Container {
// 定义本地的result
public Set set;
public Container() {
this.set = new HashSet<>();
}
public Container accumulate(int num) {
this.set.add(computepute(num));
return this;
}
public Container combine(Container container) {
this.set.addAll(container.set);
return this;
}
public Set getResult() {
return this.set;
}
}
在Main.java中编写测试⽅法:
public static void main(String[] args) {
Main main = new Main();
main.run();
System.out.println("原始数据:");
main.nums.forEach(i -> System.out.print(i + " "));
System.out.println("\n\ncollect⽅法加⼯后的数据:");
}
输出:
原始数据:java怎么编写
0 1 2 3 4 5 6 7 8 9
collect⽅法加⼯后的数据:
0.0 2.0 4.0 8.0 16.0 18.0 10.0 6.0 12.0 14.0
我们将10个整型数值的list转成了10个double类型的set,⾄此验证成功~
⼀⾔蔽之
总结就是paralleStream⾥直接去修改变量是⾮线程安全的,但是采⽤collect和reduce操作就是满⾜线程安全的了。java8中parallelStream性能测试及结果分析
测试1
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 20, time = 3, timeUnit = TimeUnit.SECONDS)
@Fork(1)
@State(Scope.Benchmark)
public class StreamBenchTest {
List data = new ArrayList<>();
@Setup
public void init() {
// prepare
for(int i=0;i<100;i++){
data.add(UUID.randomUUID().toString());
}
}
@TearDown
public void destory() {
// destory
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。