SpringBoot集成websocket发送后台⽇志到前台页⾯业务需求
后台为⼀个采集系统,需要将采集过程中产⽣的⽇志实时发送到前台页⾯展⽰,以便了解采集过程。
技能点
SpringBoot 2.x
websocket
logback
thymeleaf
RabbitMQ
之所以使⽤到RabbitMQ是因为实际环境中采集服务为多个,为了统⼀处理⽇志信息,将⽇志都先灌⼊mq中,再统⼀从mq中进⾏消费
引⼊关键pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!--rabbitmq -->
springboot aop<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
logback配置⽂件引⼊AmqpAppender
<springProperty scope="context" name="rabbitmq-address" source="spring.rabbitmq.addresses" defaultValue="127.0.0.1:5672" />
<springProperty scope="context" name="rabbitmq-username" source="spring.rabbitmq.username" defaultValue="guest" />
<springProperty scope="context" name="rabbitmq-password" source="spring.rabbitmq.password" defaultValue="guest" />
<springProperty scope="context" name="rabbitmq-virtual-host" source="spring.rabbitmq.virtual-host" defaultValue="/" />
<springProperty scope="context" name="exhcange-name" source="hcangeName" defaultValue="default-exchange" />
<springProperty scope="context" name="binding-key" source="platform.parameter.bindingKey" defaultValue="default-routing" />
<appender name="RabbitMq" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
<layout>
<pattern>[%X{traceId}] - %d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern> <!--<1>-->
</layout>
<!--rabbitmq地址 -->
<addresses>${rabbitmq-address}</addresses>
<username>${rabbitmq-username}</username>
<password>${rabbitmq-password}</password>
<virtualHost>${rabbitmq-username}</virtualHost>
<declareExchange>false</declareExchange>
<exchangeType>direct</exchangeType>
<exchangeName>${exhcange-name}</exchangeName>
<routingKeyPattern>${binding-key}</routingKeyPattern>
<generateId>true</generateId>
<charset>UTF-8</charset>
<durable>true</durable>
<deliveryMode>NON_PERSISTENT</deliveryMode>
<filter class="com.log.websocket.stomp.LogFilter">
<level>INFO</level>
</filter>
</appender>
<springProfile name="dev">
<root level="debug">
<appender-ref ref="RabbitMq" />
</root>
</springProfile>
⽇志过滤器
logback配置⽂件中添加的AmqpAppender使⽤了filter,具体的filter如下所⽰:
public class LogFilter extends AbstractMatcherFilter<ILoggingEvent> {
Level level;
@Override
public FilterReply decide(ILoggingEvent event) {
if (!isStarted()) {
return FilterReply.NEUTRAL;
}
//过滤指定级别的⽇志
Level().equals(level)){
Map<String, String> mdcMap = MDCPropertyMap();
String tracId = ("traceId");
//过滤⽇志中带有traceId的⽇志,其他的不需要,traceId使⽤aop添加
if(StringUtils.isNotBlank(tracId)){
return FilterReply.ACCEPT;
}
}
return FilterReply.DENY;
}
public void setLevel(Level level) {
this.level = level;
}
@Override
public void start() {
if (this.level != null) {
super.start();
}
}
}
说明:
AmqpAppender中的filter设置了过滤级别,因此只过滤指定级别的⽇志;
过滤⽇志中带有traceId的⽇志,traceId通过aop添加,具体参考后⾯的aop设置;
aop⽅式添加traceId
编写LogAspect如下所⽰:
@Order(1)
@Aspect
@Component
public class LogAspect {
/**
* 所有的业务类的类名都是xxSpiderxxImpl,统⼀⼊⼝都是gatherData⽅法
*/
@Pointcut("execution(* com.log..*.service..*Spider*Impl.gatherData(..))")
public void pointCut() {}
@Before("pointCut()")
public void before(JoinPoint joinPoint){
//切点已经确定是com.log..*.service..*Spider*Impl.gatherData(..),该⽅法的参数只有⼀个,且为GatherTaskVO
GatherTaskVO vo = (Args()[0];
//将任务id作为traceId
MDC.put("traceId", vo.getId());
}
@After("pointCut()")
public void after(JoinPoint joinPoint){
//⽅法执⾏完成以后,删除traceId
}
}
解释⼀下MDC:
对于多个线程同时执⾏的系统或者分布式系统中,各个线程的⽇志穿插执⾏,导致我们⽆法直观的直接定位整个操作流程,因此,我们需要对⼀个线程的操作流程进⾏归类标记,⽐如使⽤线程+时间戳或者⽤户id等,从⽽使我们能够从混乱的⽇志中梳理处整个线程的操作流程,因此Slf4j的MDC应运⽽⽣,
logback和log4j⽀持MDC。
MDC中提供的⽅法如下所⽰;
package org.jboss.logging;
import java.util.Collections;
import java.util.Map;
/**
* 删除了⾮必须代码以及注释
* Mapped diagnostic context. Each log provider implementation may behave different.
*/
public final class MDC {
//uts the value onto the context.
public static Object put(String key, Object val);
//Returns the value for the key or {@code null} if no value was found.
public static Object get(String key);
//Removes the value from the context.
public static void remove(String key);
//Clears the message diagnostics context.
public static void clear();
}
MDC提供的⽅法⽐较简单,使⽤也很简单,只需要将指定的值put到线程上下⽂中,在对应的地⽅调⽤get⽅法获取到值即可。
注意看上述AmqpAppender配置中标记<1>中的traceId即为我们此处添加到线程上下⽂中的值,如下所⽰
<layout>
<pattern>[%X{traceId}] - %d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n</pattern>
</layout>
开启websocket⽀持
Springboot环境下注⼊ServerEndpointExporter以开启websocket⽀持
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
websocketServer
websocketServer⽤来开启连接,关闭连接以及接收消息等
@Slf4j
@ServerEndpoint("/socketserver/{taskId}")
@Component
public class WebSocketServer {
/**concurrent包的线程安全Set,⽤来存放每个客户端对应的MyWebSocket对象。*/
private static ConcurrentHashMap<String,WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
/**与某个客户端的连接会话,需要通过它来给客户端发送数据*/
private Session session;
/**接收taskId*/
private String taskId="";
/**
* 连接建⽴成功调⽤的⽅法*/
@OnOpen
public void onOpen(Session session,@PathParam("taskId") String taskId) {
this.session = session;
this.taskId=taskId;
ainsKey(taskId)){
webSocketMap.put(taskId,this);
}else{
webSocketMap.put(taskId,this);
}
try {
sendMessage("socket连接成功");
} catch (IOException e) {
<("socket>>"+taskId+",⽹络异常");
}
}
/**
* 连接关闭调⽤的⽅法
*/
@OnClose
public void onClose() {
ainsKey(taskId)){
}
}
/**
* 收到客户端消息后调⽤的⽅法
* TODO 客户端交互使⽤,暂⽆⽤到
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("socket>>>:"+taskId+",报⽂:"+message);
}
/**
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
<("⽤户错误:"+this.taskId+",原因:"+Message());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(String message) throws IOException {
//加锁,否则会出现java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method异常,并发使⽤session发送消息导致的 synchronized (this.session){
BasicRemote().sendText(message);
}
}
public ConcurrentHashMap<String,WebSocketServer> getWebSocketMap(){ return webSocketMap; }
}
前台页⾯
前台页⾯使⽤js来调⽤websocket,请求websocketserver打开socket连接,并且开始和后台交互发送消息
<!DOCTYPE html >
<html xmlns:th="" >
<head>
<meta charset="utf-8">
<title>任务⽇志展⽰</title>
</head>
<body>
<script th:src="@{/js/jquery.min.js}"></script>
<input type="hidden" id="gather_task_id" th:value="${taskId}" />
<script>
var socket;
function openSocket() {
var detailDiv = $("#log_detail");
var taskId = $("#gather_task_id").val();
//实现化WebSocket对象,指定要连接的服务器地址与端⼝建⽴连接
var socketUrl="localhost:8888/socketserver/"+taskId;
place("https","ws").replace("http","ws");
if(socket!=null){
socket.close();
socket=null;
}
socket = new WebSocket(socketUrl);
//打开事件
console.log("websocket已打开");
};
//获得消息事件
console.log(msg.data);
//发现消息进⼊开始处理前端触发逻辑
detailDiv.append("<p>"+msg.data+"</p>")
};
//关闭事件
console.log("websocket已关闭");
};
//发⽣了错误事件
console.log("websocket发⽣了错误");
}
}
function sendMessage() {
if(typeof(WebSocket) == "undefined") {
console.log("您的浏览器不⽀持WebSocket");
}else {
console.log("您的浏览器⽀持WebSocket");
console.log('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
socket.send('{"toUserId":"'+$("#toUserId").val()+'","contentText":"'+$("#contentText").val()+'"}');
}
}
function printLog(){
if(typeof(WebSocket) == "undefined") {
console.log("您的浏览器不⽀持WebSocket");
alert("您的浏览器不⽀持WebSocket");
}else {
openSocket();
}
}
function quit(){
if(socket!=null){
socket.close();
socket=null;
var detailDiv = $("#log_detail");
detailDiv.append("<p>客户端已退出</p>")
}
}
</script>
<a href="javascript:void(0);" onclick="printLog()" >打印⽇志</a>
<a href="javascript:void(0);" onclick="quit()">退出</a>
<div id="log_detail">
</div>
</body>
</html>
消费mq中的⽇志消息
service中产⽣的⽇志是添加到mq队列中的,因此需要⼀个消费者消费队列中的数据,并且使⽤websocketserver将消息发送到对应的页⾯上,从⽽在页⾯上进⾏展⽰@Component
@Slf4j
public class LogConsumer {
@Resource
private WebSocketService webSocketService;
@RabbitHandler
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "${platform.parameter.queueName}",durable = "true"),
exchange = @Exchange(name = "${hcangeName}",ignoreDeclarationExceptions="true",durable = "true"),
key = "${platform.parameter.bindingKey}"
),
concurrency = "2"
)
public void listennerPush(String msg, Channel channel, Message message) throws IOException { try {
log.debug("consumer>>>接收到的消息>>>{}",msg);
//[1] - 13:15:17.484 - TwitterSpiderMobileService实现类⽅法<<<<;任务id:1
msg.split(" - ")[0].trim().replace("[","").replace("]","");
String tracId = msg.substring(0,msg.indexOf(" - ")).trim().replace("[","").replace("]","");
msg = msg.substring(msg.indexOf(" - ")+2);
//调⽤websocket发送⽇志信息到页⾯上
webSocketService.sendMessage(tracId,msg);
} catch (Exception e) {
<("获取消息失败,异常原因:{}",e.getMessage(),e);
} finally {
channel.MessageProperties().getDeliveryTag(), false);
}
}
}
sendMessage⽅法如下所⽰:
@Override
public void sendMessage(String taskId, String logMessage) {
try {
ConcurrentHashMap<String, WebSocketServer> map = WebSocketMap();
WebSocketServer server = (taskId);
if(server!=null){
server.sendMessage(logMessage);
}else{
log.warn("客户端已退出");
}
} catch (IOException e) {
<("向客户端发送消息时出现异常,异常原因:{}",e.getMessage(),e);
}
}
最终效果图
经过以上步骤即可将service中⽣成的⽇志接近实时的显⽰在前台页⾯上,最后的显⽰效果如下所⽰:
参考资料
1.
本⽂所对应的代码已上传,有需要的可以⾃⾏下载。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论