websocket替代⽅案_分布式WebSocket集解决⽅案
问题起因
最近做项⽬时遇到了需要多⽤户之间通信的问题,涉及到了WebSocket握⼿请求,以及集中WebSocket Session共享的问题。
期间我经过了⼏天的研究,总结出了⼏个实现分布式WebSocket集的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇⽂章,希望能帮助到某些⼈,并且能⼀起分享这⽅⾯的想法与研究。
以下是我的场景描述
资源:4台服务器。其中只有⼀台服务器具备ssl认证域名,⼀台redis+mysql服务器,两台应⽤服务器(集)
需求:⽤户登录应⽤,需要与服务器建⽴wss连接,不同⾓⾊之间可以单发消息,也可以发消息
集中的应⽤服务类型:每个集实例都负责http⽆状态请求服务与ws长连接服务
系统架构图
在我的实现⾥,每个应⽤服务器都负责http and ws请求,其实也可以将ws请求建⽴的聊天模型单独成⽴为⼀个模块。从分布式的⾓度来看,这两种实现类型差不多,但从实现⽅便性来说,⼀个应⽤服务http+ws请求的⽅式更为⽅便。下⽂会有解释
本⽂涉及的技术栈
Eureka 服务发现与注册
Redis Session共享
Redis 消息订阅
Spring Boot
Zuul ⽹关
Spring Cloud Gateway ⽹关
Spring WebSocket 处理长连接
Ribbon 负载均衡
Netty 多协议NIO⽹络通信框架
Consistent Hash ⼀致性哈希算法
相信能⾛到这⼀步的⼈都了解过我上⾯列举的技术栈了,如果还没有,可以先去⽹上⼊门教程了解⼀下。下⾯的内容都与上述技术相关,题主默认⼤家都了解过了...
这⾥是描述⼀致性Hash算法最易懂的⽂章传送门
技术可⾏性分析
下⾯我将描述session特性,以及根据这些特性列举出n个解决分布式架构中处理ws请求的集⽅案
WebSocketSession与HttpSession
在Spring所集成的WebSocket⾥⾯,每个ws连接都有⼀个对应的session:WebSocketSession,在Spring WebSocket中,我们建⽴ws连接之后可以通过类似这样的⽅式进⾏与客户端的通信:
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
System.out.println("服务器接收到的消息: "+ message );
//send message to client
session.sendMessage(new TextMessage("message"));
}
那么问题来了:ws的session⽆法序列化到redis,因此在集中,我们⽆法将所有WebSocketSession都缓存到redis进⾏session共享。每台服务器都有各⾃的session。于此相反的是HttpSession,redis可以⽀持httpsession共享,但是⽬前没有websocket session共享的⽅案,因此⾛redis websocket session共享这条路是⾏不通的。
有的⼈可能会想:我可不可以将sessin关键信息缓存到redis,集中的服务器从redis拿取session关键信息然后重新构建我只想说这种⽅法如果有⼈能试出来,请告诉我⼀声...
以上便是websocket session与http session共享的区别,总的来说就是http session共享已经有解决⽅案了,⽽且很简单,只要引⼊相关依赖:spring-session-data-redis和spring-boot-starter-redis,⼤家可以从⽹上个demo玩⼀下就知道怎么做了。⽽websocket session共享的⽅案由于websocket底层实现的⽅式,我们⽆法做到真正的websocket session共享。
解决⽅案的演变
Netty与Spring WebSocket
刚开始的时候,我尝试着⽤netty实现了websocket服务端的搭建。在netty⾥⾯,并没有websocket session这样的概念,与其类似的是channel,每⼀个客户端连接都代表⼀个channel。前端的ws请求通过netty监听的端⼝,⾛websocket协议进⾏ws握⼿连接之后,通过⼀些列的handler(责链模式)进⾏消息处理。与websocket session类似地,服务端在连接建⽴后有⼀个channel,我们可以通过channel进⾏与客户端的通信
/**
* TODO 根据服务器传进来的id,分配到不同的group
*/
private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//retain增加引⽤计数,防⽌接下来的调⽤引⽤失效
System.out.println("服务器接收到来⾃ " + ctx.channel().id() + " 的消息: " + ());
//将消息发送给group⾥⾯的所有channel,也就是发送消息给客户端
GROUP.ain());
}
那么,服务端⽤netty还是⽤spring websocket?以下我将从⼏个⽅⾯列举这两种实现⽅式的优缺点
使⽤netty实现websocket
玩过netty的⼈都知道netty是的线程模型是nio模型,并发量⾮常⾼,spring5之前的⽹络线程模型是servlet实现的,⽽servlet不是nio模型,所以在spring5之后,spring的底层⽹络实现采⽤了netty。如果我们单独使⽤netty来开发websocket服务端,速度快是绝对的,但是可能会遇到下列问题:
1.与系统的其他应⽤集成不⽅便,在rpc调⽤的时候,⽆法享受springcloud⾥feign服务调⽤的便利性
2.业务逻辑可能要重复实现
3.使⽤netty可能需要重复造轮⼦
4.怎么连接上服务注册中⼼,也是⼀件⿇烦的事情
使⽤spring websocket实现ws服务
spring websocket已经被springboot很好地集成了,所以在springboot上开发ws服务⾮常⽅便,做法⾮常简单第⼀步:添加依赖
org.springframework.boot
spring-boot-starter-websocket
第⼆步:添加配置类
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myHandler(), "/")
.setAllowedOrigins("*");
}
@Bean
public WebSocketHandler myHandler() {
return new MessageHandler();
}
}
第三步:实现消息监听类
@Component
@SuppressWarnings("unchecked")
public class MessageHandler extends TextWebSocketHandler {
private List clients = new ArrayList<>();
@Override
public void afterConnectionEstablished(WebSocketSession session) {
clients.add(session);
System.out.println("uri :" + Uri());
System.out.println("连接建⽴: " + Id());
System.out.println("current seesion: " + clients.size());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
System.out.println("断开连接: " + Id());
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) {
String payload = Payload();
Map map = JSONObject.parseObject(payload, HashMap.class);
System.out.println("接受到的数据" + map);
clients.forEach(s -> {
try {
System.out.println("发送消息给: " + Id());
s.sendMessage(new TextMessage("服务器返回收到的信息," + payload));
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
前端websocket怎么用从这个demo中,使⽤spring websocket实现ws服务的便利性⼤家可想⽽知了。为了能更好地向spring cloud⼤家族看齐,我最终采⽤了spring websocket实现ws服务。
因此我的应⽤服务架构是这样⼦的:⼀个应⽤既负责restful服务,也负责ws服务。没有将ws服务模块拆分是因为拆分出去要使⽤feign来进⾏服务调⽤。第⼀本⼈⽐较懒惰,第⼆拆分与不拆分相差在多了⼀层服务间的io调⽤,所以就没有这么做了。
从zuul技术转型到spring cloud gateway
要实现websocket集,我们必不可免地得从zuul转型到spring cloud gateway。原因如下:
zuul1.0版本不⽀持websocket转发,zuul 2.0开始⽀持websocket,zuul2.0⼏个⽉前开源了,但是2.0版本没有被spring boot集成,⽽且⽂档不健全。因此转型是必须的,同时转型也很容易实现。
在gateway中,为了实现ssl认证和动态路由负载均衡,yml⽂件中以下的某些配置是必须的,在这⾥提前避免⼤家采坑
server:
port: 443
ssl:
enabled: true
key-store: classpath:xxx.jks
key-store-password: xxxx
key-store-type: JKS
key-alias: alias
spring:
application:
name: api-gateway
cloud:
gateway:
httpclient:
ssl:
handshake-timeout-millis: 10000
close-notify-flush-timeout-millis: 3000
close-notify-read-timeout-millis: 0
useInsecureTrustManager: true
discovery:
locator:
enabled: true
lower-case-service-id: true
routes:
- id: dc
uri: lb://dc
predicates:
- Path=/dc/**
- id: wecheck
uri: lb://wecheck
predicates:
- Path=/wecheck/**
如果要愉快地玩https卸载,我们还需要配置⼀个filter,否则请求⽹关时会出现错误not an SSL/TLS record @Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered {
private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;
@Override
public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
URI originalUri = Request().getURI();
ServerHttpRequest request = Request();
ServerHttpRequest.Builder mutate = request.mutate();
String forwardedUri = URI().toString();
if (forwardedUri != null && forwardedUri.startsWith("https")) {
try {
URI mutatedUri = new URI("http",

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。