Netty学习第四章springboot整合netty的使⽤
现在⼤多数项⽬都是基于spring boot进⾏开发,所以我们以spring boot作为开发框架来使⽤netty。使⽤spring boot的⼀个好处就是能给将netty的业务拆分出来,并通过spring cloud整合到项⽬中。
我们以⼀个简单的客户端发送消息到服务的场景编写⼀个实例。
⼀、服务端模块
netty中服务端⼀般分为两个类,⼀个是启动配置类,另⼀个是消息的逻辑处理类,但是⾸先我们要配置spring boot的启动类,启动netty
@SpringBootApplication
public class DemoApplication implements CommandLineRunner {
@Autowired
NettyServer nettyServer;
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Override
public void args) throws Exception {
nettyServer.startServer();
}
}
1.启动配置类
import ioty.bootstrap.ServerBootstrap;
import ioty.channel.*;
import ioty.channel.nio.NioEventLoopGroup;
import ioty.channel.socket.SocketChannel;
import ioty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.beans.factory.annotation.Autowired;
import t.annotation.Configuration;
/**
* Netty
* 服务端
*/
@Configuration
public class NettyServer {
//四个处理请求的逻辑类
@Autowired
ServerInboundHandler serverInboundHandler;
@Autowired
ServerInboundGetTimeHandler serverInboundGetTimeHandler;
@Autowired
ServerLastOutboundHandler serverLastOutboundHandler;
@Autowired
ServerOutboundHandler serverOutboundHandler;
public void startServer() {
System.out.println("服务端启动成功");
//创建两个线程组,⽤于接收客户端的请求任务,创建两个线程组是因为netty采⽤的是反应器设计模式
//反应器设计模式中bossGroup线程组⽤于接收
EventLoopGroup bossGroup = new NioEventLoopGroup();
//workerGroup线程组⽤于处理任务
EventLoopGroup workerGroup = new NioEventLoopGroup();
//创建netty的启动类
ServerBootstrap bootstrap = new ServerBootstrap();
//创建⼀个通道
ChannelFuture f = null;
try {
.channel(NioServerSocketChannel.class) //设置通道为⾮阻塞IO
.
option(ChannelOption.SO_BACKLOG, 128) //设置⽇志
.option(ChannelOption.SO_RCVBUF, 32 * 1024) //接收缓存
.childOption(ChannelOption.SO_KEEPALIVE, true)//是否保持连接
.childHandler(new ChannelInitializer<SocketChannel>() {
//设置处理请求的逻辑处理类
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//ChannelPipeline是handler的任务组,⾥⾯有多个handler
ChannelPipeline pipeline = ch.pipeline();
//逻辑处理类
pipeline.addLast(serverLastOutboundHandler);
pipeline.addLast(serverOutboundHandler);
pipeline.addLast(serverInboundHandler);
pipeline.addLast(serverInboundGetTimeHandler);
}
});
f = bootstrap.bind(84).sync();//阻塞端⼝号,以及同步策略
f.channel().closeFuture().sync();//关闭通道
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
//优雅退出
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
2.启动配置类中的各个组件
1)EventLoop 与 EventLoopGroup
EventLoop 好⽐⼀个线程,1个EventLoop 可以服务多个channel,⽽⼀个channel只会有⼀个EventLoop 。EventLoop 在netty中就是负责整个IO操作,包括从消息的读取、编码以及后续 ChannelHandler 的执⾏,这样做的好处就是避免了线程中的上下⽂切换时,⼤量浪费资源情况。
EventLoopGroup 是负责分配EventLoop到新创建的channel,EventLoopGroup 就好⽐线程池,它⾥⾯包含多个EventLoop。
2)BootStrap
BootStrap 是netty中的引导启动类也就是⼀个⼯⼚配置类,可以通过它来完成 Netty 的客户端或服务器端的 Netty 初始化,所以我们主要来看它的⼏个常⽤的配置⽅法。
① gruop() ⽅法
gruop()⽅法⽤于配置netty中的线程组,也就是我们的EventLoopGroup ,在服务端中需要配置两个线程组,这是因为netty 中采⽤的是反应器设计模式(reactor ),我们知道反应器设计模式中是需要两个线程组,⼀个⽤于接收⽤户的请求,另⼀个⽤于处理请求的内容。
② channel() ⽅法
channel()⽅法⽤于配置通道的IO类型,IO类型有两个:阻塞IO(BIO)OioServerSocketChannel;⾮阻塞
IO(NIO)NioServerSocketChannel。
③ childHandler () ⽅法
⽤于设置处理请求的适配器,这个在下⾯详细介绍。
④ childOption() ⽅法
给每条child channel连接设置⼀些TCP底层相关的属性,⽐如上⾯,我们设置了两种TCP属性,其中ChannelOption.SO_KEEPALIVE表⽰是否开启TCP底层⼼跳机制,true为开
⑤ option
给每条parent channel 连接设置⼀些TCP底层相关的属性。
关于option的属性有:
SO_RCVBUF ,SO_SNDBUF:⽤于设置TCP连接中使⽤的两个缓存区。
TCP_NODELAY:⽴即发送数据,采⽤的是Nagle算法。Nagle算法是当⼩数据过多时,就会将这些⼩数据碎⽚连接成更⼤的报⽂,从⽽保证发送的报⽂数量最⼩。所以如果数据量⼩就要禁⽤这个算法,netty默认是禁⽤的值为true。
通俗地说,如果要求⾼实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少⽹络交互,就开启。
SO_KEEPALIVE:底层TCP协议的⼼跳机制。Socket参数,连接保活,默认值为False。启⽤该功能时,TCP会主动探测空闲连接的有效性。
SO_REUSEADDR:Socket参数,地址复⽤,默认值False
SO_LINGER:Socket参数,关闭Socket的延迟时间,默认值为-1,表⽰禁⽤该功能。
SO_BACKLOG:Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。
SO_BROADCAST:Socket参数,设置⼴播模式。
3)ChannelFuture
我们知道netty中的所有IO操作都是异步的,这意味着任何IO调⽤都会⽴即返回,不管结果如果状态如果。⽽ChannelFuture 的存在就是为了解决这⼀问题,它会提供IO操作中有关的信息、结果或状态。
ChannelFuture ⼀共有两个状态:
未完成状态:当IO操作开始时,将创建⼀个新的ChannelFuture 对象,此时这个对象既没有操作成功也没有失败,那么就说这个对象就是未完成的状态。简单来说未完成指创建了对象且没有完成IO操作。
springcloud难学吗 已完成状态:当IO操作完成后,不管操作是成功还是失败,future都是标记已完成的,失败时也会有对应的具体失败信息。
3.消息逻辑处理类
可以看到我⼀共在pipeline⾥⾯配置了4个handler,这是为了查看inboundhandler和outboundhandler的数据传递⽅式,以及每个handler的执⾏顺序
ServerInboundGetTimeHandler:
import ioty.buffer.ByteBuf;
import ioty.buffer.Unpooled;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelInboundHandlerAdapter;
import t.annotation.Configuration;
SimpleDateFormat;
import java.util.Date;
/**
* Inbound处理类
* 给客户端返回⼀个时间戳
*/
@Configuration
public class ServerInboundGetTimeHandler extends ChannelInboundHandlerAdapter { /**
* 获取客户端的内容类
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //将传递过来的内容转换为ByteBuf对象
ByteBuf buf = (ByteBuf) msg;
//和⽂件IO⼀样,⽤⼀个字节数组读数据
byte[] reg = new adableBytes()];
//将读取的数据转换为字符串
String body = new String(reg, "UTF-8");
//给客户端传递的内容,同样也要转换成ByteBuf对象
Date dNow = new Date( );
SimpleDateFormat ft = new SimpleDateFormat ("yyyy-MM-dd hh:mm:ss");
String respMsg = body+ft.format(dNow);
System.out.println("服务器当前时间是:"+ft.format(dNow));
ByteBuf respByteBuf = Bytes());
//调⽤write⽅法,通知并将数据传给outboundHand
ctx.write(respByteBuf);
}
/**
* 刷新后才将数据发出到SocketChannel
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush();
}
/**
* 关闭
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
ServerInboundHandler:
import ioty.buffer.ByteBuf;
import ioty.buffer.Unpooled;
import ioty.channel.ChannelHandler;
import ioty.channel.ChannelHandlerContext;
import ioty.channel.ChannelInboundHandlerAdapter;
import t.annotation.Configuration;
/**
* Inbound处理类,是⽤来处理客户端发送过来的信息
* Sharable 所有通道都能使⽤的handler
*/
@Configuration
@ChannelHandler.Sharable
public class ServerInboundHandler extends ChannelInboundHandlerAdapter {
/**
* 获取客户端的内容类
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //将传递过来的内容转换为ByteBuf对象
ByteBuf buf = (ByteBuf) msg;
//和⽂件IO⼀样,⽤⼀个字节数组读数据
byte[] reg = new adableBytes()];
//将读取的数据转换为字符串
String body = new String(reg, "UTF-8");
System.out.println( "服务端接收的信息是: " + body);
//给客户端传递的内容,同样也要转换成ByteBuf对象
String respMsg = "你好我是服务端,当前时间是:";
ByteBuf respByteBuf = Bytes());
//调⽤fireChannelRead⽅法,通知并将数据传给下⼀个handler
ctx.fireChannelRead(respByteBuf);
}
/**
* 刷新后才将数据发出到SocketChannel
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush();
}
/**
* 关闭
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论