SpringBoot--Netty的配置使⽤
Netty封装了JDK的NIO,让你⽤得更爽,你不⽤再写⼀⼤堆复杂的代码了。
Netty是⼀个异步事件驱动的⽹络应⽤框架,⽤于快速开发可维护的⾼性能服务器和客户端。
有了Netty,你可以实现⾃⼰的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的Proxy服务器等等。
在讲Netty之前我们先引⼊⼀个概念:NIO
NIO
说NIO之前先说⼀下BIO(Blocking IO),如何理解这个Blocking呢?
1. 客户端监听(Listen)时,Accept是阻塞的,只有新连接来了,Accept才会返回,主线程才能继
2. 读写socket时,Read是阻塞的,只有请求消息来了,Read才能返回,⼦线程才能继续处理
3. 读写socket时,Write是阻塞的,只有客户端把消息收了,Write才能返回,⼦线程才能继续读取下⼀个请求
传统的BIO模式下,从头到尾的所有线程都是阻塞的,这些线程就⼲等着,占⽤系统的资源,什么事也不⼲。
那么NIO是怎么做到⾮阻塞的呢。它⽤的是事件机制。它可以⽤⼀个线程把Accept,读写操作,请求处理的逻辑全⼲了。如果什么事都没得做,它也不会死循环,它会将线程休眠起来,直到下⼀个事件来了再继续⼲活,这样的⼀个线程称之为NIO线程
Netty 内部流程
Netty在Spring Boot的简单实例
现在就使⽤⼀个简单的场景进⾏举例:服务端监控客户端是否在线。
服务端
引⼊netty的包依赖:
<dependency>
<groupId>ioty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
开启⼀个服务端:NettyServer.java
import ioty.bootstrap.ServerBootstrap;
import ioty.channel.Channel;
import ioty.channel.ChannelOption;
import ioty.channel.EventLoopGroup;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @author JiaweiWu
* @author JiaweiWu
*/
@Component
public class NettyServer {
private static final Logger LOGGER = Logger(NettyServer.class);
@Value("${netty.server.port}")
public Integer port;
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
private void startServer(){
//服务端需要2个线程组  boss处理客户端连接  work进⾏客服端连接之后的处理
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
//服务器配置
//            up(boss,work).channel(NioServerSocketChannel.class)
//                    .childHandler(new ChannelInitializer<SocketChannel>() {
//                        protected void initChannel(SocketChannel socketChannel) throws Exception {
//                            // HttpServerCodec:将请求和应答消息解码为HTTP消息
//                            socketChannel.pipeline().addLast("http-codec",new HttpServerCodec());
//                            // HttpObjectAggregator:将HTTP消息的多个部分合成⼀条完整的HTTP消息
/
/                            socketChannel.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));
//                            // ChunkedWriteHandler:向客户端发送HTML5⽂件
//                            socketChannel.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
//                            // 进⾏设置⼼跳检测
//                            socketChannel.pipeline().addLast(new IdleStateHandler(60,30,60*30, TimeUnit.SECONDS));
//                            // 配置通道处理来进⾏业务处理
//                            socketChannel.pipeline().addLast(new MyChannelHandler());
//                        }
//                    }).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.SO_KEEPALIVE,true);
//服务器配置
.childHandler(new MyChatServerInializer()).option(ChannelOption.SO_BACKLOG,1024).childOption(ChannelOption.SO_KEEPALIVE,true);            //绑定端⼝开启事件驱动
LOGGER.info("【服务器启动成功========端⼝:"+port+"】");
Channel channel = bootstrap.bind(port).sync().channel();
channel.closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
前端websocket怎么用//关闭资源
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
@PostConstruct()
public void init(){
//需要开启⼀个新的线程来执⾏netty server 服务器
new Thread(new Runnable() {
public void run() {
startServer();
}
}).start();
}).start();
}
}
上⾯使⽤到两个配置:⼀个在配置⽂件application.properties中的配置,⼀个服务端的初始化配置MyChatServerInializer.java application.properties
# netty 配置
# 端⼝
netty.server.port=9001
MyChatServerInializer.java
import com.jmk.frame.device.serverty.handler.MyChannelHandler;
import ioty.channel.ChannelInitializer;
import ioty.channel.ChannelPipeline;
import ioty.channel.socket.SocketChannel;
import dec.http.HttpObjectAggregator;
import dec.http.HttpServerCodec;
import ioty.handler.stream.ChunkedWriteHandler;
import ioty.handler.timeout.IdleStateHandler;
import urrent.TimeUnit;
/**
* @author JiaweiWu
*/
public class MyChatServerInializer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//分割接收到的Bytebu,根据指定的分割符为换⾏
//        pipeline.addLast(new DelimiterBasedFrameDecoder(10240, Delimiters.lineDelimiter()));
//        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
//        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
// HttpServerCodec:将请求和应答消息解码为HTTP消息
pipeline.addLast("http-codec",new HttpServerCodec());
// HttpObjectAggregator:将HTTP消息的多个部分合成⼀条完整的HTTP消息
pipeline.addLast("aggregator",new HttpObjectAggregator(65536));
// ChunkedWriteHandler:向客户端发送HTML5⽂件
pipeline.addLast("http-chunked",new ChunkedWriteHandler());
// 进⾏设置⼼跳检测
pipeline.addLast(new IdleStateHandler(80,70,60*30, TimeUnit.SECONDS));
// 配置通道处理来进⾏业务处理
pipeline.addLast(new MyChannelHandler());
}
}
创建MyChannelHandler.java,对消息信号进⾏监测
import cn.hutool.json.JSONObject;
import com.stant.ConstantInterface;
import com.jmk.frame.devicemonty.utils.GlobalUserUtil;
import com.jmk.frame.device.server.biz.DeviceBiz;
import com.jmk.frame.ity.DeviceChannel;
import com.jmk.frame.device.serverty.manager.DeviceChannelManager;
import ioty.buffer.ByteBuf;
import ioty.buffer.Unpooled;
import ioty.channel.*;
import dec.http.*;
import dec.http.*;
import dec.http.websocketx.*;
import ioty.handler.timeout.IdleStateEvent;
import ioty.util.AttributeKey;
import ioty.util.CharsetUtil;
import org.apachemons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
/**
* @author JiaweiWu
*/
@Component
public class MyChannelHandler extends SimpleChannelInboundHandler<Object>{
private static final Logger LOGGER = Logger(MyChannelHandler.class);
private static final String URI = "websocket";
private WebSocketServerHandshaker handshaker ;
private static MyChannelHandler myChannelHandler;
@Autowired
DeviceBiz deviceBiz;
@PostConstruct
public void init(){
myChannelHandler = this;
}
/**
* 连接上服务器
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("【handlerAdded】====>"+ctx.channel().id());
GlobalUserUtil.channels.add(ctx.channel());
}
/
**
* 断开连接
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("【handlerRemoved】====>"+ctx.channel().id());
ve(ctx);
//修改设备的在线状态
DeviceChannel deviceChannel =  ByChannelId(ctx.channel().id().asLongText());
if (deviceChannel != null) {
List<DeviceChannel> deviceChannels = Imei());
LOGGER.info("flow==deviceChannelList:"+ new JSONObject(deviceChannels)+"-----size:"+deviceChannels.size());
if (deviceChannels.size() < 2) {
if (StringUtils.Imei())) {
myChannelHandler.deviceBiz.Imei(), ConstantInterface.ONLINE_STATUS.NOT_ONLINE);
}
}
}
}
}
/**
* 连接异常需要关闭相关资源
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ("【系统异常】======>"+String());
ctx.close();
ctx.channel().close();
}
/**
* 活跃的通道也可以当作⽤户连接上客户端进⾏使⽤
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("【channelActive】=====>"+ctx.channel());
}
/
**
* 不活跃的通道就说明⽤户失去连接
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOGGER.info("【channelInactive】=====>"+ctx.channel());
}
/**
* 这⾥只要完成 flush
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* 这⾥是保持服务器与客户端长连接进⾏⼼跳检测避免连接断开
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent stateEvent = (IdleStateEvent) evt;
PingWebSocketFrame ping = new PingWebSocketFrame();
switch (stateEvent.state()){
//读空闲(服务器端)
case READER_IDLE:
LOGGER.info("【"+ctx.channel().remoteAddress()+"】读空闲(服务器端)");
ctx.writeAndFlush(ping);

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。