websocket多线程发送内容websocket多线程发送内容
1.websocketSession基类接⼝:
org.springframework.web.socket.WebSocketSession
其中包含 getPrincipal,getLocalAddress,getRemoteAddress,sendMessage,isOpen,close等⽅法定义
2.接⼝的实现类,实现了上⾯的接⼝,采⽤包装设计模式,只做转发没有处理逻辑,具体看下⾯这个类: org.springframework.web.socket.handler.WebSocketSessionDecorator
3.带并发的包装类:
ConcurrentWebSocketSessionDecorator
主要关注两个⽅法 sendMessage 和 close,
```
public void close(CloseStatus status) throws IOException {
this.closeLock.lock(); //获取关闭锁
try {
if (!this.closeInProgress) { //如果已经关闭了,则不处理
if (!CloseStatus.SESSION_NOT_RELIABLE.equals(status)) {
try {
this.checkSessionLimits(); //检查超时时间和buffer状态
} catch (SessionLimitExceededException var6) {
;
}
if (this.limitExceeded) {
if (logger.isDebugEnabled()) {
logger.debug("Changing close status " + status + " to SESSION_NOT_RELIABLE.");
}
status = CloseStatus.SESSION_NOT_RELIABLE;
}
}
this.closeInProgress = true; //设置关闭标志位,防⽌多次关闭
super.close(status); //调⽤关闭⽅法,传递关闭状态
return;
}
} finally {
this.closeLock.unlock();
}
}
```
close⽅法是⽤ volatile 修饰的标志位 closeInProgress和加显式锁的⽅式,防⽌多个线程调⽤close⽅法
4.sendMessage ⽅法
websocket和socketpublic void sendMessage(WebSocketMessage<?> message) throws IOException {
if (!this.shouldNotSend()) { //检查允许发送(是否是关闭状态?,是否是超过⼤⼩限制?)
this.buffer.add(message); //将消息添加到queue中
this.bufferSize.PayloadLength()); //将消息⼤⼩字段递增
do {
if (!FlushMessageBuffer()) { //尝试发消息,需要加锁,如果有没有获取到锁,则返回false
if (logger.isTraceEnabled()) {
}
this.checkSessionLimits(); //检查是否超时,检查⼤⼩是否超过限制
break;
}
} while(!this.buffer.isEmpty() && !this.shouldNotSend());
}
}
真实发送消息的⽅法,在while⽅法执⾏时,如果其他线程也要调⽤发送⽅法,则只对buffer⼤⼩进⾏统
计,不发送
private boolean tryFlushMessageBuffer() throws IOException {
if (!Lock()) { //没获取到发送锁,直接返回false
return false;
} else {
try {
while(true) { //循环遍历queue,知道⾥⾯取出来的值为null
WebSocketMessage<?> message = (WebSocketMessage)this.buffer.poll();
if (message == null || this.shouldNotSend()) {
return true;
}
this.bufferSize.addAndGet(-PayloadLength()); //减掉将要发送内容的⼤⼩
this.sendStartTime = System.currentTimeMillis(); //记录开始时间
this.sendStartTime = 0L; //发送完成,重置发送开始时间
}
} finally {
this.sendStartTime = 0L;
this.flushLock.unlock();
}
}
}
spring就是由上⾯的包装类实现了websocket session的多线程使⽤,可以将websocketSession保存到全局map中,随时调⽤
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论