QuickFixJ源代码研究(⼀)
QuickFix/J 源代码研究(⼀)
liyayawodeai@163
(〇)QuickFix/J简介
FIX是Financial Information eXchange的简称。FIX是⼀种专门为实时电⼦证券交易设计的标准消息协议。FIX协议由FIX protocol, Ltd(FPL)所有并维护。FIX协议的⽹址为
QuickFix/J是实现了FIX协议所有版本及其功能的开源软件,100%使⽤JAVA实现。
QuickFix/J的⽹址为
QuickFix/J的源代码可以从下载,也可以去QuickFix/J的官⽅⽹站,进⼊下载页⾯下载源代码。
那么能⽤QuickFix/J做什么事情呢?关注股票的兄弟们⼀定留意过LevelII这个名词,他是中国股票交易新⾏情的简称。简单说,可以将QuickFix/J的代码改造⼀下,就⽤来接受深圳证券交易的LevelII⾏情数据。当然接受⾏情不是免费的,需要诸多的商务⼿续,但是本⽂仅仅讨论QuickFix/J的开源代码的设计
和实现,并且侧重于QuickFix/J的客户端实现。服务器端留在以后的⽂章介绍。关于上海证券交易所的LevelII数据的格式和接受,跟深圳的有诸多的不同,也留在以后讨论。
⾸先QuickFixJ代码功能主要有两⼤部分,⼀部分是Fix协议数据的解析,另外⼀部分是客户端跟服务器端建⽴连接并维持回话,传输数据。第⼀部分将主要介绍QuickFix/J的传输部分的实现。
(⼀) QuickFix/J传输功能部分
QuickFix/J的连接管理和传输功能是基于MINA框架实现的。MINA是什么?MINA是Apache旗下的⼀个⽹络应⽤框架,能够帮助⼤家轻松的开发⾼性能、⾼扩展性的⽹络程序。它使⽤NIO在传输协议(⽐如TCP/IP,UDP/IP)之上提供了抽象的、事件驱动的、异步处理的API。MINA的⽹址为。
A). QuickFix/J客户端⽤到的主要类的功能说明(V1.5.0)
1. quickfix.Initiator:定义了⼀些从配置中获取通信协议、主机、端⼝及重连接的时间间隔的KEY,仅仅是key⽽已,没有其他的。
2. quickfix.mina.SessionConnector:定义了⼀些helper⽅法,为initiator和acceptor提供公⽤的功能,⽐如获取Session,创建Session,动态添加/删除Session,判断是否已经登陆。在SessionConnector中定义了SessionTimerTask,这个Timer的主要⽬的是例⾏检查和更新Session的状态,发现有问题及
时操作。SessionTimerTask发现登陆状态错误之后能⾃动重连登陆、更新Session时间戳、发送⼼跳消息。检查的具体逻辑在()中,细节请参考5。
3. quickfix.mina.initiator.AbstractSocketInitiator:是SocketInitiator的基础抽象基类,继承了SessionConnector和Initiator。在QuickFix/J中提供了两种默认的具体实现,分别是SocketInitiator和ThreadedSocketInitiator。这两种具体实现的功能都⼀样,两者的区别仅仅是处理消息时使⽤线程的策略不同,具体请参考7和8。抽象类AbstractSocketInitiator提供的功能有:
a) 遍历配置⽂件取得所有[session]节的配置并创建相应的FixSession(如果[session]中没有指定ConnectionType或者明确指定了ConnectionType为initiator,则建⽴
session和application的区别FixSession(quickfix.Session),其他类型的ConnectionType⽆效,如acceptor)。配置⽂件中可以指定多个[session]。
b) 通过已经⽣成的FixSession和传⼊的eventHandlingStrategy创建IoSessionInitiator,并保存⼊initiators(Set类型的缓存)中。
那么FixSession和IoSessionInitiator有什么区别呢?请参考5、6。
c)启动、关闭客户端(initiator)。启动initiator时⾸先启动应⽤层的SessionTimer(请参考2),然后
启动连接层的initiator(IoSessionInitiator)。关闭initiator时,先关闭连接层的initiator(IoSessionInitiator),再关闭应⽤层的SessionTimer。
4. quickfix.SessionID:是Session的唯⼀标识。SessionID中包含beginString(必须),senderCompID(必须),senderSubID(可选),senderLocationID(可
选),targetCompID(必须),targetSubID(可选),targetSubID(可选),targetLocationID(可选),sessionQualifier(可选)。sessionQualifer⽤于区分具有相同的targetCompID不同的session,只能⽤在initiator⾓⾊中。String⽣成的可读的SessionID字符串组成为:beginString:senderCompID/senderSubID/senderLocationID-
>targetCompID/targetSubID/targetLocationID/sessionQualifier。如果可选值未设置则在Session ID字符串中默认空字符串。
5. quickfix.Session:Session是FIX消息通讯中最基本的抽象。
a) fixSession维护Session内部消息的⾃增序列号、⾃动错误恢复、与通信对⽅(counterpart)建⽴通信信道(communication channel)。
b) Session是独⽴于特定的传输层协议的。Session被新建时,消息序列号置为1,每次通信序列号⾃
增,直到Session被重置(reset)。每个Session能够跨越多个传输连接(并⾮同时跨越,⽽是说第⼀次⽹络连接断开后,随后重连,虽然底层的⽹络连接已经是新建的了,但是Session还能保持跟断⽹之前是同⼀个Session)。
c) fixSession中核⼼逻辑在next()⽅法中。
():主要检查了以下8个状态,并且根据不同的状态做了响应的操作。next()在SessionTimerTask中被定时执⾏,作为例⾏检查Session状态的任务。
1) 检查Session是否是enabled。enabled状态表⽰什么?enable是Session中记录当前是否处于登陆状态。当执⾏了logon时将enabled置为true,当logout时将enabled置为false。
如果不是enabled状态并且不是loggedOn状态,则说明正常退出登陆。结束此次检查并返回。
如果不是enabled状态,但是是loggedOn状态说明Session上次⾮正常logout或者logout消息已经发出,正在返回的途中,那么就检查是否isLogoutSent,如果还没有发送logout,那么现在就发送logout消息。
2)检查完Session的enabled状态之后,检查checkSessionTime。没有必要接收到每条消息都去检查Session的时间戳,顶多1秒检查⼀次就可以了,否则容易影响性能。如果checkSessionTime失败,则
reset Session。(reset时使⽤了urrent.automatic. AutomicBoolean 作为锁,使得reset成为⼀个原⼦操作)reset做了什么事情?⾸先回调⽤户接⼝onBeforeSessionReset,然后⽣成logout消息,并在state的中记录logoutSent的状态为true,然后关闭I/O连接,关闭I/O连接之后,设置⼀系列state的状态。最后resetState(将messageStore重置)。
3) 检查hasResponder,如果没有responder,则返回。关于responder的解释,请参考15。
4)检查isLogonReceived,只有receivedLogon才算真正处于登陆状态。如果没有收到logon响应,检查state看是否需要发送logon消息(对Initiator⽽⾔,如果还未发logon则需要发送logon消息)如果需要发送logon消息,检查此时是否可以发送logon消息(为了防⽌不断重复发送logon,对发送logon消息有频率限制),如果可以发送则回调⽤户接⼝canLogon,判断⽤户是否允许发送logon消息,如果不允许则直接返回;如果允许则generateLogon并发送。发送完毕logon消息之后,返回。
5)检查getHeartBeatInterval是否为0。看到这⾥,发现了⼀个QuickFix/J的bug,搜索了全部代码,没有发现作为客户端的Initiator在任何地⽅设置了state的heartBeatInterval。⽽作为服务器端的Acceptor在quickfix.mina.acceptor.AcceptorIoHandler.processMessage中设置了state的heartBeatInterval,代码如下:
Java代码
01. qfSession.setHeartBeatInterval(heartbeatInterval);
[java]
01. qfSession.setHeartBeatInterval(heartbeatInterval);
因此在写客户化的Initiator时,记得fix这个bug,在客户端收到了来⾃服务器端Logon消息响应之后,从中取出HeartBtInt字段的值,设置到fixSession的heartBeatInterval中去。设置可以参考上述代码。
6) 检查state isLogoutTimedOut(logout消息已经发出去,并且Session已经超时,说明客户端确实已经收到了服务器端发送的Logon消息,并且客户端已经logout了),则断开I/O连接。
7) 检查state isWithinHeartBeat。⽬前发现QuickFix/J并没有设置state.withinHeartBeat,始终使⽤的是默认初始值false,不知道为何?既然这个检查始终返回false,则不会返回,继续下⼀步检查。
8) 检查stateisTimedOut(如果距离上次收到HeartBeat已经超过了2.4*HeartBeat秒,则认为已经TimeOut)。如果已经timeOut,没有取消HeartBeatCheck的话,则断开I/O连接,然后回调⽤户的onHeartBeatTimeout接⼝。如果仍然未timeout,则根据需要发送TestRequest或者Heartbeat消息。
STEP1.0.0中的Logon消息响应中,还给出了MaxFailInt,即允许⼼跳最⼤失败次数,超过此次数之后
便认为对⽅已经超时,可以断开连接,准备重新登陆。为Session设置MaxFailInt可以将取到的MaxFailInt乘以HeartBtInt乘以1000换算成秒,然后为Session添加API,使得能够对state的isTimedOut属性进⾏操作设置,然后在在此判断timeout。
⾸先为quickfix.SessionState添加⼀个属性
Java代码
01. private long timeoutMs = 24000L;
[java]
01. private long timeoutMs = 24000L;
然后添加⼀个setTimeout⼀个getTimeout,然后为Session添加响应的setter和getter。最后在收到了Logon消息的响应时调⽤Session的setTimeout设置超时时间。
(message):
⾸先检查SessionTime,如果超过1秒未刷新则刷新时间戳;如果发现Session不存在,则执⾏reset,重置Session,并返回。
然后检查消息的header中的beginString,如果和SessionID中记录的beginString不匹配则抛异常,返回。
然后检查msgType,如果是Logon消息,根据版本的不同,设置Session中的targetDefaultApplVerID。targetDefaultApplVerID有什么⽤处呢?因为FIXT协议和之前版本的FIX协议的ApplVerID有不同的组成格式,这个targetDefaultApplVerID主要是给quickfix.MessageCracker⽤来解析FIXT消息的。
然后检查dataDictionaryProvider, 根据customApplVerID和applVerID取到数据字典(applicationDataDictionary及sessionDataDictionary,细节请参考16)。
然后根据取到的字典验证message是否合法,不合法的根据情况打印警告⽇志或者直接抛出异常退出。
然后根据消息类型,分别做相应的状态设置(通过nextLogon,nextHeartBeat,nextTestRequest,nextSequenceReset,nextLogout),然后调⽤⽤户call back API(在verify中)。处理完毕该消息之后,调⽤nextQueued()递归处理下⼀个队列中的消息。
请注意verify(message)函数,所有的普通消息通过这个函数去回调application的fromApp(message, sessionID)的。verify -> veriry -> fromCallback -> fromAdmin/fromApp。关于消息的解析,其中普通Message是通过quickfix.MessageUtils.parse将String类型的消息解析成Message。
next(message)被SingleThreadedEventHandlingStrategy或者ThreadPerSessionEventHandlingStrategy处理每⼀条收到的消息时调⽤,是处理收到消息的源头。
(string):就是next(message)的重载函数,将输⼊的字符串解析成Message,然后调⽤next(message)处理。
quickfix.Session.verify(msg, checkTooHigh, checkTooLow):收到的消息的MsgSeqNum与预期的不⼀样时所做的状态检查和操作。操作⽐如,如果发现丢包,则⾃动重发ResendRequest请求或者仅仅记录报警⽇志;如果发现重复MsgSeqNum,如果消息中设置了可能重复标志,则正常返回,否则说明消息有误,则Logout,disconnect。
quickfix.Session.verify(msg):verify(msg, checkTooHigh,checkTooLow)的重载函数。根据是否是Admin类型的消息及从配置中读取的checkGapFieldOnAdminMessage确定checkTooHigh和checkTooLow这两个标志。如果不是Admin消息则checkTooXX,如果checkGapFieldOnAdminMessage为真则checkTooXX。
quickfix.Session.doTargetTooHigh(message):收到的消息MsgSeqNum⽐预期的⼤,即丢失了消息,先检查是否需要reset或者disconnect(⾄于是否需要reset或者disconnect由创建Session时通过构造函数的参数指定,默认均是不需要,也可以通过指定配置ResetOnError和DisconnectOnError),如果需
要则reset或者disconnect后返回。如果不需要,则将这条消息⼊队(enqueue)存⼊state中,然后⽣成ResendRequest并发送出去,请求从expectedTargetNum开始到此消息前的所有消息。由此可见,在state的messageQueue中存储的消息都是因为发现消息序号跳跃缺失消息后,临时存储的跳跃之后新消息。⽽这些临时存储在state中的消息,会在收到并处理完每种消息之后都会去检查并处理,⽐如
next(msg),nextHeartBeat(msg),nextLogon(msg),nextReject(msg),nextTestRequest(msg)这些处理过程的最后都会检查并处理state临时存储的消息。
quickfix.Session.doTargetTooLow(message):收到的消息MsgSeqNum⽐预期的⼩,即收到了重复消息,应该做的事情。
quickfix.Session.parseMessage(stringData):将输⼊的字符串数据解析成Message。实际是调⽤quickfix.parse(session, stringData)解析的。
quickfix.Session.send(String messageString):真正通过底层Mina发送消息给对⽅的API。在QuickFix/J中,send(msgString)通过responder发送消息,responder最终调⽤了Mina的IoSession.write(object)将数据写⼊⽹络中。
responder是通过quickfix.Session.setResponder(Responderresponder)在quickfix.mina.initiator.InitiatorIoHandler中创建Session时设置的。关于responder的细节请参考15。
quickfix.Session.sendRaw(message, int num):
Session内部的helper⽅法,⾸先将传⼊的消息和该消息的MsgSeqNum拼装好,把消息头中与系统相关的参数设置好,参数⽐如
BeginString,SenderCompID,TargetCompID,SendingTime。
然后根据消息类型回调响应的⽤户call backAPI,如果是Admin类型的消息,回调⽤户的toAdmin,⾮Admin类型的消息,回调⽤户的toApp。如果是LOGON消息,还要对是否重置SeqNum做相应的处理(具体处理过程是:如果消息中有ResetSeqNumFlag这个字段,并且这个字段被设置true则重置。如果重置,则⾸先resetState,取到期望的seqNum设置到消息头的MsgSeqNum字段中。⽆论是否重置,都将resetSeqNumFlag写⼊state中)。
最后把拼装好的message调⽤toString转换成字符串,然后调⽤send(string)⽅法真正的将消息发送到⽹络上去。
整个处理过程需要锁定SenderMsgSeqNum,直到全部操作完成返回。因为如果call back调⽤失败,则roll back会更⾼效。
sendRaw被Session中generateXXX调⽤,⽐如
generateHeartbeat,generateLogon,generateLogout,generateReject,generateBusinessReject,generateResendRequest,generateSequenceReset,generateTestRequest。这些generateXXX中⽣成了相应消息之后,最后调⽤sendRaw把消息通过⽹络发送出去。
quickfix.Session.send(Message message):做了两件事,⾸先去掉消息头中的PossDupFlag和OrigSendingTime,然后调⽤sendRaw把消息发送到⽹络上去(设置seqNum为0)。主意send(message)返回的boolean并不能说明消息是否成功发送出去,仅仅是说明消息已经成功放⼊发送队列了,因为默认情况下QuickFix/J使⽤异步IO发送⽹络数据。
6. quickfix.mina.initiator.IoSessionInitiator:使⽤MINA提供的传输层的API,建⽴、维护同服务器之间的传输层的⽹络连接,⽽不是应⽤层的⽹络连接。这些⽹络功能都在⼀个叫做quickfix.mina.initiator.IoSessionInitiator.ConnectTask的⼀个私有的TimerTask中实现。具体实现功能有连接(包括普通连接和加密SSL连接)、重连、判断是否应该重连、处理连接异常、启动和停⽌ConnectTask。
7. quickfix.SocketInitiator:使⽤单独的线程去为所有的Session处理消息。SocketInitiator提供的功能有:
a)初始化,即⽤eventHandlingStrategy创建Initiator,然后注册此SocketInitiator所管理的全部Session,然后启动Initiator,最后调⽤eventHandlingStrategy.blockInThread()在另外的后台线程中去处理SessionTimer收到的插⼊队列的消息。启动Initiator做的事情依次是:先启动SessionTimer去监听从传输层过来的消息,如果没有Logon则先Logon,然后在收到消息后回调⽤户代码处理消息;启动reconnectTask去建⽴和维护传输层的⽹络连接。
b) 启动Initiator,在另外的线程中后台(Daemon)处理消息。
c) 阻塞Initiator,在同⼀线程中处理消息。
d) 停⽌Initiator。分为强制停⽌和⾮强制停⽌。强制或者⾮强制Logout所有FixSession,停⽌连接层的Initiator,取消注册所有此SocketInitiator所管理的全部Session。
e) 关于a) b)如何处理来⾃底层的消息的逻辑,请参考11和12。因为这⾥所谓的处理消息实际上是直接或者间接调⽤了SingleThreadedEventHandlingStrategy的block处理消息。
8. quickfix.ThreadedSocketInitiator:为每⼀个Session使⽤⼀个单独的线程去处理消息。功能参考7。除了线程⼯作模式不⼀样,功能和7完全⼀样。
9. quickfix.SessionState:Session和对⽅通信过程中使⽤的helper类。主要功能就是存储了Session的
所有状态,并且提供了响应API访问这些状态。状态包括
heartBeatInterval,heartBeatMillis,是否需要heartBeat,判断Session所在应⽤程序的⾓⾊(客户端Initiator or服务器端Acceptor),lastReceivedTime,lastSentTime,获取logger,判断作为客户端的Initiator登陆消息是否已经发出,判断作为服务器端的Acceptor登陆消息是否收到,判断是否需要登陆,判断登陆是否
TimeOut,messageStore,testRequestCounter,判断是否需要TestRequest,判断是否处于TimeOut状态,将收到的Message⼊队(enqueue),出队(dequeue),锁定和解锁发送/接受的SequenceNumber,获取、设置⾃增的下⼀个SequenceNumber,重置(即将Sequence清空,重新从1开始计数),设置、获取Logout的原因。
10. quickfix.mina.EventHandlingStrategy:⽤于不同版本FIX协议处理事件的策略的接⼝,是应⽤级处理消息回调接⼝的根源。当传输层消息到达时调⽤此接⼝onMessage,可以这么理解,onMessage是EventHandlingStrategy的输⼊,这个输⼊来⾃底层。getSessionConnector获取和这个策略相关的SessionConnector,即获取和这个Session相关的
Initiator/Acceptor去处理响应的输⼊消息,⼀般情况下是逐层将消息向上层传出,回调⽤户的函数处理该消息。getQueueSize获取当前被处理消息队列的长度。EventHandlingStrategy ⼀般作为AbstractS
ocketInitiator的成员,建⽴IoSessionInitiator时传给IoSessionInitiator,请参考3b)。⽬前在QuickFix/J中有两个具体实现,分别是
quickfix.mina.SingleThreadedEventHandlingStrategy和quickfix.mina.ThreadPerSessionEventHandlingStrategy。这两个具体实现类的说明请参考11,12。
11. quickfix.mina.SingleThreadedEventHandlingStrategy:是QuickFix/J处理消息的核⼼类。处理消息时即便有多个Session也使⽤单线程模式。
a) 为了不阻塞输⼊,那么就需要⼀个eventQueue来临时快速的存储收到的所有消息。
b) onMessage接到底层传⼊的消息包装成SessionMessageEvent,⾸先将其存⼊eventQueue。
c) 那么SessionMessageEvent⾥⾯有什么?SessionEvent仅仅是把fixSession和Message包装到⼀起,并且提供了处理Message的⽅法processMessage。可以这样理解,onMessage 是事件处理策略底层Message的输⼊,SessionEvent中的processMessage是Message的输出,未来被应⽤程序级别的函数处理。
d) getSessionConnector获取需要处理应⽤级的connector以便处理eventQueue中的消息。
e) getMessage从eventQueue中取出SessionMessageEvent待处理。
f) block就是应⽤程序级别处理消息的⼊⼝。block判断HandlingMessage是否应该继续运新,如果是则从消息队列中取出SessionMessageEvent,调⽤其中的processMessage去处理该Message。
g) processMessage如何处理了收到的消息呢?它会调⽤fixSession的next⽅法,将消息传给Session,由fixSession再接⼒将消息回调到⽤户⼿中。请参考5。
h)也许你会注意到处理消息的block在run中始终被调⽤,⽽且没有任何sleep时间,难道它在没有消息的时候始终不停的死循环运⾏且丝毫不休息?CPU会保持100%?实际上效果不是这样的,其中的秘密在于它使⽤了BlockingQueue做到了和sleep相同效果的事情。在没有消息的时候,这个循环会每休息⼀秒再执⾏下⼀次循环。如何做到这样的效果呢?原因是如果eventQueue中如果没有消息,⽽该eventQueue设置了阻塞超时1000毫秒,则取消息的操作会等待最多1000毫秒,如果没有等到消息则超时退出不再等待,执⾏完毕本次循环,如果等到了则按照正常流程处理消息。这样做最⼤的好处就是,如果eventQueue中有事件,那么就会连续不断的处理,如果没有消息,就会休息timeout毫秒再查看。
i) blockInThread,在新启动的后台线程中处理SessionMessageEvent
12. quickfix.mina.ThreadPerSessionEventHandlingStrategy:同样是QuickFix/J处理消息的核⼼类。和单线程模式不同的是该策略会为每个session启动⼀个新的线程去处理消息。
a) 由于是每个Session对应⼀个线程,因此该策略内部需要⼀个称之为dispatchersMap作为缓存为每个Session保存响应的处理线程(MessageDispatchingThread)引⽤。
b) 当onMessage收到来⾃底层的输⼊消息时,根据输⼊的fixSession从dispatchers中取到相应的处理线程,并将该消息加⼊(enqueue)到该线程内部的消息队列中待处理。
c)每个dispatcher(MessageDispatchingThread类型)内部均维护了⾃⼰的消息队列,和单线程模式不同在于,消息队列中的消息仅仅是Message,不是SessionMessageEvent。处理Message的逻辑从单线程中的SessionMessageEvent中移出到dispatcher中。
13. quickfix.DataDictionaryProvider:是⼀个接⼝,为指定的sessionprotocol或者applicationversion提供数据字典。getSessionDataDictionary根据提供的beginString即协议版本获取相应的数据字典。getApplicationDataDictionary根据提供的application versionID和custom application ID获取数据字典。application versionID在FIXT.1.1之前由BeginString字段确定。custom application ID是可选值,不是必须的。
14. quickfix.DefaultDataDictionaryProvider:是QuickFix/J提供的DataDictionaryProvider的默认实现。在DefaultDataDictionaryProvider中,有两种数据字典,⼀种是传输⽤的数据字典,⼀种是应⽤程序⽤的数据字典,分别缓存在两个Map中。这个DefaultDataDictionaryProvider是在创建Session时由
默认的DefaultSessionFactory根据beginString创建的。addApplicationDictionary和addTransportDictionary分别⽤于向DefaultDataDictionaryProvider添加新的数据字典。⽬前QuickFix/J的实现中,在DefaultSessionFactory中初始化Session 时添加字典。对于fixt之前版本的数据字典,每个数据字典会被同时添加进⼊到传输数据字典和应⽤程序数据字典中。
15. quickfix.Responder:这是个接⼝,供Session使⽤,⽤于发送原始FIX消息(raw FIXmessage)或者切断和对⽅的I/O连接。send(stringData)发送原始的FIX消息,disconnect()切断底层的连接,getRemoteIPAddress()提供对⽅的IP。⽬前的QuickFix/J中quickfix.mina.IoSessionResponder实现了Responder接⼝。IoSessionResponder.send(string)调⽤了Mina的IoSession.write(object)将数据写⼊⽹络中。默认是异步写⼊。(如果同步写⼊在创建responder时显⽰传⼊synchronousWrites为true。同步写⼊实际是write(object)之后,取到
ioSession的WriteFuture,然后向WriteFuture发送join指令。)
16. quickfix.DataDictionary:为不同版本的FIX协议提供消息的元数据(metadata),提供helper⽅法帮助判断field类型,判断消息的类型,判断组、组件的类型,检查message、field,field等。
总共有3种⽅式⽣成DataDictionary,⾸先可以通过⼀个系统路径读⼊配置⽂件⽣成,或者载⼊inputStream⽣成,或者从已有的DataDictionary拷贝⽣成。
DataDictionary中最复杂的⽅法就是load,思想是根据l中的定义,判断xml⽂件是否合法,然后根据DTD取到各个section中的信息并存⼊预先定义好的Map、List、Set中,便于之后的Helper⽅法能够快速准确的搜索和判断相关信息。
17. quickfix.CompositeLog:将⽇志信息输出到多个logger中。多个logger在newCompositeLog(Log[]logs)时传⼊。但是多个logger没有优先级,也不能控制多个logger,仅仅是⽇志来了,同时写⼊这多个logger中。CompositeLog由CompositeLogFactory创建。
QuickFix/J中其他的logger还有:
quickfix.ScreenLog,将⽇志通过System.out打印到屏幕,相应的有quickfix.ScreenLogFactory。
quickfix.FileLog,将⽇志写⼊本地⽂件中。相应的有quickfix.FileLogFactory。
quickfix.JdbcLog,将⽇志写⼊数据库中。相应的有quickfix.JdbcLogFactory。
quickfix.SLF4JLog,使⽤SLF4J wrapper写⽇志,SLF4J⽀持JDK logging,log4j等。
另外多数Log的实现都extends了quickfix.AbstractLog。AbstractLog的作⽤是添加了对HeartBeat是否记录⽇志的开关。
B). ⽹络数据在QuickFix/J中的流向
ConnectTask -&t(sockAddress, ioHandler) ->MINA建⽴和服务器端的通信。收到⽹络数据,ioConnector触发相应事件,并把事件交给ioHandler(InitiatorIoHandler)的processMessage ->processMessage中调⽤Message(quickfixSession,message) ,将消息向外回调 -
&Message(quickfixSession, message)将收到的消息⼊队(enQueue)到eventQueue ->SingleThreadedEventHandlingStrategy.blockInThread 中启动单独的后台线程,依次从eventQueue取出消息处理,向session回调 ->(message) -&根据msgType判断回调 ->逐层回调 (verify -> veriry ->fromCallback -> fromAdmin/fromApp),从fromAdmin/fromApp(msg,sessionID)回调⽤户处理逻辑。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论