JavaStream源码分析
:Coding Insight,专注 Java 技术
前⾔
Java 8 的 Stream 使得代码更加简洁易懂,本篇⽂章深⼊分析 Java Stream 的⼯作原理,并探讨 Steam 的性能问题。
Java 8 集合中的 Stream 相当于⾼级版的 Iterator,它可以通过 Lambda 表达式对集合进⾏各种⾮常便利、⾼效的聚合操作(Aggregate Operation),或者⼤批量数据操作 (Bulk Data Operation)。
Stream的聚合操作与数据库SQL的聚合操作sorted、filter、map等类似。我们在应⽤层就可以⾼效地实现类似数据库SQL的聚合操作了,⽽在数据操作⽅⾯,Stream不仅可以通过串⾏的⽅式实现数据操作,还可以通过并⾏的⽅式处理⼤批量数据,提⾼数据的处理效率。
操作分类
官⽅将 Stream 中的操作分为两⼤类:
中间操作(Intermediate operations),只对操作进⾏了记录,即只会返回⼀个流,不会进⾏计算操作。
终结操作(Terminal operations),实现了计算操作。
中间操作⼜可以分为:
⽆状态(Stateless)操作,元素的处理不受之前元素的影响。
有状态(Stateful)操作,指该操作只有拿到所有元素之后才能继续下去。
终结操作⼜可以分为:
短路(Short-circuiting)操作,指遇到某些符合条件的元素就可以得到最终结果
⾮短路(Unshort-circuiting)操作,指必须处理完所有元素才能得到最终结果。
操作分类详情如下图所⽰:
源码结构
Stream 相关类和接⼝的继承关系如下图所⽰:
BaseStream
最顶端的接⼝类,定义了流的基本接⼝⽅法,最主要的⽅法为 spliterator、isParallel。
Stream
最顶端的接⼝类。定义了流的常⽤⽅法,例如 map、filter、sorted、limit、skip、collect 等。
ReferencePipeline
ReferencePipeline 是⼀个结构类,定义内部类组装了各种操作流,定义了Head、StatelessOp、StatefulOp三个内部类,实现了 BaseStream 与 Stream 的接⼝⽅法。
Sink
Sink 接⼝定义了 Stream 之间的操作⾏为,包含begin()、end()、cancellationRequested()、accpt()四个⽅法。ReferencePipeline最终会将整个 Stream 流操作组装成⼀个调⽤链,⽽这条调⽤链上的各个 Stream 操作的上下关系就是通过 Sink 接⼝协议来定义实现的。
操作叠加
Stream 的基础⽤法就不再叙述了,这⾥从⼀段代码开始,分析 Stream 的⼯作原理。
@Test
public void testStream() {
List<String> names = Arrays.asList("kotlin", "java", "go");
int maxLength = names.stream().filter(name -> name.length() <= 4).map(String::length)
.max(Comparator.naturalOrder()).orElse(-1);
System.out.println(maxLength);
}
当使⽤ Stream 时,主要有 3 部分组成,下⾯⼀⼀讲解。
加载数据源
调⽤names.stream()⽅法,会初次加载 ReferencePipeline 的 Head 对象,此时为加载数据源操作。
java.util.Collection#stream
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
StreamSupport 类中的 stream ⽅法,初始化了⼀个 ReferencePipeline的 Head 内部类对象。
java.util.stream.StreamSupport#stream(java.util.Spliterator, boolean)
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
return new ReferencePipeline.Head<>(spliterator,
java streamStreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
中间操作
接着为filter(name -> name.length() <= 4).mapToInt(String::length),是中间操作,分为⽆状态中间操作 StatelessOp 对象和有状态操作 StatefulOp 对象,此时的 Stage 并没有执⾏,⽽是通过AbstractPipeline ⽣成了⼀个中间操作 Stage 链表。
java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (st(u))
downstream.accept(u);
}
};
}
};
}
java.util.stream.ReferencePipeline#map
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
可以看到 filter 和 map ⽅法都返回了⼀个新的StatelessOp对象。new StatelessOp 将会调⽤⽗类 AbstractPipeline 的构造函数,这个构造函数将前后的 Stage 联系起来,⽣成⼀个 Stage 链表:
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
this.previousStage = previousStage;
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
thisbinedFlags = StreamOpFlagbineOpFlags(opFlags, previousStagebinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
终结操作
最后为max(Comparator.naturalOrder()),是终结操作,会⽣成⼀个最终的 Stage,通过这个 Stage 触发之前的中间操作,从最后⼀个Stage开始,递归产⽣⼀个Sink链。java.util.stream.ReferencePipeline#max
@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
return reduce(BinaryOperator.maxBy(comparator));
}
最终调⽤到 java.util.stream.AbstractPipeline#wrapSink,这个⽅法会调⽤ opWrapSink ⽣成⼀个 Sink 链表,对应到本⽂的例⼦,就是 filter 和 map 操作。
@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStagebinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
在上⾯ opWrapSink 上断点调试,发现最终会调⽤到本例中的 filter 和 map 操作。
wrapAndCopyInto ⽣成 Sink 链表后,会通过 copyInfo ⽅法执⾏ Sink 链表的具体操作。
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.ExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
上⾯的核⼼代码是:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论