WebFlux响应式编程简单⽰例
WebFlux介绍
WebFlux是⼀个异步⾮阻塞框架
什么是异步⾮阻塞
同步和异步
针对的是调⽤者,调⽤者发出请求,如果等着对⽅回应之后才去做其它的事情,那就是同步;如果发送请求之后不等着对⽅回应就去做其它的事情,那就是异步。
阻塞与⾮阻塞
针对被调⽤者⽽⾔,如果收到请求之后,做完请求任务之后才反馈就是阻塞;如果收到请求之后就马上反馈,然后再去做事情,就是⾮阻塞
WebFlux的特点
⾮阻塞式:在有限的资源下,提⾼系统的吞吐量和伸缩性,以Reactor为基础,实现响应式编程
函数式编程
Reactor响应式编程
(1) 在响应式编程中,Reactor是满⾜Reactive规范的框架
(2) Reactor有两个核⼼类,Mono和Flux ,这两个类实现了接⼝Publisher, 提供了丰富的操作符。 Flux对象实现发布者,返回N个元素;Mono 实现翻版发布,返回0或者1 个元素。
(3) Flux和Mono都是数据流的发布者,使⽤Flux和Mono都可以发出三种信号:元素值,错误信号,完成信号。其中错误信息和完成信号都是终⽌信号。终⽌信号⽤于告诉订阅者数据流已经结束了,错误信号终⽌数据流时,还会将错误信息返回给订阅者。
3种信号的特点
1. 错误信号和完成信号都是终⽌信号,不能并存
2. 如果没有发送任何元素值,⽽是直接发送错误或者完成信号,表⽰是⼀个空的数据流。
3. 如果没有错误信号也没有完成信号,表⽰⽆限数据流。
操作符
对我们的数据流进⾏⼀道道的操作,就是操作符
1. map : 把元素映射成⼀个新的元素
2. flatmap :把每个元素转成流,把多个流合并成⼀个⼤的流,然后进⾏输出
代码⽰例
publisher.Flux;
publisher.Mono;
/**
* @Author ZhengQinfeng
* @Date 2020/12/5 21:04
* @dec
*/
public class TestReactor {
public static void main(String[] args) {
// just⽅法中直接声明数据流,⽽且这个数据流还没有发出。 subscribe是订阅数据流,订阅之后数据流才会发出
Flux.just(1, 2, 3, 4).subscribe(System.out::println);
Mono.just(1).subscribe(System.out::println);
// 其它⽅法
// Integer[] arr = {1, 2, 3, 4};
// Flux.fromArray(arr);
//
// List<Integer> list = Arrays.asList(arr);
// Flux.fromIterable(list);
//
// Stream<Integer> stream = list.stream();
// Flux.fromStream(stream);
//
//
// // 发送⼀个错误信号
// (new RuntimeException());
}
}
SpringWebFlux
SpringWebFlux基于Reactor,默认使⽤的容器是Netty , Netty是⾼性能的IO框架,异步⾮阻塞框架SpringWebFlux执⾏过程
核⼼控制器DispatchHandler, 实现了WebHandler
两个接⼝
1. RouterFunction
路由请求
2. HandlerFunction
处理的具体的函数
编程的两种⽅式
基于注解编程
与springMmc使⽤相似,只需要把相关依赖引⽤⼊项⽬中即可,springboot⾃动配置相关的运⾏容器,默认使⽤Netty服务器1. 引⼊依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
2. 代码⽰例
/**
* @Author ZhengQinfeng
* @Date 2020/12/5 21:47
* @dec UserService 操作,定义⼀些操作⽅法
*/
public interface UserService {
/
/ 根据id查询⽤户
// Mono表⽰返回⼀个或0个元素
Mono<User> getUserById(Integer id);
// 查询所有⽤户
Flux<User> getAllUser();
// 添加⽤户
Mono<Void> saveUserInfo(Mono<User> user);
}
@Service
public class UserServiceImpl implements UserService {
// 创建⼀个Map集合,存储数据
private final Map<Integer, User> users = new HashMap<>();
public UserServiceImpl() {
this.users.put(1, new User(1, "admin", "男", 20));
this.users.put(2, new User(2, "jack", "⼥", 20));
}
@Override
public Mono<User> getUserById(Integer id) {
return Mono.justOrEmpty((id));
}
@Override
public Flux<User> getAllUser() {
return Flux.fromIterable(this.users.values());
}
@Override
public Mono<Void> saveUserInfo(Mono<User> userMono) {
return userMono.doOnNext(user -> {
// 放到Map
this.users.Id(), user);
}).pty());// 终⽌信号
}
}
/**
* @Author ZhengQinfeng
* @Date 2020/12/5 21:58
* @dec
*/
@RestController
public class UserController {
@Autowired
private UserService userService;
@GetMapping("/user/{id}")
public Mono<User> getUserById(@PathVariable Integer id) {
UserById(id);
}
@GetMapping("/users")
public Flux<User> getAllUsers() {
AllUser();
}
@PostMapping("/save/user")
public Mono<Void> getAllUsers(@RequestBody User user) {
Mono<User> userMono = Mono.just(user);
return userService.saveUserInfo(userMono);
}
}
基于函数式编程
在使⽤函数式编程时,需要我们⾃⼰初始化服务器
两个核⼼接⼝,RouterFunction(实现路由功能,将请求转发给对应的handler)和HandlerFunction(处理请求⽣成响应的函数). 核⼼任务定义这两个接⼝的实现并且启动需要的服务器
SpringWebFlux请求和响应不再是ServetRequest和ServetResponse, ⽽是ServerRequest 和 ServerResponse
代码⽰例
Service层代码和之前⼀样
不需要Controller层代码,使⽤Handler替换之前的Controller
import org.springframework.http.MediaType;
import org.active.function.server.ServerRequest;
import org.active.function.server.ServerResponse;
import qinfeng.ity.User;
import qinfeng.zheng.springwebflux.service.UserService;
publisher.Flux;
publisher.Mono;
import static org.active.function.BodyInserters.fromValue;
/**
* @Author ZhengQinfeng
* @Date 2020/12/5 22:38
* @dec 函数式编程模型
*/
public class UserHandler {
private final UserService userService;
public UserHandler(UserService userService) {
this.userService = userService;
}
// 根据id查询
public Mono<ServerResponse> getUserById(ServerRequest request) {
Integer id = Integer.valueOf(request.pathVariable("id"));
// ⾮空处理
Mono<ServerResponse> notFound = Found().build();
Mono<User> userMono = UserById(id);
//把userMono 进⾏转换
/
/使⽤Reactor 操作符flatMap
return userMono
.flatMap(user -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromValue(user)))
reactive声明类型.switchIfEmpty(notFound);
}
// 查询所有
public Mono<ServerResponse> getAllUsers(ServerRequest request) {
Flux<User> users = AllUser();
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users, User.class);
}
// 添加User
public Mono<ServerResponse> saveUser(ServerRequest request) {
// 获取user 对象
Mono<User> userMono = request.bodyToMono(User.class);
return ServerResponse.ok().build(this.userService.saveUserInfo(userMono));
}
}
服务端代码
import org.springframework.http.MediaType;
import org.springframework.active.HttpHandler;
import org.springframework.active.ReactorHttpHandlerAdapter;
import org.active.function.server.RequestPredicates;
import org.active.function.server.RouterFunction;
import org.active.function.server.RouterFunctions;
import org.active.function.server.ServerResponse;
import qinfeng.zheng.springwebflux.handler.UserHandler;
import qinfeng.zheng.springwebflux.service.UserService;
import qinfeng.zheng.springwebflux.service.impl.UserServiceImpl;
import reactorty.http.server.HttpServer;
import static org.active.function.HttpHandler;
/**
* @Author ZhengQinfeng
* @Date 2020/12/5 22:56
* @dec
*/
public class Server {
public static void main(String[] args) throws InterruptedException {
Server server = new Server();
Thread.sleep(Integer.MAX_VALUE); // 主进程⼀直运⾏
}
// 1. 创建Router路由
public RouterFunction<ServerResponse> routingFunction() {
UserService userService = new UserServiceImpl();
UserHandler userHandler = new UserHandler(userService);
// 创建路由与 handler⽅法的映射关系
return RouterFunctions
.route(
RequestPredicates.GET("/user/{id}").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), userHandler::getUserById) .andRoute(
RequestPredicates.GET("/users").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), userHandler::getAllUsers)
.andRoute(
RequestPredicates.POST("/save/user").and(RequestPredicates.accept(MediaType.APPLICATION_JSON)), userHandler::saveUser) ;
}
/
/2. 创建服务器,完成适配
public void createReactorServer() {
// 路由与handler适配
RouterFunction<ServerResponse> router = routingFunction();
HttpHandler httpHandler = toHttpHandler(router);
ReactorHttpHandlerAdapter handlerAdapter = new ReactorHttpHandlerAdapter(httpHandler);
// 创建服务器
HttpServer httpServer = ate();
httpServer.port(9999); // 端⼝没⽣效,不知咋回事, 启动时会随机⽣效⼀个端⼝ httpServer.handle(handlerAdapter).bindNow(); // ⽴即启动服务器
}
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论