netty的使⽤场景,线程模型以及如何在springboot中使⽤netty?
⽂章⽬录
1. 为什么使⽤netty?
Netty是基于NIO来实现的(),由于NIO的类库和 API 繁杂, 使⽤⾮常⿇烦,需要熟练掌握Selector、 ServerSocketChannel、SocketChannel、ByteBuffer等。且开发⼯作量和难度都⾮常⼤: 例如客户端⾯临断线重连、 ⽹络闪断、⼼跳处理、半包读写、 ⽹络拥塞和异常流的处理等等。
Netty 对 JDK ⾃带的 NIO 的 API 进⾏了良好的封装,解决了上述问题。且Netty拥有⾼性能、 ⾼吞吐,低延迟,异步⾮阻塞,资源消耗⼩,最⼩化不必要的内存复制等优点。所以现在基本都是⽤netty做通信,由于netty官⽹已废弃5.x版本,所以现在基本都使⽤4.x版本的netty,Netty 4.x 需要JDK 6以上版本⽀持。
netty异步主要体现在:
1. 异步事件处理。把channel注册到selector上,诸如此类事件Event被放⼊事件Queue即可返回,后续再从Queue⾥消费处理;
2. 异步IO,包括Bind、Write等操作会返回⼀个ChannelFuture,进⽽异步拿到结果,不会造成线程阻塞。
3. 其他的回调也属于异步操作。异步线程主要取⾃boosGroup 和 workerGroup 线程组中的线程
Netty的使⽤场景:
1. 互联⽹⾏业:在分布式系统中,各个节点之间需要远程服务调⽤,⾼性能的 RPC 框架必不可少,Netty 作为异步⾼性能的通信框架,
往往作为基础通信组件被这些 RPC 框架使⽤。典型的应⽤有:阿⾥分布式服务框架 Dubbo 的 RPC 框架使⽤ Dubbo 协议进⾏节点间通信,Dubbo 协议默认使⽤ Netty作为基础通信组件,⽤于实现各进程节点之间的内部通信。Rocketmq底层也是⽤的Netty作为基础通信组件。
2. 游戏⾏业:⽆论是⼿游服务端还是⼤型的⽹络游戏,Java 语⾔得到了越来越⼴泛的应⽤。Netty 作为⾼性能的基础通信组件,它本⾝
提供了 TCP/UDP 和 HTTP 协议栈。
3. ⼤数据领域:经典的 Hadoop 的⾼性能通信和序列化组件 Avro 的 RPC 框架,默认采⽤ Netty 进⾏跨界点通信,它的 Netty
Service 基于 Netty 框架⼆次封装实现。
Netty框架的⽬标就是让你的业务逻辑从⽹络基础应⽤编码中分离出来,让你可以专注业务的开发,⽽不需写⼀⼤堆类似NIO的⽹络处理操作。
2. netty的线程模型
Netty的线程模型如下图所⽰:
由上图可以看出,netty整体的线程模型是可以抽象出两种线程组:
为什么使用bootstrap?BossGroup主线程组:
BossGroup内部是⼀个事件循环线程组NioEventLoopGroup ,这个组中含有多个事件循环线程NioEventLoop。这个事件循环线程NioEventLoop其实就是⼀个NIO程序,Netty对NIO的⾼度封装就体现在这⾥。由上图可以出,BossGroup中的事件循环线程NioEventLoop的内部循环有以下3步:
1. 只处理accept事件 , 与客户端建⽴连接 , ⽣成 NioSocketChannel
2. 将NioSocketChannel注册到某个WorkerGroup的NIOEventLoop上的selector上,就相当于 客户端直接和WorkerGroup中的
selector直连!
3. 处理任务队列的任务 , 即runAllTasks
WorkerGroup从线程组:
WorkerGroup内部和BossGroup⼀样,也是⼀个事件循环线程组NioEventLoopGroup 。然⽽每个事件
循环线程NioEventLoop的职责却和BossGroup中不太⼀样。他们才是真正处理客户端读写请求的线程,⼯作步骤如下:
1. 轮询注册到⾃⼰selector上的所有NioSocketChannel 的read, write事件
2. 处理 I/O 事件, 即read , write 事件, 在对应NioSocketChannel 处理业务
3. runAllTasks处理任务队列TaskQueue的任务 ,⼀些耗时的业务处理⼀般可以放⼊TaskQueue中慢慢处理,这样不影响数据在
pipeline 中的流动处理
注意: 每个WorkerGroup中的NIOEventLoop在处理客户端读写业务业务时,会使⽤ ChannelPipeline (管道),管道中维护了很多handler 处理器⽤来处理 channel 中的数据,如下图所⽰:
在Netty中,每个客户端与服务端的连接都有且仅有⼀个 ChannelPipeline 与之对应。 ⼀个Channel连接中包含了⼀个ChannelPipeline,⽽ ChannelPipeline 中⼜维护了⼀个由 ChannelHandlerContext 组成的双向链表,并且每个ChannelHandlerContext 中⼜关联着⼀个 ChannelHandler。
3. 在springboot中使⽤netty
使⽤netty实现客户端与服务端的信息交互,我们可以在启动服务端和客户端时,让他们互相发送Hello消息,实现如下:
1. 引⼊netty依赖
<dependency>
<groupId>ioty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.35.Final</version>
</dependency>
2. 服务端代码
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建两个线程组bossGroup和workerGroup, 含有的⼦线程NioEventLoop的个数默认为cpu核数的两倍
// bossGroup只是处理连接请求 ,真正的和客户端业务处理,会交给workerGroup完成
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
try {
//创建服务器对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* group:设置两个线程组bossGroup和workerGroup
* channel:使⽤NioServerSocketChannel作为服务端的通道实现
* option:初始化服务器连接队列⼤⼩为1024.
*        服务端处理客户端连接请求是顺序处理的,所以同⼀时间只能处理⼀个客户端连接。
*        多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理。
* childHandler:创建通道初始化对象,设置处理器
*/
serverBootstrap
.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//对workerGroup的SocketChannel设置处理器,设置客户端发来的数据的处理逻辑
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("netty 服务端已启动。。。");
/**
* 服务端对象绑定 9000 端⼝并启动
*  sync():由于bind⽅法是异步操作,使⽤sync()⽅法是等待异步操作执⾏完毕。
*/
ChannelFuture cf = serverBootstrap.bind(9000).sync();
//给cf注册,监听我们关⼼的事件
//        cf.addListener(new ChannelFutureListener() {
//            @Override
//            public void operationComplete(ChannelFuture future) throws Exception {
//                if (cf.isSuccess()) {
//                    System.out.println("监听端⼝9000成功");
//                } else {
//                    System.out.println("监听端⼝9000失败");
//                }
/
/            }
//        });
/**
* 对通道关闭进⾏监听
* 由于closeFuture是异步操作,通过sync⽅法同步等待通道关闭处理完毕,这⾥会阻塞等待通道关闭完成            */
cf.channel().closeFuture().sync();
}finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端handler
/**
* ⾃定义Handler需要继承netty规定好的某个HandlerAdapter(规范)
*
* ⽤于处理客户端发送过来的数据
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取客户端发送的数据
*
* @param ctx 上下⽂对象, 含有通道channel,管道pipeline
* @param msg 就是客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
//将 msg 转成⼀个 ByteBuf,类似NIO 的 ByteBuffer
ByteBuf buf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + String(CharsetUtil.UTF_8));
}
/
**
* 数据读取完毕处理⽅法
*
* @param ctx 上下⽂对象, 含有通道channel,管道pipeline
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = piedBuffer("HelloClient", CharsetUtil.UTF_8);
ctx.writeAndFlush(buf);
}
/
**
* 处理异常, ⼀般是需要关闭通道
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        ctx.close();
}
}
3. 客户端代码

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。