Activemq配置说明
1 概述以及activemq集方案选择 1
1.1 consumers的集 1
1.2 brokers的集 1
1.3 集方案的选择 2
2 服务端配置activemq配置 2
2.1 服务端配置l 2
2.2 客户端配置l 2
3 测试 3
3.1 采用单机方式,同步方式 3
3.2 采用单机方式,异步方式 4
4 待解决的问题 5
1 概述以及activemq集方案选择
1.1 consumers的集
ActiveMQ支持订阅同一个queue的consumers上的集。如果一个consumer失效,那么所有未被确认(unacknowledged)的消息都会被发送到这个queue上其它的consumers。如果某个consumer的处理速度比其它consumers更快,那么这个consumer就会消费更多的消息。
1.2 brokers的集
在一个网络内运行多个brokers或者stand alone brokers时存在一个问题,这就是消息在物理上只被一个broker持有,因此当某个broker失效,那么你只能等待直到它重启后,这个broker上的消息才能够被继续发送(如果没有设置持久化,那么在这种情况下,消息将会丢失)。
1.3 集方案的选择
经筛选我们采用主--从方式,Master Slave 背后的想法是,消息被复制到slave broker,因此即使master broker遇到了像硬件故障之类的错误,你也可以立即切换到slave broker而不丢失任何消息。Master Slave是目前ActiveMQ推荐的高可靠性和容错的解决方案。并且我们采用文件系统做持久化,即:Shared File System Master Slave。可以运行多个broker,这些broker共享数据目录。当第一个broker得到文件上的排他锁之后,其它的broker便会在循环中等待获得这把锁。
客户端使用failover transport来连接到可用的broker。 当master broker失效的时候会释放这把锁,这时候其中一个slave broker会得到这把锁从而成为master broker。
方案选择:服务端(Shared File System Master Slave)+ 客户端(failover transport)
2 服务端配置activemq配置
2.1 服务端配置l
路径:(%activemq_home%/l)
Master端不需要特殊配置,主要是Slave端的配置如下:
activemq使用场景 <broker xmlns="/schema/core" persistent="true" masterConnectorURI="tcp://172.16.48.100:61616" shutdownOnMasterFailure="false" schedulerSupport="false" brokerName="slave" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true">
<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://172.16.48.102:61616"/>
</transportConnectors>
注意:
1. <!-- persistent="true"表示要持久化存储消息,和子元素persistenceAdapter结合使用 -->
2. <!-- brokerName 设置broker的name,在注意在网络上必须是唯一的-->
3. <!-- persistenceAdapter消息持久化方式 -->
4. <!-- masterConnectorURI用于指向master broker -->
5. <!-- shutdownOnMasterFailure用于指定slave broker在master broker失效时是否需要停止-->
6. <!-- <kahaDB directory="${activemq.base}/data/kahadb"/>持久化数据存储目录-->
7. <!-- uri="tcp://172.16.48.102:61616" -->
2.2 客户端配置l
ActiveMQ目前支持的transport有:VM、TCP、UDP、Peer、Multicast、HTTP、HTTPS
、Failover等。
这里我们使用Failover Transport,它是一种重新连接的机制,它工作于其它transport的上层,用于建立可靠的传输。它的配置语法允许制定意多个复合的URI.会自动选择其中的一个URI来尝试建立连接,如果没有成功,那么会选择一个其它的URI来建立一个新的连接,如下代码片段所示:
<bean id="queueJmsConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="failover:(tcp://172.16.48.100:61616,tcp://172.16.48.102:61616)?randomize=false&wireFormat.maxInactivityDuration=0&maxReconnectDelay=10000" />
<property name="useAsyncSend" value="true" />
</bean>
注意:
1. <!-- failover 失效备援-->
2. <!-- randomize=false设置就可以让客户总是首先尝试连接master broker(slave broker并不会接受任何连接,直到它成为了master broker)-->
3. <!-- maxInactivityDuration 允许最大静止(消息服务器无消息)时间-->
4. <!-- maxReconnectDelay 最大重连间隔-->
5. 这些参数的具体使用根据实际需要而定
3 测试
3.1 采用单机方式,同步方式
设置: <property name="useAsyncSend" value="false"/>,
场景描述: 三个线程,每个线程生产1000万个消息,并开启一个消息者进行消费
结果:
可以存大概280w条数据,存储大概是1g硬盘+20mb内存;用时大既是十几个小时(起初平均每秒100条数据甚至更快,后来速度放慢所以致使整个处理过程耗时较长)。
分析: 从日志及配置分析可知,吞吐量及处理速度是由如下配置所致:
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
<storeUsage>
<storeUsage limit="1 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="100 mb"/>
</tempUsage>
</systemUsage>
</systemUsage>
注意: SystemUsage配置设置了一些系统内存和硬盘容量,当系统消耗超过这些容量设置时,amq会放慢生产,当无空间可用,且继续生产是等待时间过长后最终导致了一个异常:Software caused connection abort: recv failed,产生原因是在服务端/客户端单方面关闭连接的情况下,另一方依然以为tcp连接仍然建立,试图读取对方的响应数据。
3.2 采用单机方式,异步方式
设置: <property name="useAsyncSend" value="true"/>,
场景描述:
三个线程,每个线程生产1000个消息,并开启一个消费者进行消费,模拟每次消费耗费60秒时间,中途拔除网线模拟断网较长时间后重新连接上,导致数据丢失,且生产者、消费者报出连接超时异常,此时再打开一个生产者生产1000条数据,只有部分数据(300多条)被消费,造成消费不被消费。
3. 采用集方式,用文件持久化存储,客户端用failover传输,可以解决broker服务端宕机问题,实现错误转移,并且宕机没消费完的消息,可以继续被消息。
综上所述结论如下:
消息生产者使用持久(persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到 broker 发送一个确认消息给生产者,这个确认消息暗示生产者 broker 已经成功地将它发送的消息路由到目标目的并把消息保存到二级存储中。这个过程通常称为同步发送。但有一个例外,当发送方法在一个事物上下文中时,被阻塞的是 commit 方法而不是 send 方法。commit 方法成功返回意味着所有的持久消息都以被写到二级存储中。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论