RocketMq通信协议格式及编解码(源码分析)
⼀、RocketMq broker服务器与客户端的⽹络通信是基于netty4.x实现的,重点分析 RocketMq设计的通信协议及对应的编解码开发。
名字解释
编码:将java对象转换成⼆进制数据,⽤于放到⽹络中进⾏传输
解码:将从⽹络中读取到的⼆进制数据转换成相应的java对象
⼆、Remoting设计的通信协议格式如下(重点理解,能根据通信协议格式来对⽹络中读取的⼆进制数据进⾏编解码):协议格式 <length> <header length> <header data> <body data>
1 2 3 4
socket通信报文格式1、4个字节的int型数据来存储
2、
3、4的总长度
2、4个字节的int型数据来存储报⽂头部的字节长度等于3的长度
3、存储报⽂头部的数据
4、存储报⽂体的数据
三、在通信过程中,服务器与客户端通过传递 RemotingCommand 对象来进⾏交互,下⾯将根据代码实现来分析通信协议的编解码开发
3.1 分析RemotingCommand 类encode()⽅法,它将按照上述定义的协议格式进⾏对象的编码操作:
public ByteBuffer encode() {
// 1> header length size
int length = 4; //表⽰⽤4个字节来存储头部长度
// 2> header data length
byte[] headerData = this.buildHeader(); //报⽂头部的数据
length += headerData.length; //加上头部报⽂的字节长度
// 3> body data length
if (this.body != null) {
length += body.length; //如果报⽂体body有数据则加上报⽂体的字节长度
}
ByteBuffer result = ByteBuffer.allocate(4 + length); //分配⼀个 (4+length)这么⼤的字节缓冲区,这个缓冲区就⽤来
存储上述协议格式的整个报⽂的数据
//下⾯代码开始往缓冲区存放数据
// length
result.putInt(length); //缓冲区的最开始的4个字节⽤来存储总的长度length
// header length
result.putInt(headerData.length); //缓冲区接下来4个字节⽤来存储报⽂头部的长度
// header data
result.put(headerData); //缓冲区接下来存储报⽂头部数据
// body data;
if (this.body != null) {
result.put(this.body); //缓冲区最后⽤来存储报⽂体的数据
}
result.flip(); //将缓冲区翻转,⽤于将ByteBuffer放到⽹络通道中进⾏传输
return result;
}
3.2 分析RemotingCommand 类decode()⽅法,它将按照上述定义的协议格式进⾏各个报⽂段的字节数据读取,然后转换成RemotingCommand对象:
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = Logger(RemotingHelper.RemotingLogName);
private static final int FRAME_MAX_LENGTH = //
Integer.Property("ing.frameMaxLength", "8388608"));
public NettyDecoder() {
super(FRAME_MAX_LENGTH, 0, 4, 0, 4); //0,4,0,4 每⼀个0,4,这个是表⽰存放长度的变量的字节所占的长
度,为4个,第⼆个4表⽰就是解码之后的数据包跳过的字节数为4,表⽰就将数据包的头部给去掉了。
}
//此处省略⼀万字
}
//在调⽤decode()⽅法解码之前,会调⽤上述NettyDecoder 类的decode()⽅法,在上述构造⽅法中,会先去掉报⽂的前4个字节,这4个字节是存储的后⾯报⽂的长度.
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
int length = byteBuffer.limit(); //获取字节缓冲区的整个长度,这个长度等于通信协议格式的2、
3、4段的总长度
int headerLength = Int(); //从缓冲区中读取4个字节的int类型的数据值,这个值就是报⽂头
部的长度
byte[] headerData = new byte[headerLength];
<(headerData); //接下来从缓冲区中读取headerLength个字节的数据,这个数据就
是报⽂头部的数据
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
<(bodyData); //接下来读取length-4-headerLength 个字节的数据,这个数据就是报⽂体的数据
}
//接下来将读取到的数据转换成 RemotingCommand 对象
RemotingCommand cmd = RemotingSerializable.decode(headerData, RemotingCommand.class);
cmd.body = bodyData;
return cmd;
}
四、总结
开发任何的socket 长连接的⽹络程序,涉及服务器与客户端的开发,⾸先要定义服务端与客户端的通信协议格式,第⼆根据定义的通信协议格式来进⾏
传输数据的编解码操作。上述的通信协议格式为常⽤的通信协议格式。当长连接在设定的间隔时间范围
内没有数据传输时,需要按照协议发送⼼跳包,
⼼跳包的协议格式也可以按照这种协议格式发送,也可以另外针对发送的⼼跳包来定义通信协议格式。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论