关于WebSocket分布式实现的⼀种⽅案
WebSocket常⽤于做后台消息推送,也可以做简易的IM聊天,由于WebSocket中的Session没有实现序列化接⼝的,我们⽆法将session 序列化实现分布式部署,今天就来记录⼀种分布式的实现⽅案。
实现原理
⾸先我们讲的这种⽅式是利⽤redis订阅和发布模式来实现,⼤致过程:
每个服务器记录连接,保存在内存当中
当需要推送websocket消息的时候,同时在redis发布⼀个消息
每个服务器订阅redis的消息,当监听到有消息时,每台服务器遍历⾃⼰内存当中的连接进⾏发送
这样我们就可以实现websocket的分布式部署,当然redis订阅和发布也可以⽤其他消息队列⼯具类实现。
实现过程
这⾥并不打算贴所有代码,只记录关键的⼀些代码,其余的可以⽹上查看相关资料,基于SpringBoot2.1.8实现。
pom⽂件引⼊redis和websocket
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--websocket-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
写⼀个redis发布器
@Component
public class PublishService {
@Autowired
StringRedisTemplate redisTemplate;
/**
* 发布⽅法
*
* @param channel 消息发布订阅主题
* @param message 消息信息
*/
public void publish(String channel, Object message) {
}
}
写⼀个redis
public class SubscribeListener implements MessageListener {
/**
* 订阅接收发布者的消息
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String msg = new Body());
System.out.println(new String(pattern) + "接收消息:" + msg);
//遍历本地内存当中的websocket连接...
//拿到对应的websocket session就可以进⾏推送消息
}
}
websocket和socket
配置下websocket
@Configuration
public class WebSocketConfig extends ServerEndpointConfig.Configurator{
@Override
public void modifyHandshake(ServerEndpointConfig sec,
HandshakeRequest request, HandshakeResponse response) {
/
/ 主要为了能在websocket打开连接时获取httpsession和当前登陆⽤户,此处跟本⽂内容没有关系
HttpSession httpSession=(HttpSession) HttpSession();
//存⼊httpsession
//存⼊当前⽤户
}
/**
* ⾃动注册使⽤了@ServerEndpoint注解声明的Websocket endpoint
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
配置redis消息发布订阅
/**
* redis 消息监听⽤于websocket 分布式处理
* @param redisConnectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory){    RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
//设置订阅topic
redisMessageListenerContainer.addMessageListener(new SubscribeListener(), new ChannelTopic("socket_topic"));
return redisMessageListenerContainer;
}
再写⼀个简易的存储⼯具类,这个就基于ConcurrentHashMap就能实现,不记录了。
最后来看websocket ServerEndpoint ⽅法
@OnOpen
public void onOpen(Session session, EndpointConfig config) {
log.debug("websocket:打开连接");
//将连接存⼊我们的缓存⼯具,这个⼯具就是简单的存储,代码⾃⼰写⼀个
CacheSessionMap.put("可以是sessionId,也可以实当前⽤户ID",session);
}
@OnClose
public void onClose(Session session) {
log.debug("websocket:关闭连接");
//关闭的连接,我们将其移除
}
@OnMessage
public void onMessage(Session session, String message) throws IOException {    log.debug("websocket:消息来了");
//⽤我们之前写的redis消息发布器,将这个消息发布到redis
publishService.publish("socket_topic", message);
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。