Netty解决粘包和拆包问题的四种⽅案
在RPC框架中,粘包和拆包问题是必须解决⼀个问题,因为RPC框架中,各个微服务相互之间都是维系了⼀个TCP长连接,⽐如dubbo就是⼀个全双⼯的长连接。由于微服务往对⽅发送信息的时候,所有的请求都是使⽤的同⼀个连接,这样就会产⽣粘包和拆包的问题。本⽂⾸先会对粘包和拆包问题进⾏描述,然后介绍其常⽤的解决⽅案,最后会对Netty提供的⼏种解决⽅案进⾏讲解。这⾥说明⼀下,由于oschina
将“jie ma qi”认定为敏感⽂字,因⽽本⽂统⼀使⽤“解码⼀器”表⽰该含义
1. 粘包和拆包
产⽣粘包和拆包问题的主要原因是,操作系统在发送TCP数据的时候,底层会有⼀个缓冲区,例如1024个字节⼤⼩,如果⼀次请求发送的数据量⽐较⼩,没达到缓冲区⼤⼩,TCP则会将多个请求合并为同⼀个请求进⾏发送,这就形成了粘包问题;如果⼀次请求发送的数据量⽐较⼤,超过了缓冲区⼤⼩,TCP就会将其拆分为多次发送,这就是拆包,也就是将⼀个⼤的包拆分为多个⼩包进⾏发送。如下图展⽰了粘包和拆包的⼀个⽰意图:
image.png
上图中演⽰了粘包和拆包的三种情况:
A和B两个包都刚好满⾜TCP缓冲区的⼤⼩,或者说其等待时间已经达到TCP等待时长,从⽽还是使⽤两个独⽴的包进⾏发送;
A和B两次请求间隔时间内较短,并且数据包较⼩,因⽽合并为同⼀个包发送给服务端;
B包⽐较⼤,因⽽将其拆分为两个包B_1和B_2进⾏发送,⽽这⾥由于拆分后的B_2⽐较⼩,其⼜与A包合并在⼀起发送。
2. 常见解决⽅案
对于粘包和拆包问题,常见的解决⽅案有四种:
客户端在发送数据包的时候,每个包都固定长度,⽐如1024个字节⼤⼩,如果客户端发送的数据长度不⾜1024个字节,则通过补充空格的⽅式补全到指定长度;
客户端在每个包的末尾使⽤固定的分隔符,例如\r\n,如果⼀个包被拆分了,则等待下⼀个包发送过来之后到其中的\r\n,然后对其拆分后的头部部分与前⼀个包的剩余部分进⾏合并,这样就得到了⼀个完整的包;
将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到⾜够长度的消息之后才算是读到了⼀个完整的消息;
通过⾃定义协议进⾏粘包和拆包的处理。
3. Netty提供的粘包拆包解决⽅案
3.1 FixedLengthFrameDecoder
对于使⽤固定长度的粘包和拆包场景,可以使⽤FixedLengthFrameDecoder,该解码⼀器会每次读取固定长度的消息,如果当前读取到的消息不⾜指定长度,那么就会等待下⼀个消息到达后进⾏补⾜。其使⽤也⽐较简单,只需要在构造函数中指定每个消息的长度即可。这⾥需要注意的是,FixedLengthFrameDecoder只是⼀个解码⼀器,Netty也只提供了⼀个解码⼀器,这是因为对于解码是需要等待下⼀个包的进⾏补全的,代码相对复杂,⽽对于编码器,⽤户可以⾃⾏编写,因为编码时只需要将不⾜指定长度的部分进⾏补全即可。下⾯的⽰例中展⽰了如何使⽤FixedLengthFrameDecoder来进⾏粘包和拆包处理:
public class EchoServer {
public void bind(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/
/ 这⾥将FixedLengthFrameDecoder添加到pipeline中,指定长度为20
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
// 将前⼀步解码得到的数据转码为字符串
ch.pipeline().addLast(new StringDecoder());
// 这⾥FixedLengthFrameEncoder是我们⾃定义的,⽤于将长度不⾜20的消息进⾏补全空格
ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
// 最终的数据处理
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoServer().bind(8080);
}
}
上⾯的pipeline中,对于⼊栈数据,这⾥主要添加了FixedLengthFrameDecoder和StringDecoder,前
⾯⼀个⽤于处理固定长度的消息的粘包和拆包问题,第⼆个则是将处理之后的消息转换为字符串。最后由EchoServerHandler处理最终得到的数据,处理完成后,将处理得到的数据交由FixedLengthFrameEncoder处理,该编码器是我们⾃定义的实现,主要作⽤是将长度不⾜20的消息进⾏空格补全。下⾯是FixedLengthFrameEncoder的实现代码:
public class FixedLengthFrameEncoder extends MessageToByteEncoder<String> {
private int length;
public FixedLengthFrameEncoder(int length) {
this.length = length;
}
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
throws Exception {
/
/ 对于超过指定长度的消息,这⾥直接抛出异常
if (msg.length() > length) {
throw new UnsupportedOperationException(
"message length is too large, it's limited " + length);
}
// 如果长度不⾜,则进⾏补全
if (msg.length() < length) {
msg = addSpace(msg);
}
ctx.writeAndFlush(Unpooled.Bytes()));
}
/
/ 进⾏空格补全
private String addSpace(String msg) {
StringBuilder builder = new StringBuilder(msg);
for (int i = 0; i < length - msg.length(); i++) {
builder.append(" ");
}
String();
}
}
这⾥FixedLengthFrameEncoder实现了decode()⽅法,在该⽅法中,主要是将消息长度不⾜20的消息进⾏空格补全。EchoServerHandler的作⽤主要是打印接收到的消息,然后发送响应给客户端:
public class EchoServerHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("server receives message: " + im());
ctx.writeAndFlush("hello client!");
}
}
对于客户端,其实现⽅式基本与服务端的使⽤⽅式类似,只是在最后进⾏消息发送的时候与服务端的处理⽅式不同。如下是客户端EchoClient的代码:
public class EchoClient {
public void connect(String host, int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 对服务端发送的消息进⾏粘包和拆包处理,由于服务端发送的消息已经进⾏了空格补全,
// 并且长度为20,因⽽这⾥指定的长度也为20
ch.pipeline().addLast(new FixedLengthFrameDecoder(20));
// 将粘包和拆包处理得到的消息转换为字符串
ch.pipeline().addLast(new StringDecoder());
// 对客户端发送的消息进⾏空格补全,保证其长度为20
ch.pipeline().addLast(new FixedLengthFrameEncoder(20));
// 客户端发送消息给服务端,并且处理服务端响应的消息
ch.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture future = t(host, port).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient().connect("127.0.0.1", 8080);
}
}
对于客户端⽽⾔,其消息的处理流程其实与服务端是相似的,对于⼊站消息,需要对其进⾏粘包和拆包处理,然后将其转码为字符串,对于出站消息,则需要将长度不⾜20的消息进⾏空格补全。客户端与服务端处理的主要区别在于最后的消息处理handler不⼀样,也即这⾥的EchoClientHandler,如下是该handler的源码:
public class EchoClientHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("client receives message: " + im());
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("hello server!");
}
}
这⾥客户端的处理主要是重写了channelActive()和channelRead0()两个⽅法,这两个⽅法的主要作⽤在于,channelActive()会在客户端连接上服务器时执⾏,也就是说,其连上服务器之后就会往服务器发送消息。⽽channelRead0()主要是在服务器发送响应给客户端时执⾏,这⾥主要是打印服务器的响应消息。对于服务端⽽⾔,前⾯我们我们可以看到,EchoServerHandler只重写了channelRead0()⽅
法,这是因为服务器只需要等待客户端发送消息过来,然后在该⽅法中进⾏处理,处理完成后直接将响应发送给客户端。如下是分别启动服务端和客户端之后控制台打印的数据:
// server
server receives message: hello server!
// client
client receives message: hello client!
3.2 LineBasedFrameDecoder与DelimiterBasedFrameDecoder
对于通过分隔符进⾏粘包和拆包问题的处理,Netty提供了两个编解码的类,LineBasedFrameDecoder和DelimiterBasedFrameDecoder。这⾥LineBasedFrameDecoder的作⽤主要是通过换⾏符,即\n或者\r\n对数据进⾏处理;⽽DelimiterBasedFrameDecoder的作⽤则是通过⽤户指定的分隔符对数据进⾏粘包和拆包处理。同样的,这两个类都是解码⼀器类,⽽对于数据的编码,也即在每个数据包最后添加换⾏符或者指定分割符的部分需要⽤户⾃⾏进⾏处理。这⾥以DelimiterBasedFrameDecoder为例进⾏讲解,如下是EchoServer中使⽤该类的代码⽚段,其余部分与前⾯的例⼦中的完全⼀致:
@Override
protected void initChannel(SocketChannel ch) throws Exception {
String delimiter = "_$";
// 将delimiter设置到DelimiterBasedFrameDecoder中,经过该解码⼀器进⾏处理之后,源数据将会
// 被按照_$进⾏分隔,这⾥1024指的是分隔的最⼤长度,即当读取到1024个字节的数据之后,若还是未
// 读取到分隔符,则舍弃当前数据段,因为其很有可能是由于码流紊乱造成的
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.Bytes())));
// 将分隔之后的字节数据转换为字符串数据
ch.pipeline().addLast(new StringDecoder());
// 这是我们⾃定义的⼀个编码器,主要作⽤是在返回的响应数据最后添加分隔符
ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
// 最终处理数据并且返回响应的handler
ch.pipeline().addLast(new EchoServerHandler());
}
上⾯pipeline的设置中,添加的解码⼀器主要有DelimiterBasedFrameDecoder和StringDecoder,经过这两个处理器处理之后,接收到的字节流就会被分隔,并且转换为字符串数据,最终交由EchoServerHandler处理。这⾥DelimiterBasedFrameEncoder是我们⾃定义的编码器,其主要作⽤是在返回的响应数据之后添加分隔符。如下是该编码器的源码:
public class DelimiterBasedFrameEncoder extends MessageToByteEncoder<String> {
private String delimiter;
public DelimiterBasedFrameEncoder(String delimiter) {
this.delimiter = delimiter;
}
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out)
throws Exception {
// 在响应的数据后⾯添加分隔符
ctx.writeAndFlush(Unpooled.wrappedBuffer((msg + delimiter).getBytes()));
}
}
对于客户端⽽⾔,这⾥的处理⽅式与服务端类似,其pipeline的添加⽅式如下:
@Override
protected void initChannel(SocketChannel ch) throws Exception {
String delimiter = "_$";
// 对服务端返回的消息通过_$进⾏分隔,并且每次查的最⼤⼤⼩为1024字节
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,
Unpooled.Bytes())));
// 将分隔之后的字节数据转换为字符串
ch.pipeline().addLast(new StringDecoder());
// 对客户端发送的数据进⾏编码,这⾥主要是在客户端发送的数据最后添加分隔符
ch.pipeline().addLast(new DelimiterBasedFrameEncoder(delimiter));
// 客户端发送数据给服务端,并且处理从服务端响应的数据
ch.pipeline().addLast(new EchoClientHandler());
}
这⾥客户端的处理⽅式与服务端基本⼀致,关于这⾥没展⽰的代码,其与⽰例⼀中的代码完全⼀致,这⾥则不予展⽰。
3.3 LengthFieldBasedFrameDecoder与LengthFieldPrepender
这⾥LengthFieldBasedFrameDecoder与LengthFieldPrepender需要配合起来使⽤,其实本质上来讲,这两者⼀个是解码,⼀个是编码的关系。它们处理粘拆包的主要思想是在⽣成的数据包中添加⼀个长度字段,⽤于记录当前数据包的长度。LengthFieldBasedFrameDecoder会按照参数指定的包长度偏移量数据对接收到的数据进⾏解码,从⽽得到⽬标消息体数据;⽽LengthFieldPrepender则会在响应的数据前⾯添加指定的字节数据,这个字节数据中保存了当前消息体的整体字节数据长度。LengthFieldBasedFrameDecoder的解码过程如下图所⽰:
image.png
关于LengthFieldBasedFrameDecoder,这⾥需要对其构造函数参数进⾏介绍:
maxFrameLength:指定了每个包所能传递的最⼤数据包⼤⼩;
lengthFieldOffset:指定了长度字段在字节码中的偏移量;
lengthFieldLength:指定了长度字段所占⽤的字节长度;
lengthAdjustment:对⼀些不仅包含有消息头和消息体的数据进⾏消息头的长度的调整,这样就可以只得到消息体的数据,这⾥的lengthAdjustment指定的就是消息头的长度;
initialBytesToStrip:对于长度字段在消息头中间的情况,可以通过initialBytesToStrip忽略掉消息头以及长度字段占⽤的字节。
这⾥我们以json序列化为例对LengthFieldBasedFrameDecoder和LengthFieldPrepender的使⽤⽅式进⾏讲解。如下是EchoServer的源码:public class EchoServer {
public void bind(int port) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
.
channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 这⾥将LengthFieldBasedFrameDecoder添加到pipeline的⾸位,因为其需要对接收到的数据
// 进⾏长度字段解码,这⾥也会对数据进⾏粘包和拆包处理
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
// LengthFieldPrepender是⼀个编码器,主要是在响应字节数据前⾯添加字节长度字段
ch.pipeline().addLast(new LengthFieldPrepender(2));
/
/ 对经过粘包和拆包处理之后的数据进⾏json反序列化,从⽽得到User对象
ch.pipeline().addLast(new JsonDecoder());
// 对响应数据进⾏编码,主要是将User对象序列化为json
ch.pipeline().addLast(new JsonEncoder());
// 处理客户端的请求的数据,并且进⾏响应
ch.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoServer().bind(8080);
}
}
这⾥EchoServer主要是在pipeline中添加了两个编码器和两个解码⼀器,编码器主要是负责将响应的User对象序列化为json对象,然后在其字节数组前⾯添加⼀个长度字段的字节数组;解码⼀器主要是对接收到的数据进⾏长度字段的解码,然后将其反序列化为⼀个User对象。下⾯是JsonDecoder的源码:
public class JsonDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out)
throws Exception {
byte[] bytes = new adableBytes()];
User user = JSON.parseObject(new String(bytes, CharsetUtil.UTF_8), User.class);
out.add(user);
}
}
decoderJsonDecoder⾸先从接收到的数据流中读取字节数组,然后将其反序列化为⼀个User对象。下⾯我们看看JsonEncoder的源码:
public class JsonEncoder extends MessageToByteEncoder<User> {
@Override
protected void encode(ChannelHandlerContext ctx, User user, ByteBuf buf)
throws Exception {
String json = JSONString(user);
ctx.writeAndFlush(Unpooled.Bytes()));
}
}
JsonEncoder将响应得到的User对象转换为⼀个json对象,然后写⼊响应中。对于EchoServerHandler,其主要作⽤就是接收客户端数据,并且进⾏响应,如下是其源码:
public class EchoServerHandler extends SimpleChannelInboundHandler<User> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
System.out.println("receive from client: " + user);
ctx.write(user);
}
}
对于客户端,其主要逻辑与服务端的基本类似,这⾥主要展⽰其pipeline的添加⽅式,以及最后发送请求,并且对服务器响应进⾏处理的过程:
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2));
ch.pipeline().addLast(new LengthFieldPrepender(2));
ch.pipeline().addLast(new JsonDecoder());
ch.pipeline().addLast(new JsonEncoder());
ch.pipeline().addLast(new EchoClientHandler());
}
public class EchoClientHandler extends SimpleChannelInboundHandler<User> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.write(getUser());
}
private User getUser() {
User user = new User();
user.setAge(27);
user.setName("zhangxufeng");
return user;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, User user) throws Exception {
System.out.println("receive message from server: " + user);

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。