SpringReactor⼊门与实践
适合阅读的⼈:本⽂适合对 Spring、Netty 等,以及 Java 8 的 Lambda、Stream 等特性有基本认识,希望了解 Spring 5 的反应式编程特性的技术⼈员阅读。
⼀、前⾔
最近⼏年,随着 .js、Golang 等新技术、新语⾔的出现,Java 的服务器端开发语⾔⽼⼤的地位受到了不⼩的挑战。虽然,Java 的市场份额依旧很⼤,短时间内也不会改变,但 Java 社区对于挑战也并没有⽆动于衷。相反,Java 社区积极应对这些挑战,不断提⾼⾃⾝应对⾼并发服务器端开发场景的能⼒。
为了应对⾼并发的服务器端开发,在2009年的时候,微软提出了⼀个更优雅地实现异步编程的⽅式 —— Reactive Programming,中⽂称反应式编程。随后,其它技术也迅速地跟上了脚步,像 ES6 通过 Promise 引⼊了类似的异步编程⽅式。Java 社区也没有落后很
多,Netflix 和 TypeSafe 公司提供了 RxJava 和 Akka 技术,让 Java 平台也有了能够实现反应式编程的框架。
其实,在更早之前,像 Mina 和 Netty 这样的 NIO 框架其实也能搞定⾼并发的服务器端开发任务,但这样的技术相对来说只是少数⾼级开发⼈员⼿中的⼯具。对于更多的普通开发者来说,难度显得⼤了些,
所以不容易普及。
很多年过去了,到了2017年,虽然已经有不少公司在实践反应式编程。但整体来说,应⽤范围依旧不⼤。原因在于缺少简单易⽤的技术将反应式编程推⼴普及,并同诸如 MVC 框架、HTTP 客户端、数据库技术等整合。
终于,在2017年9⽉28⽇,解决上⾯问题的利器浮出⽔⾯ —— Spring 5 正式发布。Spring 5 其最⼤的意义就是能将反应式编程技术的普及向前推进⼀⼤步。⽽作为在背后⽀持 Spring 5 反应式编程的框架 Reactor,也相应的发布了 3.1.0 版本。
本⽂接下来将会向⼤家介绍 Reactive Programming(反应式编程)、Reactor 的⼊门以及实践技巧等相关的内容。⽂章中的实践内容来⾃作者使⽤ Spring 5 和 Reactor 等技术改造实际项⽬的经历。
⼆、Reactor 简介
先介绍⼀下 Reactor 技术。Reactor 框架是 Pivotal 公司(开发 Spring 等技术的公司)开发的,实现了 Reactive Programming 思想,符合 (Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司发起的)的⼀项技术。其名字有反应堆之意,反映了其背后的强⼤的性能。
Reactive Programming
Reactive Programming,中⽂称反应式编程,是⼀种⾼性能应⽤的编程⽅式。其最早是由微软提出并引⼊到 .NET 平台中,随后 ES6 也引⼊了类似的技术。在 Java 平台上,较早采⽤反应式编程技术的是 Netflix 公司开源的 RxJava 框架。现在⼤家⽐较熟知的 Hystrix 就是以 RxJava 为基础开发的。
反应式编程其实并不神秘,通过与我们熟悉的迭代器模式对⽐便可了解其基本思想:
event Iterable (pull)Observable (push)
retrieve data T next()onNext(T)
discover error throws Exception onError(Exception)
complete!hasNext()onCompleted()
上⾯表格的中的 Observable 那⼀列便代表反应式编程的 API 使⽤⽅式。可见,它就是常见的观察者模式的⼀种延伸。如果将迭代器看作是拉模式,那观测者模式便是推模式。被订阅者(Publisher)主动的推送数据给订阅者(Subscriber),触发 onNext ⽅法。异常和完成时触发另外两个⽅法。如果 Publisher 发布消息太快了,超过了 Subscriber 的处理速度,那怎么办。这就是 Backpressure 的由
来,Reactive Programming 框架需要提供机制,使得 Subscriber 能够控制消费消息的速度。
在 Java 平台上,Netflix(开发了 RxJava)、TypeSafe(开发了 Scala、Akka)、Pivatol(开发了 Spring、Reactor)共同制定了⼀个被称为 ,⽤于制定反应式编程相关的规范以及接⼝。其主要的接⼝有这三个:
Publisher
Subscriber
Subcription
其中,Subcriber 中便包含了上⾯表格提到的 onNext、onError、onCompleted 这三个⽅法。
对于 Reactive Streams,⼤家只需要理解其思想就可以,包括基本思想以及 Backpressure 等思想即可。
Imperative vs Reactive
对于上⾯表格⾥提到的 Iterable 和 Observale 两种风格,还有另⼀个称呼,便是 Imperative(指令式编程)和 Reactive(反应式编程)这两种风格。其实就是拉模型和推模型的另⼀种表述,⼤家理解其中的思想即可。对于 Imperative,⽼外写的⽂章有时会⽤,直译就是指令式编程,其实就是我们⼤家
平时⽤ Java、Python 等语⾔写代码的常见风格,代码执⾏顺序和编写顺序基本⼀致(这⾥不考虑 JVM 指令重排)
Reactor 的主要模块
Reactor 框架主要有两个主要的模块:reactor-core 和 reactor-ipc。前者主要负责 Reactive Programming 相关的核⼼ API 的实现,后者负责⾼性能⽹络通信的实现,⽬前是基于 Netty 实现的。
Reactor 的主要类
在 Reactor 中,经常使⽤的类并不是很多,主要有以下两个:
Mono 实现了 activestreams.Publisher 接⼝,代表0到1个元素的发布者。
Flux 同样实现了 activestreams.Publisher 接⼝,代表0到N个元素的发表者。
可能会使⽤到的类
Scheduler 表⽰背后驱动反应式流的调度器,通常由各种线程池实现。
Web Flux
Spring 5 引⼊的⼀个基于 Netty ⽽不是 Servlet 的⾼性能的 Web 框架,但是使⽤⽅式并没有同传统的基于 Servlet 的 Spring MVC 有什么⼤的不同。
▼ Web Flux 中 MVC 接⼝的⽰例
1@RequestMapping("/demo")
2@RestController
3public class DemoController {
4 @RequestMapping(value = "/foobar")
5 public Mono<Foobar> foobar() {
6 return Mono.just(new Foobar());
7 }
8}
最⼤的变化就是返回值从 Foobar 所表⽰的⼀个对象变为 Mono<Foobar> (或 Flux<T>)
当然,实际的程序并不会像⽰例那样就⼀⾏代码。关于如何开发实际的应⽤,这些正是后⾯介绍 Reactor 的部分所要详细叙述的。
Reactive Streams、Reactor 和 Web Flux
上⾯介绍了反应式编程的⼀些概念,以及 Reactor 和 Web Flux。可能读者看到这⾥有些乱。这⾥介绍⼀下三者的关系。其实很简单:Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。
其实,对于⼤部分业务开发⼈员来说,当编写反应式代码时,我们通常只会接触到 Publisher 这个接⼝,对应到 Reactor 便是 Mono 和Flux。对于 Subscriber 和 Subcription 这两个接⼝,Reactor 必然也有相应的实现。但是,这些都是 Web Flux 和 Spring Data Reactive 这样的框架⽤到的。如果不开发中间件,通常开发⼈员是不会接触到的。
⽐如,在 Web Flux,你的⽅法只需返回 Mono 或 Flux 即可。你的代码基本也只和 Mono 或 Flux 打交道。⽽ Web Flux 则会实现Subscriber ,onNext 时将业务开发⼈员编写的 Mono 或 Flux 转换为 HTTP Response 返回给客户端。
三、Reactor ⼊门
接下来介绍⼀下 Reactor 中 Mono 和 Flux 这两个类中的主要⽅法的使⽤。
如同 Java 8 所引⼊的 Stream ⼀样,Reactor 的使⽤⽅式基本上也是分三步:开始阶段的创建、中间阶段的处理和最终阶段的消费。只不过创建和消费可能是通过像 Spring 5 这样框架完成的(⽐如通过 Web Flux 中的 WebClient 调⽤ HTTP 接⼝,返回值便是⼀个Mono)。但我们还是需要基本了解这些阶段的开发⽅式。
1. 创建 Mono 和 Flux(开始阶段)
使⽤ Reactor 编程的开始必然是先创建出 Mono 或 Flux。有些时候不需要我们⾃⼰创建,⽽是实现例如 WebFlux 中的 WebClient 或Spring Data Reactive 得到⼀个 Mono 或 Flux。
▼ 使⽤ WebFlux WebClient 调⽤ HTTP 接⼝
1WebClient webClient = ate("localhost:8080");
python新手代码userid2
3public Mono<User> findById(Long userId) {
4 return webClient
5 .get()
6 .uri("/users/" + userId)
7 .accept(MediaType.APPLICATION_JSON)
8 .exchange()
9 .flatMap(cr -> cr.bodyToMono(User.class));
10}
▼ 使⽤ ReactiveMongoRepository 查询 User
1public interface UserRepository extends ReactiveMongoRepository<User, Long> {
2 Mono<User> findByUsername(String username);
3}
但有些时候,我们也需要主动地创建⼀个 Mono 或 Flux。
“普通”的创建⽅式
简单的创建⽅式是主要是使⽤像 just 这样的⽅法创建
1Mono<String> helloWorld = Mono.just("Hello World");
2Flux<String> fewWords = Flux.just("Hello", "World");
3Flux<String> manyWords = Flux.fromIterable(words);
这样的创建⽅式在什么时候⽤呢?⼀般是⽤在当你在经过⼀系列⾮ IO 型的操作之后,得到了⼀个对象。接下来要基于这个对象运⽤Reactor 进⾏⾼性能的 IO 操作时,可以⽤这种⽅式将你之前得到的对象转换为 Mono 或 Flux。
“⽂艺”的创建⽅式
上述是我们通过⼀个同步调⽤得到的结果创建出 Mono 或 Flux,但有时我们需要从⼀个⾮ Reactive 的异步调⽤的结果创建出 Mono 或Flux。那如何实现呢。
如果这个异步⽅法返回⼀个 CompletableFuture,那我们可以基于这个 CompletableFuture 创建⼀个 Mono:
Mono.fromFuture(aCompletableFuture);
如果这个异步调⽤不会返回 CompletableFuture,是有⾃⼰的回调⽅法,那怎么创建 Mono 呢?我们可以使⽤ static <T> Mono<T>
create(Consumer<MonoSink<T>> callback) ⽅法:
2 ListenableFuture<ResponseEntity<String>> entity = ForEntity(url, String.class);
3
4 entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
5 @Override
6 public void onFailure(Throwable ex) {
7 (ex);
8 }
9
10 @Override
11 public void onSuccess(ResponseEntity<String> result) {
12 sink.Body());
13 }
14 });
15});
在使⽤ WebFlux 之后,AsyncRestTemplate 已经不推荐使⽤,这⾥只是做演⽰。
2. 处理 Mono 和 Flux(中间阶段)
中间阶段的 Mono 和 Flux 的⽅法主要有 filter、map、flatMap、then、zip、reduce 等。这些⽅法使⽤
⽅法和 Stream 中的⽅法类似。对于这些⽅法的介绍,将会放在下⼀节“Reactor 进阶”中,主要介绍这些⽅法不容易理解和使⽤容易出问题的点。
3. 消费 Mono 和 Flux(结束阶段)
直接消费的 Mono 或 Flux 的⽅式就是调⽤ subscribe ⽅法。如果在 Web Flux 接⼝中开发,直接返回 Mono 或 Flux 即可。Web Flux 框架会为我们完成最后的 Response 输出⼯作。
四、Reactor 进阶
接下来我将介绍⼀下我在使⽤ Reactor 开发实际项⽬时遇到的⼀些稍显复杂的问题,以及解决⽅法。
问题⼀:map、flatMap 和 then 分别在什么时候使⽤?
本段内容将涉及到如下类和⽅法:
⽅法:Mono.map
⽅法:Mono.flatMap
⽅法:Mono.then
类:Function
在 Mono 和 Flux 中间环节处理的处理过程中,有三个有些类似的⽅法:map、flatMap 和 then。这三个⽅法可以说是 Reactor 中使⽤频率很⾼的⽅法。
▼ 传统的命令式编程
1Object result1 = doStep1(params);
2Object result2 = doStep2(result1);
3Object result3 = doStep3(result2);
▼ 对应的反应式编程
1Mono.just(params)
2 .flatMap(v -> doStep1(v))
3 .flatMap(v -> doStep2(v))
4 .flatMap(v -> doStep3(v));
从上⾯两段代码的对⽐就很容易看出来 flatMap ⽅法在其中起到的作⽤,map 和 then ⽅法也有类似的作⽤。但这些⽅法之间的区别是什么呢?我们先来看看这三个⽅法的签名(以 Mono 为例):
flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
map(Function<? super T, ? extends R> mapper)
then(Mono<V> other)
可见,最复杂的是 flatMap ⽅法,map 次之,then 最简单。从⽅法名字上看,flatMap 和 map 都是做映射之⽤。⽽ then 则是下⼀步的意思,最适合⽤于链式调⽤,但为什么上⾯的例⼦使⽤的是 flatMap ⽽不是 then?
then 表⾯看上去是下⼀步的意思,但它只表⽰执⾏顺序的下⼀步,不表⽰下⼀步依赖于上⼀步。这个语义同 ES6 Promise 中的 then ⽅法是不同的。从 then ⽅法的参数只是⼀个 Mono,⽆从接受上⼀步的执⾏结果。⽽ flatMap 和 map 的参数都是⼀个 Function。⼊参是上⼀步的执⾏结果。
⽽ flatMap 和 map 的区别在于,flatMap 中的⼊参 Function 的返回值要求是⼀个 Mono(不明⽩的复习⼀下 Function 接⼝的定义),⽽map 的⼊参 Function 只要求返回⼀个普通对象。因为我们在业务处理中常需要调⽤ WebClient 或 ReactiveXxxRepository 中的⽅法,这些⽅法的返回值都是 Mono(
或 Flux)。所以要将这些调⽤串联为⼀个整体链式调⽤,就必须使⽤ flatMap,⽽不是 map。
所以,我们要正确理解 flatMap、map 和 then 这三个⽅法的⽤法和背后的含义,这样才能正确实践反应式编程。
问题⼆:如何实现并发执⾏
本段内容将涉及到如下类和⽅法:
⽅法:Mono.zip
类:Tuple2
类:BiFunction
并发执⾏是常见的⼀个需求。Reactive Programming 虽然是⼀种异步编程⽅式,但是异步不代表就是并发并⾏的。
在传统的命令式开发⽅式中,并发执⾏是通过线程池加 Future 的⽅式实现的。
1Future<Result1> result1Future = doStep1(params);
2Future<Result2> result2Future = doStep2(params);
3Result1 result1 = ();
4Result2 result2 = ();
5// Do merge;
6return mergeResult;
因为上⾯的代码虽然有⼀些异步效果在⾥⾯,但 () ⽅法是阻塞的。所以,当我们使⽤ Reactor 开发有并发执⾏场景的反应式代码时,肯定不能⽤上⾯的⽅式。这时,需要使⽤到 Mono 和 Flux 中的 zip ⽅法。这⾥我们以 Mono 为例演⽰。代码如下:
1Mono<CustomType1> item1Mono = ...;
2Mono<CustomType2> item2Mono = ...;
3Mono.zip(items -> {
4 CustomType1 item1 = CustomType1.class.cast(items[0]);
5 CustomType2 item2 = CustomType2.class.cast(items[1]);
6 // Do merge
7 return mergeResult;
8}, item1Mono, item2Mono);
上述代码中,产⽣ item1Mono 和 item2Mono 的过程是并⾏的。⽐如,调⽤⼀个 HTTP 接⼝的同时,执⾏⼀个数据库查询操作。这样就可以加快程序的执⾏。
但上述代码存在⼀个问题,就是 zip ⽅法需要做强制类型转换。⽽强制类型转换是不安全的。所以我们需要更优雅的⽅式。
好在 zip ⽅法存在多种重载形式。除了最基本的形式以外,还有多种类型安全的形式:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论