SpringBoot整合Netty⼼跳机制过程详解
前⾔
Netty 是⼀个⾼性能的 NIO ⽹络框架,本⽂基于 SpringBoot 以常见的⼼跳机制来认识 Netty。
最终能达到的效果:
客户端每隔 N 秒检测是否需要发送⼼跳。
服务端也每隔 N 秒检测是否需要发送⼼跳。
服务端可以主动 push 消息到客户端。
基于 SpringBoot 监控,可以查看实时连接以及各种应⽤信息。
IdleStateHandler
Netty 可以使⽤ IdleStateHandler 来实现连接管理,当连接空闲时间太长(没有发送、接收消息)时则会触发⼀个事件,我们便可在该事件中实现⼼跳机制。
客户端⼼跳
当客户端空闲了 N 秒没有给服务端发送消息时会⾃动发送⼀个⼼跳来维持连接。
核⼼代码代码如下:
public class EchoClientHandle extends SimpleChannelInboundHandler<ByteBuf> {
private final static Logger LOGGER = Logger(EchoClientHandle.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
if (idleStateEvent.state() == IdleState.WRITER_IDLE){
LOGGER.info("已经 10 秒没有发送信息!");
//向服务端发送消息
CustomProtocol heartBeat = Bean("heartBeat", CustomProtocol.class);
ctx.writeAndFlush(heartBeat).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
}
}
super.userEventTriggered(ctx, evt);
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf in) throws Exception {
//从服务端收到消息时被调⽤
LOGGER.info("客户端收到消息={}",in.toString(CharsetUtil.UTF_8)) ;
}
}
实现⾮常简单,只需要在事件回调中发送⼀个消息即可。
由于整合了 SpringBoot ,所以发送的⼼跳信息是⼀个单例的 Bean。
@Configuration
public class HeartBeatConfig {
@Value("${channel.id}")
private long id ;
@Bean(value = "heartBeat")
public CustomProtocol heartBeat(){
springboot框架的作用return new CustomProtocol(id,"ping") ;
}
}
这⾥涉及到了⾃定义协议的内容,请继续查看下⽂。
当然少不了启动引导:
@Component
public class HeartbeatClient {
private final static Logger LOGGER = Logger(HeartbeatClient.class);
private EventLoopGroup group = new NioEventLoopGroup();
@Value("${netty.server.port}")
private int nettyPort;
@Value("${netty.server.host}")
private String host;
private SocketChannel channel;
@PostConstruct
public void start() throws InterruptedException {
Bootstrap bootstrap = new Bootstrap();
.channel(NioSocketChannel.class)
.handler(new CustomerHandleInitializer())
;
ChannelFuture future = t(host, nettyPort).sync();
if (future.isSuccess()) {
LOGGER.info("启动 Netty 成功");
}
channel = (SocketChannel) future.channel();
}
}
public class CustomerHandleInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//10 秒没发送消息将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(0, 10, 0))
.addLast(new HeartbeatEncode())
.
addLast(new EchoClientHandle())
;
}
}
所以当应⽤启动每隔 10 秒会检测是否发送过消息,不然就会发送⼼跳信息。
服务端⼼跳
服务器端的⼼跳其实也是类似,也需要在 ChannelPipeline 中添加⼀个 IdleStateHandler 。
public class HeartBeatSimpleHandle extends SimpleChannelInboundHandler<CustomProtocol> {
private final static Logger LOGGER = Logger(HeartBeatSimpleHandle.class);
private static final ByteBuf HEART_BEAT = Unpooled.piedBuffer(new CustomProtocol(123456L,"pong").toString(),CharsetUtil.UTF_8));  /**
* 取消绑定
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent idleStateEvent = (IdleStateEvent) evt ;
if (idleStateEvent.state() == IdleState.READER_IDLE){
LOGGER.info("已经5秒没有收到信息!");
//向客户端发送消息
ctx.writeAndFlush(HEART_BEAT).addListener(ChannelFutureListener.CLOSE_ON_FAILURE) ;
}
}
super.userEventTriggered(ctx, evt);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, CustomProtocol customProtocol) throws Exception {
LOGGER.info("收到customProtocol={}", customProtocol);
//保存客户端与 Channel 之间的关系
NettySocketHolder.Id(),(NioSocketChannel)ctx.channel()) ;
}
}
这⾥有点需要注意:
当有多个客户端连上来时,服务端需要区分开,不然响应消息就会发⽣混乱。
所以每当有个连接上来的时候,我们都将当前的 Channel 与连上的客户端 ID 进⾏关联(因此每个连上的客户端 ID 都必须唯⼀)。这⾥采⽤了⼀个 Map 来保存这个关系,并且在断开连接时⾃动取消这个关联。
public class NettySocketHolder {
private static final Map<Long, NioSocketChannel> MAP = new ConcurrentHashMap<>(16);
public static void put(Long id, NioSocketChannel socketChannel) {
MAP.put(id, socketChannel);
}
public static NioSocketChannel get(Long id) {
(id);
}
public static Map<Long, NioSocketChannel> getMAP() {
return MAP;
}
public static void remove(NioSocketChannel nioSocketChannel) {
}
}
启动引导程序:
Component
Component
public class HeartBeatServer {
private final static Logger LOGGER = Logger(HeartBeatServer.class);
private EventLoopGroup boss = new NioEventLoopGroup();
private EventLoopGroup work = new NioEventLoopGroup();
@Value("${netty.server.port}")
private int nettyPort;
/
**
* 启动 Netty
*
* @return
* @throws InterruptedException
*/
@PostConstruct
public void start() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(boss, work)
.channel(NioServerSocketChannel.class)
.
localAddress(new InetSocketAddress(nettyPort))
//保持长连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new HeartbeatInitializer());
ChannelFuture future = bootstrap.bind().sync();
if (future.isSuccess()) {
LOGGER.info("启动 Netty 成功");
}
}
/**
* 销毁
*/
@PreDestroy
public void destroy() {
boss.shutdownGracefully().syncUninterruptibly();
work.shutdownGracefully().syncUninterruptibly();
LOGGER.info("关闭 Netty 成功");
}
}
public class HeartbeatInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
//五秒没有收到消息将IdleStateHandler 添加到 ChannelPipeline 中
.addLast(new IdleStateHandler(5, 0, 0))
.addLast(new HeartbeatDecoder())
.addLast(new HeartBeatSimpleHandle());
}
}
也是同样将IdleStateHandler 添加到 ChannelPipeline 中,也会有⼀个定时任务,每5秒校验⼀次是否有收到消息,否则就主动发送⼀次请求。
因为测试是有两个客户端连上所以有两个⽇志。
⾃定义协议
上⽂其实都看到了:服务端与客户端采⽤的是⾃定义的 POJO 进⾏通讯的。
所以需要在客户端进⾏编码,服务端进⾏解码,也都只需要各⾃实现⼀个编解码器即可。
CustomProtocol:
public class CustomProtocol implements Serializable{
private static final long serialVersionUID = 4671171056588401542L;
private long id ;
private String content ;
//省略 getter/setter
}
客户端的编码器:
public class HeartbeatEncode extends MessageToByteEncoder<CustomProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomProtocol msg, ByteBuf out) throws Exception {
out.Id()) ;
out.Content().getBytes()) ;
}
}
也就是说消息的前⼋个字节为 header,剩余的全是 content。
服务端的解码器:
public class HeartbeatDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
long id = in.readLong() ;
byte[] bytes = new adableBytes()] ;
String content = new String(bytes) ;
CustomProtocol customProtocol = new CustomProtocol() ;
customProtocol.setId(id);
customProtocol.setContent(content) ;
out.add(customProtocol) ;
}
}
只需要按照刚才的规则进⾏解码即可。
实现原理
其实联想到 IdleStateHandler 的功能,⾃然也能想到它实现的原理:
应该会存在⼀个定时任务的线程去处理这些消息。
来看看它的源码:
⾸先是构造函数:
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
其实就是初始化了⼏个数据:
readerIdleTimeSeconds:⼀段时间内没有数据读取
writerIdleTimeSeconds:⼀段时间内没有数据发送
allIdleTimeSeconds:以上两种满⾜其中⼀个即可
因为 IdleStateHandler 也是⼀种 ChannelHandler,所以会在 channelActive 中初始化任务:  @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);
super.channelActive(ctx);
}
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: github/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
也就是会按照我们给定的时间初始化出定时任务。
接着在任务真正执⾏时进⾏判断:
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
如果满⾜条件则会⽣成⼀个 IdleStateEvent 事件。
SpringBoot 监控

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。