IBM-MQ原理及使⽤场景
⼀、MQ简介及特点
MQ全称为Message Queue, 消息队列(MQ)是⼀种应⽤程序对应⽤程序的通信⽅法。应⽤程序通过写和检索出⼊列队的针对应⽤程序的数据(消息)来通信,⽽⽆需专⽤连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进⾏通信,⽽不是通过直接调⽤彼此来通信,直接调⽤通常是⽤于诸如远程过程调⽤的技术。排队指的是应⽤程序通过队列来通信。队列的使⽤除去了接收和发送应⽤程序同时执⾏的要求。其中较为成熟的MQ产品有IBM WebSphere MQ、RabbitMQ 、ZeroMQ 、ActiveMQ、Redis(当做⼀个轻量级的队列服务来使⽤)、Kafka、RocketMQ。它们之间的⽐较详见:。
特点:MQ的消费-⽣产者模型的⼀个典型的代表,⼀端往消息队列中不断的写⼊消息,⽽另⼀端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN 消息中间件服务的⼀个标准和API定义,⽽MQ则是遵循了AMQP协议的具体实现和产品。
以下主要介绍—IBM WebSphere MQ
⼆、MQ使⽤场景
1、异步通信
有些业务不想也不需要⽴即处理消息。消息队列提供了异步处理机制,允许⽤户把⼀个消息放⼊队列,但并不⽴即处理它。想向队列中放⼊多少消息就放多少,然后在需要的时候再去处理它们。
2、解耦
降低⼯程间的强依赖程度,针对异构系统进⾏适配。在项⽬启动之初来预测将来项⽬会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插⼊了⼀个隐含的、基于数据的接⼝层,两边的处理过程都要实现这⼀接⼝,当应⽤发⽣变化时,可以独⽴的扩展或修改两边的处理过程,只要确保它们遵守同样的接⼝约束
3、冗余
有些情况下,处理数据的过程会失败。除⾮数据被持久化,否则将造成丢失。消息队列把数据进⾏持久化直到它们已经被完全处理,通过这⼀⽅式规避了数据丢失风险。许多消息队列所采⽤的"插⼊-获取-删除"范式中,在把⼀个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从⽽确保你的数据被安全的保存直到你使⽤完毕。
4、扩展性
因为消息队列解耦了你的处理过程,所以增⼤消息⼊队和处理的频率是很容易的,只要另外增加处理
过程即可。不需要改变代码、不需要调节参数。便于分布式扩容
5、过载保护
在访问量剧增的情况下,应⽤仍然需要继续发挥作⽤,但是这样的突发流量⽆法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投⼊资源随时待命⽆疑是巨⼤的浪费。使⽤消息队列能够使关键组件顶住突发的访问压⼒,⽽不会因为突发的超负荷的请求⽽完全崩溃
6、可恢复性
系统的⼀部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使⼀个处理消息的进程挂掉,加⼊队列中的消息仍然可以在系统恢复后被处理。
7、顺序保证
在⼤多使⽤场景下,数据处理的顺序都很重要。⼤部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
8、缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过⼀个缓冲层来帮助任务最⾼效率的执⾏,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。 9、数据流处理 分布式系统产⽣的海量数据流,如:业务⽇志、监控数据、⽤户⾏为等,针对这些数据流进⾏实时或批量采集汇总,然后进⾏⼤数据分析是当前互联⽹的必备技术,通过消息队列完成此类数据收集是最好的选择
三、MQ原理
1、MQ原型
Pub/Sub发布订阅(⼴播):使⽤topic作为通信载体
PTP点对点:使⽤queue作为通信载体
2、MQ组成
Broker:消息服务器,作为server提供消息核⼼服务
Producer:消息⽣产者,业务的发起⽅,负责⽣产消息传输给broker,
Consumer:消息消费者,业务的处理⽅,负责从broker获取消息并进⾏业务逻辑处理
Topic:主题,发布订阅模式下的消息统⼀汇集地,不同⽣产者向topic发送消息,由MQ服务器分发到不同的订阅 者,实现消息的⼴播
Queue:队列,PTP模式下,特定⽣产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
Message:消息体,根据不同通信协议定义的固定格式进⾏编码的数据包,来封装业务数据,实现消息的传输
3、MQ常⽤协议
AMQP协议 AMQP即Advanced Message Queuing Protocol,⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协议,是应⽤层协议的⼀个开放标准,为⾯向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语⾔等条件的限制。
优点:可靠、通⽤
MQTT协议 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的⼀个即时通讯协议,有可能成为物联⽹的重要组成部分。该协议⽀持所有平台,⼏乎可以把所有联⽹物品和外部连接起来,被⽤来当做传感器和致动器(⽐如通过Twitter让房屋联⽹)的通信协议。 优点:格
式简洁、占⽤带宽⼩、移动端通信、PUSH、嵌⼊式系统
STOMP协议 STOMP(Streaming Text Orientated Message Protocol)是流⽂本定向消息协议,是⼀种为MOM(Message Oriented Middleware,⾯向消息的中间件)设计的简单⽂本协议。STOMP提供⼀个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进⾏交互。 优点:命令模式(⾮topic\queue模式)
XMPP协议 XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语⾔(XML)的协议,多⽤于即时消息(IM)以及在线现场探测。适⽤于服务器之间的准即时操作。核⼼是基于XML流传输,这个协议可能最终允许因特⽹⽤户向因特⽹上的其他任何⼈发送即时消息,即使其操作系统和浏览器不同。 优点:通⽤公开、兼容性强、可扩展、安全性⾼,但XML 编码格式占⽤带宽⼤
其他基于TCP/IP⾃定义的协议 有些特殊框架(如:redis、kafka、zeroMq等)根据⾃⾝需要未严格遵循MQ规范,⽽是基于TCP\IP⾃⾏封装了⼀套协议,通过⽹络socket接⼝进⾏传输,实现了MQ的功能。
四、MQ的通讯模式
1) 点对点通讯:点对点⽅式是最为传统和常见的通讯⽅式,它⽀持⼀对⼀、⼀对多、多对多、多对⼀等多种配置⽅式,⽀持树状、⽹状等多种拓扑结构。
2) 多点⼴播:MQ适⽤于不同类型的应⽤。其中重要的,也是正在发展中的是"多点⼴播"应⽤,即能够将消息发送到多个⽬标站点(Destination List)。可以使⽤⼀条MQ指令将单⼀消息发送到多个⽬标站点,并确保为每⼀站点可靠地提供信息。MQ不仅提供了多点⼴播的功能,⽽且还拥有智能消息分发功能,在将⼀条消息发送到同⼀系统上的多个⽤户时,MQ将消息的⼀个复制版本和该系统上接收者的名单发送到⽬标MQ系统。⽬标MQ系统在本地复制这些消息,并将它们发送到名单上的队列,从⽽尽可能减少⽹络的传输量。
3) 发布/订阅(Publish/Subscribe)模式:发布/订阅功能使消息的分发可以突破⽬的队列地理指向的限制,使消息按照特定的主题甚⾄内容进⾏分发,⽤户或应⽤程序可以根据主题或内容接收到所需要的消息。发布/订阅功能使得发送者和接收者之间的耦合关系变得更为松散,发送者不必关⼼接收者的⽬的地址,⽽接收者也不必关⼼消息的发送地址,⽽只是根据消息的主题进⾏消息的收发。在MQ家族产品中,MQ Event Broker是专门⽤于使⽤发布/订阅技术进⾏数据通讯的产品,它⽀持基于队列和直接基于TCP/IP两种⽅式的发布和订阅。
4) 集(Cluster):为了简化点对点通讯模式中的系统配置,MQ提供Cluster(集)的解决⽅案。集类
似于⼀个域(Domain),集内部的队列管理器之间通讯时,不需要两两之间建⽴消息通道,⽽是采⽤集(Cluster)通道与其它成员通讯,从⽽⼤⼤简化了系统配置。此外,集中的队列管理器之间能够⾃动进⾏负载均衡,当某⼀队列管理器出现故障时,其它队列管理器可以接管它的⼯作,从⽽⼤⼤提⾼系统的⾼可靠性
五、IBM WebSphere MQ概念介绍(服务安装略)
注:服务端安装完成后,创建列管理器时出错:AMQ8135:未授权,表明必须要⽤adminstrator管理员帐号创建运⾏。
我们把队列管理器⽐作是数据库,那么队列就是其中的⼀张表,消息就是表中的⼀条记录。
1. 队列:我们可以简单地把队列看成⼀个容器,⽤于存放消息。
2. 队列管理器:队列管理器构建了独⽴的 MQ 的运⾏环境,它是消息队列的管理者,⽤来维护和管理消息队列。
3. 消息:MQ中的最⼩对象;默认情况下,消息缺省可以达到 4MB。消息可以分成持久消息和⾮持久消息。所 谓“持久”的 意思,就是
在MQ 队列管理器重启动后,消息是否仍然能保持。持久的消息写⼊或读出队列的同时会在 Log 中记录,所以性能上⽐⾮持久消息差不少。
4. 通道:通道则是两个队列管理器之间的⼀种单向的点对点的通信连接, 消息在通道中只能单向流动。队列管理器之间的通信是通过配
置通道来实现 的,通道两侧的队列管理器对这个通道的相关参数应该能对应起来。在通道上可以配置不同的通信协议,这样就使得编程接⼝与通信协议⽆关。通道两端的 配置必须匹配, 且名字相同,否则⽆法连通。
5. RUNMQSC:命令⾏交互界⾯管理⼯具;作为维护⼈员的我们,与MQ打交道有两种⽅式,⼀种是通过MQ提供的⼆进制命令⼯具
(在mq安装⽬录的bin⽬录下),另⼀种⽅式则是通过命令⾏交互管理⼯具;这⼆者在功能上有很多是重合的,但并⾮完全可替代;
RUNMQSC是⼀个通⽤的 MQ 对象管理⼯具,使⽤MQSC命令集可以对 MQ 对象进⾏;全⽅位的管理,也是各种管理⽅式最直接、最全⾯的⼀种。RUNMQSC 运⾏的命令集称MQSC (MQ Script Command)";在 RUNMQSC 中⼤⼩写⽆关,所有的命令会先转换成全⼤写再提交执⾏。所以如果要 表⽰⼤⼩相关的字串,⽐如对象名,则⽤引号将字串包住。输⼊以下命令启动MQSC命令:
runmqsc[queueManagerName]
六、MQ的核⼼API
1、MQQueueManager―――队列管理器访问类
常⽤⽅法:
public MQQueueManager(String queueManagerName)―――建⽴⼀个管理器实例
创建队列管理器有两种⽅式:
1:绑定⽅式,这种⽅式要求MQ服务器与应⽤程序同属⼀台服务器,效率⽐较⾼。
2:客户机⽅式:这种⽅式应⽤程序和MQ服务器可以不在同⼀台服务器上,但是要考虑到MQ权限的问题,尤其是MQ7.5之后,权限变的很复杂,慎重对待。
注:如果使⽤绑定的⽅式则可以直接创建⼀个新的队列管理器实例。但是在某些平台下这样直接创建会出错,必须采⽤MQClient的⽅式进⾏连接。此时需要先定义服务通道,端⼝,服务名等环境变量,再创建⼀个队列管理器实例。如:
[html]
1. MQEnvironment.hostname = "IP地址";
2. MQEnvironment.channel = "channelname";
3. MQEnvironment.port = port ;
4. MQEnvironment.CCSID = CCSID;
5. MQQueueManager qMgr = new MQQueueManager(hostname);
其中hostname表⽰队列管理器所在的机器地址(⼀般在本地则填localhost或127.0.0.1)。
port就是队列管理器的侦听端⼝。
Channel定义访问的服务器通道名(需要⾃⼰在队列管理器中先定义,其⽅法类似与⼀般通道的定义,不过类型是服务器通道)
public bool isConnected()―――返回队列管理器是否在连接状态
public synchronized void disconnect()―――断开队列管理器的连接
2、MQQueue―――队列访问类
常⽤⽅法:
通常MQQueue实例的⽣成通过调⽤MQQueueManager的accessQueue⽅法类实例化。
public synchornized MQQueue accessQueue(String QueueName,int openOptions)―――返回⼀个连接队列的实例(类名为:MQQueue)
常⽤⽅法:
public synchronized void get(MQMessage message,MQGetMessageOptions gmo)―――从队列管理器读取⼀条
消息通过message实例返回。MQGetMessageOptions的⽤法下⾯再详述。
public synchronized void put(MQMessage message,MQPutMessageOptions pmo)―――往队列管理器放⼊⼀条
消息 MQPutMessageOptions的⽤法下⾯再详述
public synchronized void close()―――关闭队列的连接
3.openOptions---队列的打开⽅式
常⽤值有:
MQC.MQOO_FAIL_IF_QUIESCING―――如果队列管理器停⽌则返回失败
MQC.MQOO_OUTPUT――――以写⽅式打开队列
MQC.MQOO_INPUT_AS_Q_DEF―――以队列默认读取⽅式打开队列 使⽤的时候可以采⽤与操作来实现多种打开队列⽅式,
MQC.MQOO_BROWSE;――――以浏览⽅式打开队列
MQC.MQGMO_BROWSE_NEXT; ――――浏览下⼀个消息
如:
int openOptions = MQC.MQOO_FAIL_IF_QUIESCING | MQC.MQOO_OUTPUT | MQC.MQOO_INPUT_AS_Q_DEF; 表⽰以读、写⽅式打开队列
4.MQMessage―――消息操作类常⽤⽅法:
public MQMessage()―――默认构造函数
public int getDataLength()―――返回可读取的消息的长度(以byte作为单位)
public void readFully(byte b[])―――读取消息到数组b中,长度以b的数组长度为准
属性:
format = MQC.MQFMT_STRING;
msg.characterSet = ccsid;//字符集
⼀般读取消息的操作为:
[html]
嵌入式系统是什么意思1. MQMessage message = new MQMessage(); ….
2. int length = DataLength();
3. byte buffer[] = new byte[length];
4. adFully(buffer);
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论