SuperSocket源码解析之消息处理
⼀简述
Tcp消息的处理本⾝是与Tcp消息传输过程独⽴的,是消息的两个不同阶段,从前⾯的会话⽣命周期我们已经知道消息的传输主要有SocketSession实现,⽽真正处理则交由AppSession实现,SuperSocket的层次划分也是⾮常清晰明了。
SuperSocket消息处理主要流程:接收=》原始过滤=》协议解析=》命令路由并执⾏=》不到命令则直接⼀分不动发给客户端
⼆消息接收
1 开始接收
代码位置:AsyncSocketSession=》StartReceive
1private void StartReceive(SocketAsyncEventArgs e)
2 {
3 StartReceive(e, 0);
4 }
5
6private void StartReceive(SocketAsyncEventArgs e, int offsetDelta)
7 {
8bool willRaiseEvent = false;
9
10try
11 {
12if (offsetDelta < 0 || offsetDelta >= Config.ReceiveBufferSize)
13throw new ArgumentException(string.Format("Illigal offsetDelta: {0}", offsetDelta), "offsetDelta");
14
15var predictOffset = SocketAsyncProxy.OrigOffset + offsetDelta;
16
17if (e.Offset != predictOffset)
18 {
19 e.SetBuffer(predictOffset, Config.ReceiveBufferSize - offsetDelta);
20 }
21
22if (IsInClosingOrClosed)
23return;
24
25 OnReceiveStarted();
26 willRaiseEvent = Client.ReceiveAsync(e);
27 }
28catch (Exception exc)
29 {
30 LogError(exc);
31
32 OnReceiveError(CloseReason.SocketError);
33return;
34 }
35
36if (!willRaiseEvent)
37 {
38 ProcessReceive(e);
39 }
40 }
View Code
在接收数据前后触发ReceiveStarted,ReceiveEnd事件
2 接收有消息处理并转⼊处理
1public void ProcessReceive(SocketAsyncEventArgs e)
2 {
3if (!ProcessCompleted(e))
4 {
5 OnReceiveError(CloseReason.ClientClosing);
6return;
7 }
8
9 OnReceiveEnded();
10
11int offsetDelta;
12
13try
14 {
15//交给app会话处理接收到的数据,⽽appsession⼜交给接收请求处理器处理
16 offsetDelta = this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
17 }
18catch (Exception exc)
19 {
20 LogError("Protocol error", exc);
21this.Close(CloseReason.ProtocolError);
22return;
23 }
24
25//read the next block of data sent from the client
26 StartReceive(e, offsetDelta);
27 }
View Code
三消息处理
1 ⼊⼝:按照协议解析,每次只处理⼀个数据包,因此便有了如下的⼊⼝代码
1int IAppSession.ProcessRequest(byte[] readBuffer, int offset, int length, bool toBeCopied)
2 {
3int rest, offsetDelta;
4
5while (true)
6 {
7var requestInfo = FilterRequest(readBuffer, offset, length, toBeCopied, out rest, out offsetDelta);
8
9if (requestInfo != null)
10 {
11try
12 {
13 AppServer.ExecuteCommand(this, requestInfo);
14 }
15catch (Exception e)
16 {
17 HandleException(e);
18 }
19 }
20
21if (rest <= 0)
22 {
23return offsetDelta;
24 }
25
26//Still have data has not been processed
27 offset = offset + length - rest;
28 length = rest;
29 }
30 }
View Code
2 原始过滤
此处以原始数据接收事件⽅式,预留给AppServer⼦类处理该原始数据包,意味着可以对原始数据包执⾏⼀次拦截处理,如果经过⼀些逻辑处理后不能满⾜,则将终⽌该数据包继续传播
3 协议解析
协议解析由AppSession的m_ReceiveFilter成员完成,该成员的实例化有2种实现⽅式
默认实例化:默认命令⾏协议,该协议举例
下⾯是2⾏命令,⾏使⽤\r\n 作为结束标识,也就是回车换⾏,命令⾏内部使⽤空格分隔,第⼀个控制之前如echo 为消息头也就是key,其后的字符串也使⽤空格分隔,作为参数,其整体=body
echo cc xxd
cdc ds mmm
解析后为2个StringRequestInfo对象
默认的协议解析⼯⼚
实例化默认协议解析对象
通过AppServer构造函数传递解析⼯⼚
4 命令路由
上⾯我们已经解析到客户端发来的2条命令分别为echo 参数为cc xxd;cdc ds mmm;
其中key分别为echo和cdc,对于命令模式来说命令本⾝使⽤name字段进⾏标识,如果我们的key与name匹配那么我们即可路由到⼀个已有的命令,进⽽执⾏该命令,来看代码
对于能够路由到的命令我们执⾏命令
对于路由失败来说SuperSocket⼜是怎么做的呢?
不到命令来处理该消息,将该命令名字发送会客户端,意思说明服务器没有实现该命令,那么命令从何⽽来?
5 命令
这还的从CommandLoader说起,CommandLoader⼜追溯到AppServer的构建过程
在没有显⽰配置CommandLoader的情况下默认为ReflectCommandLoader
ReflectCommandLoader创建命令
ReflectCommandLoader将扫描应⽤程序根⽬录下所有程序,并将实现了命令接⼝的实例通过反射创建出来
1public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
2 {
3 commands = null;
4
5var commandAssemblies = new List<Assembly>();
6
7if (m_AppServer.GetType().Assembly != this.GetType().Assembly)
8 commandAssemblies.Add(m_AppServer.GetType().Assembly);
9
10string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly");
11
12if (!string.IsNullOrEmpty(commandAssembly))
13 {
14 OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!");
15return false;
16 }
17
18
19if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any())
20 {
21try
22 {
23var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray()); 24
25if (definedAssemblies.Any())
26 commandAssemblies.AddRange(definedAssemblies);
27 }
28catch (Exception e)
29 {iapp免费源码分享网站
30 OnError(new Exception("Failed to load defined command assemblies!", e));
31return false;
32 }
33 }
34
35if (!commandAssemblies.Any())
36 {
37 commandAssemblies.Add(Assembly.GetEntryAssembly());
38 }
39
40var outputCommands = new List<TCommand>();
41
42foreach (var assembly in commandAssemblies)
43 {
44try
45 {
46 outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>());
47 }
48catch (Exception exc)
49 {
50 OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc));
51return false;
52 }
53 }
54
55 commands = outputCommands;
56
57return true;
58 }
View Code
因此默认的我们只需要定义⼀些实现命令接⼝ICommand<TAppSession, TRequestInfo>的命令出来,
6 ⾃定义命令
直接继承CommandBase抽象类即可
到此SuperSocket对消息的处理流程差不多就是这样了,SuperSocket的框架的使⽤我们只需要⾃定义⾃⼰的AppServer,以及AppServer 配套的AppSession,ReciverFilter,以及命令等即可,这些在官⽅提供的例⼦中已经很清晰
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论