Dubbo Serialization Mechanism: An Analysis of How hessian2 Serialization Is Implemented
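Remote communication always involves getting data across the wire intact. Broadly speaking, the question is how a message sent by A can be received by B with exactly the same content. More narrowly, it is a matter of encoding and decoding. In dubbo, and in Java remote communication in general, encoding and decoding usually go hand in hand with serialization and deserialization.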
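For an ordinary Java object to be serializable, there are usually a few steps:
1. Implement the Serializable interface;
2. Declare a serial version number, serialVersionUID (optional, but recommended);
3. Define private writeObject()/readObject() methods to customize serialization, if necessary;
4. Call writeObject() on java.io.ObjectOutputStream to serialize and readObject() on java.io.ObjectInputStream to deserialize.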
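The four steps above can be sketched as a small round trip through the JDK's built-in serialization. The `User` class and the helper names `toBytes`/`fromBytes` are hypothetical and exist only for this illustration; step 3 (custom `writeObject()`/`readObject()`) is left out because the default behavior is enough here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class JdkSerializationSketch {

    /** Hypothetical payload type used only for this illustration. */
    static final class User implements Serializable {
        private static final long serialVersionUID = 1L; // step 2: explicit version number
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /** Step 4a: serialize any Serializable object into a byte array. */
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Step 4b: restore an object from bytes produced by toBytes(). */
    static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = toBytes(new User("alice", 30));
        User copy = (User) fromBytes(data);
        System.out.println(copy.name + " " + copy.age + " (" + data.length + " bytes)");
    }
}
```

Printing the byte count makes the motivation for alternative serializers visible: even a two-field object carries class metadata in the JDK format.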
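Simple enough. But as everyone knows, there are plenty of serialization frameworks out there. Why? Because they aim to be faster and to produce smaller output!
Today let's take a close look at how dubbo's default serializer, Hessian2, does it.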
Starting from server initialization, we can see that dubbo by default creates its server on top of netty.
// com.alibaba.dubbo.remoting.transport.netty.NettyServer
@Override
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            // the decoder is a subclass of a netty upstream handler: NettyCodecAdapter.InternalDecoder extends SimpleChannelUpstreamHandler
            pipeline.addLast("decoder", adapter.getDecoder());
            // the encoder is a subclass of a netty encoder: NettyCodecAdapter.InternalEncoder extends OneToOneEncoder
            pipeline.addLast("encoder", adapter.getEncoder());
            // the handler takes care of all business logic
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}
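The server communicates through a pipeline that holds three main ChannelHandlers:
1. decoder, responsible for decoding messages, built on netty infrastructure;
2. encoder, responsible for encoding messages, built on netty infrastructure (the main subject of this article);
3. the business handler, NettyHandler.
As netty describes, events flow through these handlers according to whether they are outbound or inbound. This article follows an outbound request, so the event travels from the tail of the pipeline toward the head: handler -> encoder. The decoder only handles upstream (inbound) events, so it is skipped on the way out.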
// com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel: sending the request data starts here
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
    if (closed) {
        throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
    }
    // create request.
    // Request has an interesting field: private static final AtomicLong INVOKE_ID = new AtomicLong(0); it maintains the local request sequence number
    Request req = new Request();
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay(true);
    req.setData(request);
    DefaultFuture future = new DefaultFuture(channel, req, timeout);
    try {
        // this first calls send() on com.alibaba.dubbo.remoting.transport.netty.NettyClient
        channel.send(req);
    } catch (RemotingException e) {
        future.cancel();
        throw e;
    }
    return future;
}
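After HeaderExchangeChannel wraps the call, we have a Request, and the next step is sending it to the remote side. Key points:
1. Every request carries a sequence number that increases by one each time;
2. The request is marked two-way (twoWay=true): it is sent and a response is expected;
3. A DefaultFuture wraps the return value and receives the asynchronous result.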
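To make these three points concrete, here is a minimal sketch of how a sequence number can tie a pending future to its eventual response. It is not Dubbo's code: `Request` is a simplified stand-in, and the `PENDING` table, `register()` and `received()` are hypothetical names; the assumption is only that the real DefaultFuture matches responses to requests by id in a broadly similar way.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class RequestCorrelationSketch {

    /** Simplified stand-in for a request: an id, a two-way flag and a payload. */
    static final class Request {
        private static final AtomicLong INVOKE_ID = new AtomicLong(0);
        final long id = INVOKE_ID.getAndIncrement(); // point 1: increasing sequence number
        final boolean twoWay;                        // point 2: a response is expected
        final Object data;

        Request(Object data, boolean twoWay) {
            this.data = data;
            this.twoWay = twoWay;
        }
    }

    /** Hypothetical table of in-flight requests, keyed by request id. */
    private static final Map<Long, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();

    /** Point 3: create the future before sending so that a fast response cannot be missed. */
    static CompletableFuture<Object> register(Request req) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        PENDING.put(req.id, future);
        return future;
    }

    /** Called when a response arrives: find the future by id and complete it. */
    static boolean received(long requestId, Object result) {
        CompletableFuture<Object> future = PENDING.remove(requestId);
        return future != null && future.complete(result);
    }

    public static void main(String[] args) throws Exception {
        Request req = new Request("ping", true);
        CompletableFuture<Object> future = register(req);
        received(req.id, "pong"); // in a real client this happens on the I/O thread
        System.out.println(req.id + " -> " + future.get(1, TimeUnit.SECONDS));
    }
}
```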
// NettyClient, com.alibaba.dubbo.remoting.transport.AbstractPeer
@Override
public void send(Object message) throws RemotingException {
    send(message, url.getParameter(Constants.SENT_KEY, false));
}
// NettyClient, com.alibaba.dubbo.remoting.transport.AbstractClient
@Override
public void send(Object message, boolean sent) throws RemotingException {
    if (send_reconnect && !isConnected()) {
        connect();
    }
    Channel channel = getChannel();
    //TODO Can the value returned by getChannel() be null? need improvement.
    if (channel == null || !channel.isConnected()) {
        throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
    }
    // delegate the write to the underlying channel
    channel.send(message, sent);
}
// com.alibaba.dubbo.remoting.transport.netty.NettyChannel
@Override
public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);
    boolean success = true;
    int timeout = 0;
    try {
        // the write is triggered here and returns an asynchronous future
        ChannelFuture future = channel.write(message);
        if (sent) {
            // with sent=true, block until the write itself has completed (or timed out)
            // the other blocking wait, the one for the response, lives in DubboInvoker
            timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        // if the asynchronous write failed, rethrow its exception
        Throwable cause = future.getCause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }
    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}
Speaking of how the future result is handled, let's step back to DubboInvoker and see what it does.
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);
    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            // for a one-way call, simply ignore the result
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            return new RpcResult();
        } else if (isAsync) {
            // for an async call, store the future in the context and return an empty result
            ResponseFuture future = currentClient.request(inv, timeout);
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            // for a synchronous call:
            // send the remote request, obtain the future, then call get() to block until the result arrives
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
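The three branches in doInvoke() (one-way, async, sync) differ only in what happens to the future. The sketch below shows that difference with plain JDK classes. Everything in it is hypothetical: `request()` fakes a transport that answers on another thread, and `CONTEXT_FUTURE` is a bare ThreadLocal standing in for the role a per-thread RPC context plays.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class InvokeModeSketch {

    enum Mode { ONEWAY, ASYNC, SYNC }

    /** Hypothetical per-thread slot where an async caller can pick up its future later. */
    static final ThreadLocal<CompletableFuture<String>> CONTEXT_FUTURE = new ThreadLocal<>();

    /** Hypothetical transport: "sends" the call and produces the reply on another thread. */
    static CompletableFuture<String> request(String call) {
        return CompletableFuture.supplyAsync(() -> "echo:" + call);
    }

    static String invoke(String call, Mode mode, long timeoutMillis) throws Exception {
        switch (mode) {
            case ONEWAY:
                request(call);                       // fire and forget: the reply is ignored
                CONTEXT_FUTURE.remove();
                return null;
            case ASYNC:
                CONTEXT_FUTURE.set(request(call));   // return at once; the caller reads the future later
                return null;
            default:
                CONTEXT_FUTURE.remove();
                // block the calling thread until the reply arrives or the timeout expires
                return request(call).get(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(invoke("hello", Mode.SYNC, 1000));
        invoke("hello", Mode.ASYNC, 1000);
        System.out.println(CONTEXT_FUTURE.get().get(1, TimeUnit.SECONDS));
    }
}
```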
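Next we formally enter the socket's network send path. Let's see how it works. (Note: at this point the data is still the raw object; nothing has been serialized yet.)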
// org.jboss.netty.channel.socket.nio.NioClientSocketChannel, org.jboss.netty.channel.AbstractChannel
public ChannelFuture write(Object message) {
    return Channels.write(this, message);
}
// org.jboss.netty.channel.Channels
/**
 * Sends a {@code "write"} request to the last
 * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
 * the specified {@link Channel}.
 *
 * @param channel the channel to write a message
 * @param message the message to write to the channel
 *
 * @return the {@link ChannelFuture} which will be notified when the
 * write operation is done
 */
public static ChannelFuture write(Channel channel, Object message) {
    return write(channel, message, null);
}
// The data is written at the tail of the pipeline and a future is returned right away. Once inside the pipeline, the event passes through the handler chain, including the codec.
/**
 * Sends a {@code "write"} request to the last
 * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
 * the specified {@link Channel}.
 *
 * @param channel the channel to write a message
 * @param message the message to write to the channel
 * @param remoteAddress the destination of the message.
 * {@code null} to use the default remote address
 *
 * @return the {@link ChannelFuture} which will be notified when the
 * write operation is done
 */
public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
    ChannelFuture future = future(channel);
    channel.getPipeline().sendDownstream(
            new DownstreamMessageEvent(channel, future, message, remoteAddress));
    return future;
}
// org.jboss.netty.channel.DefaultChannelPipeline
public void sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
    if (tail == null) {
        try {
            getSink().eventSunk(this, e);
            return;
        } catch (Throwable t) {
            notifyHandlerException(e, t);
            return;
        }
    }
    // hand the event to the tail context
    sendDownstream(tail, e);
}
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
    if (e instanceof UpstreamMessageEvent) {
        throw new IllegalArgumentException("cannot send an upstream event to downstream");
    }
    try {
        // call handleDownstream() on this context's handler; at the tail that is NettyHandler
        ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
    } catch (Throwable t) {
        // Unlike an upstream event, a downstream event usually has an
        // incomplete future which is supposed to be updated by ChannelSink.
        // However, if an exception is raised before the event reaches at
        // ChannelSink, the future is not going to be updated, so we update
        // here.
        notifyHandlerException(e, t);
    }
}
// NettyHandler, org.jboss.netty.channel.SimpleChannelHandler
/**
 * {@inheritDoc} Down-casts the received downstream event into more
 * meaningful sub-type event and calls an appropriate handler method with
 * the down-casted event.
 */
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
        throws Exception {
    if (e instanceof MessageEvent) {
        // sending a message takes the write path; ctx is the tail end of the pipeline
        writeRequested(ctx, (MessageEvent) e);
    } else if (e instanceof ChannelStateEvent) {
        // state events are handled here
        ChannelStateEvent evt = (ChannelStateEvent) e;
        switch (evt.getState()) {
        case OPEN:
            if (!Boolean.TRUE.equals(evt.getValue())) {
                closeRequested(ctx, evt);
            }
            break;
        case BOUND:
            if (evt.getValue() != null) {
                bindRequested(ctx, evt);
            } else {
                unbindRequested(ctx, evt);
            }
            break;
        case CONNECTED:
            if (evt.getValue() != null) {
                connectRequested(ctx, evt);
            } else {
                disconnectRequested(ctx, evt);
            }
            break;
        case INTEREST_OPS:
            setInterestOpsRequested(ctx, evt);
            break;
        default:
            ctx.sendDownstream(e);
        }
    } else {
        ctx.sendDownstream(e);
    }
}
Next the handler's writeRequested() is called, and the call proceeds through the pipeline:
// NettyHandler, com.alibaba.dubbo.remoting.transport.netty.NettyHandler
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    // call the parent, SimpleChannelHandler, first so the rest of the chain runs
    super.writeRequested(ctx, e);
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        handler.sent(channel, e.getMessage());
    } finally {
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}
// org.jboss.netty.channel.SimpleChannelHandler
/**
 * Invoked when {@link Channel#write(Object)} is called.
 */
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    ctx.sendDownstream(e);
}
// org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext
public void sendDownstream(ChannelEvent e) {
    // check whether there is a previous node; if so, recurse into it. This gives the pipeline effect: the event flows through each handler in turn
    // this is how the so-called pipeline chain is implemented
    DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
    if (prev == null) {
        try {
            getSink().eventSunk(DefaultChannelPipeline.this, e);
        } catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    } else {
        DefaultChannelPipeline.this.sendDownstream(prev, e);
    }
}
// DefaultChannelPipeline.this.sendDownstream() calls the previous handler, which for an outbound message is the encoder
void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
    if (e instanceof UpstreamMessageEvent) {
        throw new IllegalArgumentException("cannot send an upstream event to downstream");
    }
    try {
        ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
    } catch (Throwable t) {
        // Unlike an upstream event, a downstream event usually has an
        // incomplete future which is supposed to be updated by ChannelSink.
        // However, if an exception is raised before the event reaches at
        // ChannelSink, the future is not going to be updated, so we update
        // here.
        notifyHandlerException(e, t);
    }
}
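The tail-to-head walk performed by sendDownstream() can be modelled in a few lines. This is a deliberately simplified sketch, not netty code: `Handler`, `addLast()` and `walk()` are hypothetical, and a boolean flag replaces the check netty performs to decide whether a handler takes part in downstream events.

```java
import java.util.ArrayList;
import java.util.List;

final class DownstreamPipelineSketch {

    /** Hypothetical handler: a name plus whether it handles outbound (downstream) events. */
    static final class Handler {
        final String name;
        final boolean downstream;

        Handler(String name, boolean downstream) {
            this.name = name;
            this.downstream = downstream;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    DownstreamPipelineSketch addLast(String name, boolean downstream) {
        handlers.add(new Handler(name, downstream));
        return this;
    }

    /** Starts at the tail and records every stop the event makes on its way out. */
    List<String> sendDownstream() {
        List<String> visited = new ArrayList<>();
        walk(handlers.size() - 1, visited);
        return visited;
    }

    private void walk(int index, List<String> visited) {
        // Skip handlers that do not handle downstream events (the decoder in this article).
        while (index >= 0 && !handlers.get(index).downstream) {
            index--;
        }
        if (index < 0) {
            visited.add("sink"); // no previous handler left: the event reaches the sink
            return;
        }
        visited.add(handlers.get(index).name);
        walk(index - 1, visited); // same idea as ctx.sendDownstream(e) moving to prev
    }

    public static void main(String[] args) {
        List<String> path = new DownstreamPipelineSketch()
                .addLast("decoder", false)
                .addLast("encoder", true)
                .addLast("handler", true)
                .sendDownstream();
        System.out.println(path); // prints [handler, encoder, sink]
    }
}
```

With the same registration order as in doOpen(), the outbound event visits the business handler first, then the encoder, and never touches the decoder.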
Then comes the encoding step!
// the encoder is called; this is InternalEncoder's parent class: org.jboss.netty.handler.codec.oneone.OneToOneEncoder
public void handleDownstream(
        ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
    if (!(evt instanceof MessageEvent)) {
        ctx.sendDownstream(evt);
        return;
    }
    MessageEvent e = (MessageEvent) evt;
    // encode the data
    if (!doEncode(ctx, e)) {
        ctx.sendDownstream(e);
    }
}
protected boolean doEncode(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    Object originalMessage = e.getMessage();
    // encode() is implemented by the subclass, which is how a custom encoding is plugged in
    Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
    if (originalMessage == encodedMessage) {
        return false;
    }
    if (encodedMessage != null) {
        // once encoded, the bytes are written to the remote side over TCP as a TruncatedChannelBuffer
        // encode() itself only fills the buffer; the write is issued here
        write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
    }
    return true;
}
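// com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalEncoder
@Sharable
private class InternalEncoder extends OneToOneEncoder {
    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        try {
            // call the codec's encode(), here DubboCountCodec; the codec can of course be specified through a parameter on the request url
            codec.encode(channel, buffer, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ch);
        }
        // wrap the buffer with netty's ChannelBuffers and return it to the caller so it can be written out
        return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
    }
}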
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    codec.encode(channel, buffer, msg);
}
// DubboCountCodec -> com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    if (msg instanceof Request) {
        // encode the request
        encodeRequest(channel, buffer, (Request) msg);
    } else if (msg instanceof Response) {
        encodeResponse(channel, buffer, (Response) msg);
    } else {
        super.encode(channel, buffer, msg);
    }
}
The wire format is: magic number -> serialization id and request-type flags (sharing one byte) -> request id -> body length -> body
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec, com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // layout: magic number -> serialization id and request-type flags -> request id -> body length -> body
    // everything is written into buffer
    // header.
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number.
    Bytes.short2bytes(MAGIC, header);
    // set request and serialization flag.
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    if (req.isEvent()) header[2] |= FLAG_EVENT;
    // set request id.
    Bytes.long2bytes(req.getId(), header, 4);
    // encode request data.
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        encodeEventData(channel, out, req.getData());
    } else {
        // encode the data; complex objects may show up here, and they are handled by calling JavaSerializer recursively
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);
    // write
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
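The "reserve the header, write the body, then go back and fill in the header" technique used by encodeRequest() can be reproduced with a plain java.nio.ByteBuffer. The constant values below (16-byte header, magic 0xdabb, the flag bits, serialization id 2 for hessian2) are my recollection of Dubbo 2.6.x and are not shown in the listing above, so treat them as assumptions; `frame()` is a hypothetical helper.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class DubboHeaderSketch {

    // Assumed values, recalled from Dubbo 2.6.x's ExchangeCodec; verify before relying on them.
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    /** Frames an already serialized body: skip the header, write the body, backfill the header. */
    static byte[] frame(long requestId, int serializationId, boolean twoWay, boolean event, byte[] body) {
        ByteBuffer buffer = ByteBuffer.allocate(HEADER_LENGTH + body.length); // big-endian by default
        int start = buffer.position();
        buffer.position(start + HEADER_LENGTH); // leave room for the header
        buffer.put(body);
        int len = buffer.position() - start - HEADER_LENGTH; // body length is only known now

        byte flags = (byte) (FLAG_REQUEST | (serializationId & SERIALIZATION_MASK));
        if (twoWay) flags |= FLAG_TWOWAY;
        if (event) flags |= FLAG_EVENT;

        buffer.position(start);
        buffer.putShort(MAGIC);    // bytes 0-1: magic number
        buffer.put(flags);         // byte 2: request/two-way/event bits plus serialization id
        buffer.put((byte) 0);      // byte 3: status, unused in a request
        buffer.putLong(requestId); // bytes 4-11: request id
        buffer.putInt(len);        // bytes 12-15: body length
        return buffer.array();
    }

    public static void main(String[] args) {
        byte[] frame = frame(1L, 2, true, false, "body".getBytes(StandardCharsets.UTF_8));
        System.out.printf("magic=%02x%02x flags=%02x length=%d%n",
                frame[0], frame[1], frame[2], ByteBuffer.wrap(frame).getInt(12));
    }
}
```

Writing the body first avoids serializing twice or buffering the body separately just to learn its length.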
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec: encoding the request body
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;
    out.writeUTF(version);
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
    out.writeUTF(inv.getMethodName());
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    Object[] args = inv.getArguments();
    if (args != null)
        for (int i = 0; i < args.length; i++) {
            // write a single argument
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
    out.writeObject(inv.getAttachments());
}
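The body is simply a fixed sequence of fields, and the reader has to consume them in exactly the same order. The sketch below mimics that order with java.io.DataOutputStream as a stand-in for the hessian2 ObjectOutput, so the bytes it produces are not hessian2 bytes. All names are hypothetical, arguments are restricted to strings, and the decoder is told the argument count directly, whereas a real decoder would presumably derive it from the parameter descriptor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RequestBodySketch {

    /** Writes the invocation fields in the same order as encodeRequestData(). */
    static byte[] encode(String protocolVersion, String path, String serviceVersion,
                         String method, String paramDesc, String[] args,
                         Map<String, String> attachments) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(protocolVersion);
            out.writeUTF(path);
            out.writeUTF(serviceVersion);
            out.writeUTF(method);
            out.writeUTF(paramDesc);
            for (String arg : args) {
                out.writeUTF(arg);            // stand-in for out.writeObject(argument)
            }
            out.writeInt(attachments.size()); // stand-in for out.writeObject(attachments)
            for (Map.Entry<String, String> entry : attachments.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        }
        return bytes.toByteArray();
    }

    /** Reads the five fixed fields plus the arguments back; the order must mirror encode(). */
    static List<String> decodeFields(byte[] data, int argCount) throws IOException {
        List<String> fields = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (int i = 0; i < 5 + argCount; i++) {
                fields.add(in.readUTF());
            }
        }
        return fields;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> attachments = new LinkedHashMap<>();
        attachments.put("path", "com.example.DemoService");
        byte[] body = encode("2.0.2", "com.example.DemoService", "1.0.0",
                "sayHello", "Ljava/lang/String;", new String[] {"world"}, attachments);
        System.out.println(decodeFields(body, 1));
    }
}
```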