基于ActiveMQ通信的多租户任务调度框架设计与实现rabbitmq rocketmq kafka区别
作者:苏志宏
来源:《电脑知识与技术》2020年第18期
        摘要:任务调度被广泛应用于软件应用系统中,为了降低耦合度,解决多租户任务调度系统中任务调度和执行结果的及时通知问题,提出了在任务调度框架中应用ActiveMQ中间件构建消息总线进行消息传输和通信的技术思路。内容包括Ac-tiveMQ介绍、主流消息中间件比较、基于ActiveMQ通信的多租户调度框架构建以及框架实现。
        关键词:多租户;ActiveMQ;任务调度;消息中间件;低耦合
        中图分类号:TP311 文献标识码:A
        文章编号:1009-3044(2020)18-0007-03
        开放科学(资源服务)标识码(OSID):
        1 背景
        在数据传输及解译、消息通知、可执行程序调度等业务应用场景中,往往需要系统在指定时间进行特定操作,基于此类需求,任务调度框架被广泛应用于软件应用系统中。文献[1-3]介绍了部分任务调度算法及任务调度框架构建方法。但任务调度框架仅关注任务的调度及触发,对任务执行过程、执行结果信息的及时通知能力较为欠缺,尤其是在多并发、异步任务执行时,出现任务执行超时、中断、异常的情况下,无法及时将任务执行信息通知主系统进行自动处理,为调度系统的可靠性和可伸缩性带来较大弊端,严重影响业务系统连续运行,增加了人力资源成本和维护成本。
        通过对ActiveMQ的研究与应用,解决多任务调度框架中出现的上述问题,通过ActiveMQ构建消息总线,完成系统内部通信及系统内外通信交互,降低系统耦合度,提高系统任务调度执行的可靠性。
        2 ActiveMQ概述
        ActiveMQ是由Apache基金会提供的基于Java语言开发的多协议开源消息中间件,支持OpenWire、Stomp、REST、WSNoti-fication、XMPP、AMQP等多种应用协议,可在Java、C、C++、C#、Rubv、Perl、Pvthon、PHP、等多种开发语言环境用使用,提供交叉语言功能,并可跨平台应用。目前ActiveMQ已经支持MQTT(消息队列遥测传输)协议,可应用于物联网设备。其基于JMS规范提供了多种消息类型的传输,包括无有效负载消息Message、序列化对象消息ObjectMessage、文本消息TextMes-sage、键值集消息MapMessage、字节消息BytesMessage、数据流消息StreamMessage等,基本满足系统常用消息通信类型要求,同时对JDBC和Journal的支持也满足了消息持久化的要求[4]。
        ActiveMQ消息通讯有两种模式:主题(Topic)模式、队列( Queue)模式。主题模式,亦可称为发布/订阅模式,这种模式下一个消息可以被多个订阅者接收,订阅者订阅一个
主题后,只能接收到自其订阅之后发布的信息。订阅者如果在发布者发送消息之后启动,是无法接收到已经发送的消息的,除非发布者已经对消息进行了持久化存储。队列模式中,ActiveMQ构建一个消息存储队列,多个发送端可同时向队列中发送消息,队列被多个接收端同时监听,但一个消息只能被一个接收端获取,一旦被获取就会消失,如果没有被获取,则会一直等待。
        ActiveMQ是站在开发者的角度被设计的,可以通过JCon-sole和ActiveMQ自带的WebConsole工具等形式来监控不同层面的数据,可通过配置的方式快速集成到Spring框架中,对二次开发和调试十分友好。
        3 ActiveMQ框架及与主流消息中间件的对比
        3.1 ActiveMQ框架介绍
        ActiveMQ主框架部分从结构上可大致分为协议连接域、消息域、信息存储域和网络服务域几部分,总体架构如图1所示。
        其中协议连接域( Connectors)封装各类通信协议,用于创建和管理与代理通信客户
端的连接;
        消息域(图1 TopicRegion和Quene Region部分)封装了Ac-tiveMQ的核心内容,除主题模式和队列模式两种消息通信模式相关实现外,其最主要部分为Transports,包括Transport,Trans-portServer和TransportFactory等;
        消息存储域( Message Store)定义了信息的缓存或持久化操作相关内容,持久化操作方面支持AMQ Message Store、Ka-haDB、JDBC、LeveIDB及Journal,目前ActiveMQ的默认持久化存储为AMQ Message Store;
        网络服务域( Network Services)定义了用于支持代理的网络服务组件,包括远程服务检测与发现、消息存储和转发、DR集等。
        3.2 主流消息中间件对比
        目前消息中间件较多,且各具优势,比价突出的有Ac-tiveMQ、RabbitMQ、Kafka、RocketMQ,ZeroMQ等,在實际应用过程中应根据实际需求的优先级进行选型,下面给出部分消息中间件的对比。
        4 多租户调度框架中ActiveMQ应用
        4.1 多租户调度框架设计
        基于ActiveMQ通信的多租户调度框架可分为租户应用、数据存储、核心服务及业务操作四个主要部分,整体框架设计如图2所示。
        其中租户应用为用户可视化操作界面,用于定义和操作调度业务,查看与监控作业执行状态等。数据存储部分将数据进行分区存储,分为消息通信数据、任务调度数据、业务支撑数据。所有租户操作均以消息形式通知ActiveMQ消息总线[6],为防止任务被重复执行,系统通过队列(Queue)模式向任务驱动模块传输信息,任务调度端消息代理获取到租户信息后,进行指定任务定义并根据任务触发器定义信息进行立即执行或在指定时间执行,各业务操作模块均可使用独立线程执行,提高调度效率,降低系统耦合。任务执行完成后,将执行信息和结果写回消息队列,客户端获取后刷新状态列表。
        对于系统租户使用者,可通过操作界面直接调用已经定义好的任务,通过任务开放式接口进行参数设置。对于二次开发者,可遵循顶层任务接口规范,自定义任务插件逻辑和处理流程,测试后上传至任务插件库自动加载供租户使用。
        4.2 框架实现
        框架实现的核心是调度和通信。通过租户ID与任务的关联,对任务加以区分和控制,系统调度部分采用Quartz中间件[7],租户可通过可视化界面进行Quartz Job的定义与驱动[8],可根据业务需要扩展定义新的任务,在扩展定义时需遵循Quartz框架要求继承和实现Job类,为了实现系统任务的动态加载和调度,在核心服务层定义了通用Job创建工厂类,通过反射模式创建任务。核心代码如下:
        Class jobClass=new CascadingClassLoad-HelperO.loadClass(UobClassName0, Job.class);//动态加载任务
        JobDetail j obDetail= wjob(j obClass).withldenti-ty(tempjob. getjobKey0, tempjob. getGroupKey0). withDescription(jobName O).build0;//创建任务
        Scheduler scheduler= Scheduler0;
        scheduler.schedulejob(igger);//执行任务
        上述代码中,tempjob为反序列化的任务属性信息,由租户定义并经消息总线传输至服务层进行处理,传输采用序列化文本信息。租户应用端与任务端均具备消息的接收与发送能力,其中租户应用端发送任务定义数据,接收任务执行状态和结果信息,而任务端与之相反。ActiveMQ传输部分核心代码如下:
        Templatejob tempjob=new Templatejob0. setName(”job”).setGroup("group”).setTrigger(”25”);//定义任务模板对象
        TextMessage ateTextMessage(String0);//创建序列化任务消息对象
        producer.send(message);//发送消息

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。