使⽤Redis订阅+Websocket将消息推送给前端
前⾔
⾸先说⼀下业务,我们的webapi需要从redis订阅消息,并把订阅到的消息推送给web前端。要求不同的⽤户建⽴不同的websocket连接,并把websocket要把消息分发给不同的⽤户。
Redis的消息订阅与发布并不复杂,这⾥不再赘述。主要讲解如何通过webSocke将消息推送给前端。
我们使⽤的是 .Net Core 3.1 + 原⽣ webSocket ,前端使⽤的是VueJS。
在websocket中订阅redis消息,当收到订阅消息时,处理并发送给前端。
实践
1、配置websocket中间键
⾸先,安装 System.Net.WebSockets包,可在Nuget中安装,也可以在程序包管理控制台安装。
然后,在 Startup 类的 Configure ⽅法中添加 WebSocket 中间件: app.UseWebSockets();
可以配置以下属性:
KeepAliveInterval - 向客户端发送“ping”帧的频率,以确保代理保持连接处于打开状态。 ReceiveBufferSize - ⽤于接收数据的缓冲区的⼤⼩。 ⾼级⽤户可能需要对其进⾏更改,以便根据数据⼤⼩调整性能。
app.UseWebSockets(new WebSocketOptions()
{
KeepAliveInterval = TimeSpan.FromSeconds(120000),
ReceiveBufferSize = 4 * 1024
});
2、新建⼀个WebSocketsHelper类,代码如下:
public class WebSocketsHelper
{
/// <summary>
/// ⽇志接⼝
/// </summary>
private ILogger<WebSocketsHelper> _logger;
/// <summary>
/// 下⼀级管道
/// </summary>
private RequestDelegate _next;
/// <summary>
/// 缓冲区⼤⼩
/// </summary>
private const int bufferSize = 1024 * 4;
/
// <summary>
/// URL地址后缀
/// </summary>
private const string routePostfix = "/ws";
/// <summary>
/// Socket列表
/// Socket列表
/// </summary>
/// <typeparam name="string">typeof(string),⽤户名</typeparam>
/// <typeparam name="WebSocket">typeof(WebSocket),WebSocket</typeparam>
/// <returns></returns>
private static ConcurrentDictionary<string, WebSocket> _socketsList
= new ConcurrentDictionary<string, WebSocket>();
//private static ConcurrentDictionary<string, IList<MReadViewModel>> _meterList = new ConcurrentDictionary<string, IList<MReadViewModel>>();
private const string Channel_1 = "TestChannel1";
private const string Channel_2 = "TestChannel2";
private readonly RedisHelper _helper;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="next">下⼀级管道</param>
/// <param name="logger">⽇志接⼝</param>
public WebSocketsHelper(RequestDelegate next, ILogger<WebSocketsHelper> logger, RedisHelper helper)
{
_next = next;
_logger = logger;
_helper = helper;
}
public async Task Invoke(HttpContext context)
{
//判断当前请求是否为WebSocket
if (!IsWebSocket(context))
{
await _next.Invoke(context);
return;
}
//将请求转换为 WebSocket 连接
var webSocket = await context.WebSockets.AcceptWebSocketAsync();
while (webSocket.State == WebSocketState.Open)
{
//接收消息
var entity = await Receiveentity<SocketMessageEntity>(webSocket);
HandRecMsg(entity, webSocket);
//订阅redis消息
await _helper.UnsubscribeAsync(Channel_1 );
await _helper.SubscribeAsync(Channel_1 , async (channel, msg) => await HandleChannelMsg(webSocket, channel, msg));
//await _helper.UnsubscribeAsync(Channel_2);
//await _helper.SubscribeAsync(Channel_2, async (channel, msg) => await HandleChannelMsg(webSocket, channel, msg));
}
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close", default(CancellationToken));
}
/// <summary>
///  处理低订阅到的信息,并发送给⽤户
/
// </summary>
/// <param name="webSocket"></param>
/// <param name="channelName"></param>
/// <param name="msg"></param>
/// <returns></returns>
private async Task HandleChannelMsg(WebSocket webSocket, string channelName, string msg)
{
_logger.LogError("收到订阅结果:" + msg);
MeterReadResult res = JsonConvert.DeserializeObject<MeterReadResult>(msg);
//消息类型
//消息类型
MReadViewModel mod = new MReadViewModel()
{
MeterAddr = res.Meter,
readTime = DateTime.Now
};
mod.Flag = res.Status;
SocketMessageEntity entity = new SocketMessageEntity()
{
Receiver = res.UserId,
Sender = "Amr-Api",
Message = mod
};
await HandleSend(webSocket, entity);
}
/// <summary>
/// 处理Socket收到的信息
/// </summary>
/// <param name="msg"></param>
/// <param name="webSocket"></param>
private void HandRecMsg(SocketMessageEntity msg, WebSocket webSocket)
{
_logger.LogError("收到socket消息:" + JsonConvert.SerializeObject(msg));
if (msg != null && msg.Receiver != "" && msg.Receiver != null)
{
AddUser(msg.Receiver, webSocket);
}
else
{
//_helper.Unsubscribe(Channel_2);
_helper.Unsubscribe(Channel_1);
//webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close", default(CancellationToken));            }
}
private bool RemoveUser(string UserId, WebSocket webSocket)
{
WebSocket socket1 = null;
_socketsList.TryRemove(UserId, out webSocket);
webSocket = null;
return true;
}
/// <summary>
/// 添加⽤户
/// </summary>
/// <param name="UserId"></param>
/// <param name="webSocket"></param>
/
// <returns></returns>
private bool AddUser(string UserId, WebSocket webSocket)
{
var res = false;
//不存在
if (!_socketsList.TryGetValue(UserId, out WebSocket socket))
{
res = _socketsList.TryAdd(UserId, webSocket);
}
else
{
/
/webSocket = null;
WebSocket socket1 = null;
_socketsList.TryRemove(UserId, out socket1);
res = _socketsList.TryAdd(UserId, webSocket);
res = _socketsList.TryAdd(UserId, webSocket);
}
return res;
}
/// <summary>
/// 处理WebSocket聊天
/// </summary>
/
// <param name="webSocket">WebSocket</param>
/// <param name="entity">Entity of Message</param>
/// <returns></returns>
private async Task HandleSend(WebSocket webSocket, SocketMessageEntity entity)
{
_logger.LogError(entity.Sender + ";" + entity.Receiver + ";" + entity.Message);
await SendOne(entity.Sender, entity.Receiver, entity.Message);
}
/// <summary>
/// 给指定⽤户发送消息
/// </summary>
/
// <param name="sender">发送者</param>
/// <param name="receiver">接收者</param>
/// <param name="message">消息内容</param>
/// <returns></returns>
private async Task SendOne(string sender, string receiver, MReadViewModel message, MessageType type = MessageType.Chat)        {
if (sender == receiver) return;
if (message == null) return;
if (!ValidateUser(receiver)) return;
var socket = _socketsList[receiver];
var chatEntity = new SocketMessageEntity() { Receiver = receiver, Sender = sender, Message = message };
await SendMessage(socket, chatEntity);
}
/// <summary>
/// 当前请求是否为WebSocket
/// </summary>
/// <param name="context">Http上下⽂</param>
/// <returns></returns>
private bool IsWebSocket(HttpContext context)
{
return context.WebSockets.IsWebSocketRequest &&
context.Request.Path == routePostfix;
}
/// <summary>
/// 验证⽤户是否存在
/// </summary>
/// <param name="userName">⽤户名</param>
/// <returns></returns>
private bool ValidateUser(string userName)
{
return _socketsList.ContainsKey(userName);
}
/// <summary>
/
// 发送消息
/// </summary>
/// <typeparam name="TEntity"></typeparam>
/// <param name="webSocket"></param>
/// <param name="entity"></param>
/// <returns></returns>
private async Task SendMessage<TEntity>(WebSocket webSocket, TEntity entity)
{
{
try
{
if (webSocket.State == WebSocketState.Open)
{
var Json = JsonConvert.SerializeObject(entity);
_logger.LogError("Json数据:" + Json);
var bytes = Encoding.UTF8.GetBytes(Json);
await webSocket.SendAsync(
new ArraySegment<byte>(bytes),
WebSocketMessageType.Text,
true,
CancellationToken.None
);
}
}
catch
{
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close", default(CancellationToken));            }
}
/// <summary>
/// 接收消息
/// </summary>
/// <param name="webSocket">WebSocket</param>
/// <typeparam name="TEntity">typeof(TEntity)</typeparam>
/// <returns></returns>
private async Task<TEntity> Receiveentity<TEntity>(WebSocket webSocket)
{
try
{
if (webSocket.State == WebSocketState.Open)
{
var buffer = new ArraySegment<byte>(new byte[bufferSize]);
var result = await webSocket.ReceiveAsync(buffer, CancellationToken.None);
前端websocket怎么用
while (!result.EndOfMessage)
{
result = await webSocket.ReceiveAsync(buffer, default(CancellationToken));
}
var json = Encoding.UTF8.GetString(buffer.Array);
json = json.Replace("\0", "").Trim();
return JsonConvert.DeserializeObject<TEntity>(json, new JsonSerializerSettings()
{
DateTimeZoneHandling = DateTimeZoneHandling.Local
});
}
else
{
return default;
}
}
catch
{
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Close", default(CancellationToken));                return default;
}
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。