⾃定义Decoder继承ByteToMessageDecoder实现解码的⼩案例
ByteToMessageDecoder是⼀种ChannelInboundHandler,可以称为解码器,负责将byte字节流(ByteBuf)转换成⼀种Message,Message是应⽤可以⾃⼰定义的⼀种Java对象。
例如应⽤中使⽤protobuf协议,则可以将byte转换为Protobuf对象。然后交给后⾯的Handler来处理。
使⽤⽰例, 下⾯这段代码先将收到的数据按照换⾏符分割成⼀段⼀段的,然后将byte转换成String, 再将String转换成int, 然后把int加⼀后写回。
代码:
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
bootstrap.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.group(bossGroup, workerGroup)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024))
.
addLast(new ByteToStringDecoder())
.addLast(new StringToIntegerDecoder())
.addLast(new IntegerToByteEncoder())
.addLast(new IntegerIncHandler());
}
});
ChannelFuture bind = bootstrap.bind(8092);
bind.sync();
bind.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}
这⾥的ChannelPipeline的组织结构是
ByteToStringDecoder ==> 将byte转换成String的Decoder
StringToIntegerDecoder ==>String转换成Integer对象的Decoder
IntegerToByteEncoder ==>Integer转换成byte的Encoder
IntegerIncHandler ==> 将接受到的int加⼀后返回
下⾯来逐⼀分析
ByteToStringMessageDecoder继承于ByteToMessageDecoder,并实现了ByteToMessageDecoder的
decode(ChannelHandlerContext ctx, ByteBuf in, java.util.List out)⽅法。</java.lang.object>decode⽅法实现中要求将ByteBuf中的数据进⾏解码然后将解码后的对象增加到list中:
public class ByteToStringDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
byte[] data = new adableBytes()];
list.add(new String(data, StandardCharsets.UTF_8));
}
}
ByteToStringMessageDecoder继承于ByteToMessageDecoder,并实现了ByteToMessageDecoder的
decode(ChannelHandlerContext ctx, ByteBuf in, java.util.List out)⽅法。
decode⽅法实现中要求将ByteBuf中的数据进⾏解码然后将解码后的对象增加到list中
ByteToMessageDecoder
ByteToMessageDecoder继承了ChannelInboundHandlerAdapter所以是⼀个处理Inbound事件的Handler。
其内部保存⼀个Cumulator⽤于保存待解码的ByteBuf,然后不断调⽤⼦类需要实现的抽象⽅法decode去取出byte数据转换处理。
/**
* Cumulate {@link ByteBuf}s.
*/
public interface Cumulator {
/**
* Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
*/
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
Cumulator有两种实现,MERGE_CUMULATOR和COMPOSITE_CMUMULATOR。MERGE_CUMULATOR通过memory copy的⽅法将in中的数据复制写⼊到cumulation中。COMPOSITE_CUMULATOR采取的是类似链表的⽅式,没有进⾏memory copy, 通过⼀种CompositeByteBuf来实现,在某些场景下会更适合。默认采⽤的是
MERGE_CUMULATOR。
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| fCnt() > 1 || cumulation.isReadOnly()) {
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
// 如果cumulation是只读的、或者要超过capacity了,或者还有其他地⽅在引⽤, 则都通过创建⼀个新的byteBuf的⽅式来扩容ByteBuf
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
return buffer;
}
};
ByteToMessageDecoder中最主要的部分在channelRead处理上
收到⼀个msg后先判断是否是ByteBuf类型,是的情况创建⼀个CodecOutputList(也是⼀种list)保存转码后的对象列表
如果cumulation为null则把msg设置为cumulation,否则合并到cumulation⾥
调⽤callDecode⽅法,尝试解码
finally中如果cumulation已经读完了,就release并置为null等待gc
调⽤fireChannelRead将解码后的out传递给后⾯的Handler
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = wInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
decodercumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
/
/ We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See github/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
}
} else {
ctx.fireChannelRead(msg);
}
}
callDecode中不断执⾏抽象decode(ctx, in, out)⽅法直到in可读数据没有减少或当前handler被remove。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// 检查当前handler是否被remove了
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
decodeRemovalReentryProtection(ctx, in, out);
// 检查当前handler是否被remove了
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) { // 这种情况是解码出了对象但是并没有移动in的readIndex
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} ...
}
fireChannelRead(ctx, msgs, numElements)的处理⽅式是对每个解码后的消息进⾏fireChannelRead,交给下⼀个Handler处理
static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
if (msgs instanceof CodecOutputList) {
fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
} else {
for (int i = 0; i < numElements; i++) {
ctx.(i));
}
}
}
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.Unsafe(i));
}
}
以上就是ByteToMessageDecoder的主要处理部分。关于Netty,⾯试中会喜欢问道“粘包/拆包”问题,指的是⼀个消息在⽹络中是⼆进制byte流的形式传过去的,接收⽅如何判断⼀个消息是否读完、哪⾥是分割点等,这些可以通过Netty中提供的⼀些Decoder来实现,例如DelimiterBasedFrameDecoder,FixedLengthFrameDecoder, LengthFieldBasedFrameDecoder。其中最常见的应该是LengthFieldBasedFrameDecoder了,因为⾃定义的协议中通常会有⼀个协议头,⾥⾯有⼀个字段描述⼀个消息的⼤⼩长度,然后接收⽅就能知道消息读到什么时候是读完⼀个Frame了。这些解码器会在后续的⽂章中介绍。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论