Activemq配置详解与性能优化
Activemq配置详解与性能优化
配置
<beans
xmlns="/schema/beans"
xsi="/2001/XMLSchema-instance"
schemaLocation="/schema/beans /schema/beans/spring-beans.xsd
/schema/core /schema/core/activemq-core.xsd">
<beans
xmlns="/schema/beans"
xsi="/2001/XMLSchema-instance"
schemaLocation="/schema/beans /schema/beans/spring-beans.xsd
/schema/core /schema/core/activemq-core.xsd">
<broker
xmlns="/schema/core"
brokerName="127.0.0.1"
dataDirectory="${activemq.data}">
<!--
⽬的地策略
policyEntry节点:
topic:匹配的主题,⾃定义,可以使⽤wildcards(/wildcards.html)
producerFlowControl:是否对producer进⾏控制,如果你对⾃⼰ActiveMQ服务端的底层性能和消费者端的性能⾜够⾃信的话,可以设置为false,如果不是那么⾃信,请将其设为true,同时设置memoryLimit来限制队列使⽤内存的⼤⼩memoryLimit:队列可使⽤内存上限
-->
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- 订阅/发布-->
<policyEntry topic=">"producerFlowControl="true"optimizedDispatch="true"memoryLimit="16mb">
<!--
消息限制策略,⾯向Slow Consumer的
此策略只对Topic有效,只对nondurable订阅者有效,当通道中有⼤量的消息积压时,broker可以保留
的消息量。
为了防⽌Topic中有慢速消费者,导致整个通道消息积压。(对于Topic⽽⾔,⼀条消息只有所有的订阅者都消费才会被删除)
-->
<pendingMessageLimitStrategy>
<!--
ConstantPendingMessageLimitStrategy: 保留固定条数的消息,如果消息量超过limit,将使⽤“MessageEvictionStrategy”移除消息PrefetchRatePendingMessageLimitStrategy: 保留prefetchSize倍数条消息。
-->
<!-- 如果prefetchSize为100,则保留10 * 100条消息 -->
<prefetchRatePendingMessageLimitStrategy multiplier="10"/>
</pendingMessageLimitStrategy>
<!--
消息剔除策略
⾯向Slow Consumer的配合PendingMessageLimitStrategy,只对Topic有效,只对nondurable订阅者有效。当PendingMessage的数量超过限制时,broker该如何剔除多余的消息。当Topic接收到信息消息后,会将消息“Copy”给每个订阅者,在保存这个消息时(保存策略"PendingSubscriberMessageStoragePolicy"),将会检测pendingMessages的数量是否超过限制(由"PendingMessageLimitStrategy"来检测),如果超过限制,将会在pendingMessages中使⽤MessageEvication Strategy移除多余的消息,此后将新消息保存在PendingMessages中。
-->
<messageEvictionStrategy>
<!--
OldestMessageEvictionStrategy: 移除旧消息,默认策略。
OldestMessageWithLowestPriorityEvictionStrategy: 旧数据中权重较低的消息,将会被移除。
UniquePropertyMessageEvictionStrategy: 移除具有指定property的旧消息。
开发者可以指定property的名称,从此属性值相同的消息列表中移除较旧的(根据消息的创建时间)。
-->
<OldestMessageWithLowestPriorityEvictionStrategy />
</messageEvictionStrategy>
<!--
慢速消费者策略 Broker将如何处理慢消费者。
Broker将会启动⼀个后台线程⽤来检测所有的慢速消费者,并定期关闭关闭它们。
-->
<slowConsumerStrategy>
<!--
AbortSlowConsumerStrategy: 中断慢速消费者,慢速消费将会被关闭。abortConnection是否关闭连接
AbortSlowConsumerStrategy:
如果慢速消费者最后⼀个ACK距离现在的时间间隔超过阀maxTimeSinceLastAck,则中断慢速消费者。
-->
<abortSlowConsumerStrategy abortConnection="false"/><!-- 不关闭底层链接 -->
</slowConsumerStrategy>
<!--转发策略将消息转发给消费者的⽅式-->
<dispatchPolicy>
<!--
RoundRobinDispatchPolicy: “轮询”,消息将依次发送给每个“订阅者”。“订阅者”列表默认按照订阅的先后顺序排列,在转发消息时,对于匹配消息的第⼀个订阅者,将会被移动到“订阅者”列表的尾部,这也意味着“下⼀条”消息,将会较晚的转发给它。
StrictOrderDispatchPolicy: 严格有序,消息依次发送给每个订阅者,按照“订阅者”订阅的时间先后。
它和RoundRobin最⼤的区别是,没有移动“订阅者”顺序的操作。
PriorityDispatchPolicy: 基于“property”权重对“订阅者”排序。它要求开发者⾸先需要对每个订阅者指定priority,默认每个consumer的权重都⼀样。SimpleDispatchPolicy: 默认值,按照当前“订阅者”列表的顺序。其中PriorityDispatchPolicy是其⼦类。
-->
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<!--恢复策略 ActiveMQ重启如何恢复数据-->
<subscriptionRecoveryPolicy>
<!--
FixedSizedSubscriptionRecoveryPolicy: 保存⼀定size的消息,broker将为此Topic开辟定额的RAM⽤来保存最新的消息。使⽤maximumSize属性指定保存的si ze数量
FixedCountSubscriptionRecoveryPolicy: 保存⼀定条数的消息。使⽤maximumSize属性指定保存的size数量LastImageSubscriptionRecoveryPolicy: 只保留最新的⼀条数据
QueryBasedSubscriptionRecoveryPolicy: 符合置顶selector的消息都将被保存,具体能够“恢复”多少消息,由底层存储机制决定;⽐如对于⾮持久化消息,只要内存中还存在,则都可以恢复。
TimedSubscriptionRecoveryPolicy: 保留最近⼀段时间的消息。使⽤recoverDuration属性指定保存时间单位毫秒NoSubscriptionRecoveryPolicy: 关闭“恢复机制”。默认值。
-->
<!--恢复最近30分钟内的信息-->
<timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
</subscriptionRecoveryPolicy>
<!--
"死信"策略如何处理过去消息
缺省死信队列(Dead Letter Queue)叫做ActiveMQ.DLQ;所有的未送达消息都会被发送到这个队列,以致会⾮常难于管理。默认情况下,⽆论是Topic还是Queu e,broker将使⽤Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定为Topic。
-->
<deadLetterStrategy>
<!--
IndividualDeadLetterStrategy: 把DeadLetter放⼊各⾃的死信通道中,queuePrefix⾃定义死信前缀,useQueueForQueueMessages使⽤队列保存死信,还有⼀个属性为“useQueueForTopicMessages”,此值表⽰是否将Topic的DeadLetter保存在Queue中,默认为true。
<individualDeadLetterStrategy  queuePrefix="DLQ." useQueueForQueueMessages="true"/>
SharedDeadLetterStrategy: 将所有的DeadLetter保存在⼀个共享的队列中,这是ActiveMQ broker端默认的策略。共享队列默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定。还有2个很重要的可选参数,“processExpired”表⽰是否将过期消息放⼊死信队列,默认为true;“processNonPersistent”表⽰是否将“⾮持久化”消息放⼊死信队列,默认为false。
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
DiscardingDeadLetterStrategy: broker将直接抛弃DeadLeatter。如果开发者不需要关⼼DeadLetter,可以使⽤此策略。AcitveMQ提供了⼀个便捷的插件:Dis cardingDLQBrokerPlugin,来抛弃DeadLetter。下⾯这个必须配置plugins节点中才对,丢弃所有死信
<discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" />
丢弃指定死信
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000"
/>
使⽤丢弃正则匹配到死信
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}"
reportInterval="3000" />
-->
<individualDeadLetterStrategy  queuePrefix="DLQ.TOPIC."useQueueForQueueMessages="true"/>
</deadLetterStrategy>
<!--⾮耐久待处理消息处理策略类似于:pendingQueuePolicy(在下⾯⾃⼰)-->
<pendingSubscriberPolicy>
<!--⽀持三种策略:storeCursor, vmCursor和fileCursor。-->
<fileCursor/>
</pendingSubscriberPolicy>
<!--耐久待处理消息处理策略类似于:pendingQueuePolicy(在下⾯⾃⼰)-->
<pendingDurableSubscriberPolicy>
<!--⽀持三种策略:storeDurableSubscriberCursor, vmDurableCursor和 fileDurableSubscriberCursor。-->
<storeDurableSubscriberCursor/>
</pendingDurableSubscriberPolicy>
</policyEntry>
<!--消息队列-->
<policyEntry queue=">"producerFlowControl="true"optimizedDispatch="true"memoryLimit="16mb">
<pendingMessageLimitStrategy>
<prefetchRatePendingMessageLimitStrategy multiplier="10"/>
</pendingMessageLimitStrategy>
<messageEvictionStrategy>
<OldestMessageWithLowestPriorityEvictionStrategy />
</messageEvictionStrategy>
<slowConsumerStrategy>
<abortSlowConsumerStrategy abortConnection="false"/>
</slowConsumerStrategy>
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="1800000"/>
</subscriptionRecoveryPolicy>
<deadLetterStrategy>
<individualDeadLetterStrategy  queuePrefix="DLQ.QUEUE."useQueueForQueueMessages="true"/>
</deadLetterStrategy>
<!--
pendingQueuePolicy 待消费消息策略
通道中有⼤量Slow Consumer时,Broker该如何优化消息的转发,以及在此情况下,“⾮持久化”消息达到内存限制时该如何处理。
当Broker接受到消息后,通常将最新的消息写⼊内存以提⾼消息转发的效率,提⾼消息ACK的效率,减少对对底层Store的操作;如果Consumer⾮常快速,那么消息将会⽴即转发给Consumer,不需要额外的操作;但当遇到Slow Consumer时,情况似乎并没有那么美好。
持久化消息,通常为:写⼊Store->线程轮询,从Store中pageIn数据到PendingStorage->转发给Consumer->从PendingStorage中移除->消息ACK后从Store中移除。
对于⾮持久化数据,通常为:写⼊内存->如果内存⾜够,则PendingStorage直接以内存中的消息转发->如果内存不⾜,则将内存中的消息swap到临时⽂件中->从临时⽂件中pageIn到内存,转发给Consumer。
AcitveMQ提供了⼏个的Cursor机制,它就是⽤来保存Pending Messages。
1) vmQueueCursor: 将待转发消息保存在额外的内存(JVM linkeList)的存储结构中。是“⾮持久化消息”的默认设置,如果Broker不⽀持Persistent,它是任何类型消息的默认设置。有OOM风险。
2) fileQueueCursor: 将消息保存到临时⽂件中。⽂件存储⽅式有broker的tempDataStore属性决定。是“持久化消息”的默认设置。
3) storeCursor: “综合”设置,对于⾮持久化消息,将采⽤vmQueueCursor存储,对于持久化消息采⽤fileQueueCursor。这是强烈推荐的策略,也是效率最好的策略。
-->
<pendingQueuePolicy>
<storeCursor>
<nonPersistent>
<fileQueueCursor/>
</nonPersistent>
</storeCursor>
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<!--
ActiveMQ的特性之⼀是很好的⽀持JMX。通过JMX MBeans可以很⽅便的监听和控制ActiveMQ的brok
er。官⽅⽹站提供的JMX特性说明对于远程访问的配置流程坑爹,如果想使⽤jconsole对ActiveMQ进⾏监控,
⽆密码访问>
需要在borker节点设置useJmx属性为true,且managementContext节点的createConnector属性为true。通过jconsole访问地址service:jmx:rmi:///jndi/rmi://ip:1 099/jmxrmi进⾏连接,默认端⼝为1099,可以通过connectorPort属性修改连接端⼝,远程访问需要设置connectorHost属性为本机ip以供远程访问
有密码访问>
需要在borker节点设置useJmx属性为true,且managementContext节点的createConnector属性为false。然后在${actviemq.base}/conf⽬录下的jmx.access和j mx.password中添加⽤户权限和密码,最后修改${activemq.base}/bin/activemq⽂件,到下⾯的内容然后去掉注释,保存退出,重启activemq即可
# ACTIVEMQ_SUNJMX_START="-Dcom.sun.management.jmxremote.port=11099"
# ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START-Dcom.sun.management.jmxremo
te.password.file=${ACTIVEMQ_CONF}/jmx.password" # ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START-Dcom.sun.management.jmxremote.access.file=${ACTIVEMQ_CONF}/jmx.access"
# ACTIVEMQ_SUNJMX_START="$ACTIVEMQ_SUNJMX_START -Dcom.sun.management.jmxremote.ssl=false"
-->
<managementContext>
<managementContext createConnector="false"/>
</managementContext>
<!--持久化存储-->
<!--持久化存储-->
<persistenceAdapter>
<!--
官⽅默认的持久化⽅案
AMQ Message Store 是 ActiveMQ5.0 缺省的持久化存储。Message commands 被保存到 transactional journal(由pending
rolling data logs 组成)。Messages 被保存到 data logs 中,同时被 reference store 进⾏索引以提⾼存取速度。
Date logs由⼀些单独的 data log ⽂件组成,缺省的⽂件⼤⼩是 32M,如果某个消息的⼤⼩超过了 data log ⽂件的⼤
⼩,那么可以修改配置以增加 data log ⽂件的⼤⼩。如果某个 data log ⽂件中所有的消息都被成功消费了,那么这个 data log ⽂件将会被标记,以便在下⼀轮的清理中被删除或者归档。
-->
<amqPersistenceAdapter directory="${activemq.base}/data"maxFileLength="32mb"/>
<!--
Kaha Persistence 是⼀个专门针对消息持久化的解决⽅案。它对典型的消息使⽤模式进⾏了优化。在 Kaha 中,数据被追加到 data logs 中。当不再需要 log⽂件中的数据的时候,log ⽂件会被丢弃。
-->
<!-- <kahaDB directory="${activemq.data}/kahadb"/>-->
<!--
⽀持的数据库有Apache Derby,Axion,DB2,HSQL,Informix,MaxDB,MySQL,Oracle,Postgresql,SQLServer,Sybase。
如果你使⽤的数据库不被⽀持,那么可以调整 StatementProvider 来保证使⽤正确的 SQL ⽅⾔(flavour of SQL)。通常绝⼤多数数据库⽀持以下 adaptor:
1、org.activemq.store.jdbc.adapter.BlobJDBCAdapter
2、org.activemq.store.jdbc.adapter.BytesJDBCAdapter
3、org.activemq.store.jdbc.adapter.DefaultJDBCAdapter
4、org.activemq.store.jdbc.adapter.ImageJDBCAdapter
也可以在配置⽂件中直接指定 JDBC adaptor
<jdbcPersistence dataSourceRef=" mysql-ds"/> 参考下⾯的的“jdbc持久化配置”
-->
</persistenceAdapter>
<!-- jdbc持久化配置
<bean id="mysql-ds" class="org.apachemons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="sql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
-->
<!--
系统内存和磁盘空间使⽤量
broker⼀直没有可使⽤空间将有可能导致消息⽣产者的send()⽅法⽆限阻塞
⼀种替代⽅式是使⽤下⾯的配置,这时send()⽅法将会失败并抛出⼀个javax.jms.ResourceAllocationException异常
<systemUsage sendFailIfNoSpace="true">
更好的解决⽅式如下,客户端会⾸先等待3000毫秒,然后再次尝试,如果此时broker依然没有⾜够的空间可⽤,才抛出异常
-->
<systemUsage sendFailIfNoSpaceAfterTimeout="3000"sendFailIfNoSpace="true">
<systemUsage>
<!-- ⾮持久化消息最⼤占⽤内存⼤⼩ -->
<memoryUsage>
<memoryUsage percentOfJvmHeap="70"/>
</memoryUsage>
<!-- 持久化消息最⼤占⽤硬盘⼤⼩ -->
<storeUsage>
<storeUsage limit="10 gb"/>
</storeUsage>
<!-- 临时消息最⼤占⽤硬盘⼤⼩ -->
<tempUsage>
<tempUsage limit="5 gb"/>
</tempUsage>
</systemUsage>
</systemUsage>
<!--传输器配置⼀般会⼲掉我们不需要的传输协议的-->
<transportConnectors>
<!--
ActiveMQ⽀持的传输协议 /configuring-transports.html
openwire:activemq⾃定义的⼀种协议具体请阅读/openwire.html
amqp:即Advanced Message Queuing Protocol,⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协
议,是应⽤层协议的⼀个开放标准,为⾯向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语⾔等条件的限制。
stomp:STOMP,Streaming Text Orientated Message Protocol,是流⽂本定向消息协议,是⼀种为MOM(Message
Oriented Middleware,⾯向消息的中间件)设计的简单⽂本协议。
mqtt:MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的⼀个即时通讯协议,有可能成为
物联⽹的重要组成部分。该协议⽀持所有平台,⼏乎可以把所有联⽹物品和外部连接起来,被⽤来当做传感器和致动器(⽐如通过Twitter让房屋联⽹)的通信协议。
协议。
另外它还⽀持tcp、udp、xmpp等协议,这⾥就不详细说明了,想知道更多请查看/protocols.html
uri格式:scheme://ip:port?k1=v1&k2=v2 参考:/tcp-transport-reference.html
ActiveMQ⽀持的传输⽅式
ActiveMQ⽬前⽀持的transport有VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTP S Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport 等部分说明>
VM transport: 允许在VM内部通信,从⽽避免了⽹络传输的开销。这时候采⽤的连接不是 socket 连接,⽽是直接地⽅
法调⽤。第⼀个创建 VM 连接的客户会启动⼀个 embed VM broker,接下来所有使⽤相同的 broker name 的 VM 连接
都会使⽤这个 broker。当这个 broker 上所有的连接都关闭的时候,这个 broker也会⾃动关闭。
TCP transport: 允许客户端通过 TCP socket 连接到远程的 broker。
Failover Transport: 是⼀种重新连接的机制,它⼯作于其它 transport 的上层,⽤于建⽴可靠的传输。它的配置语法
允许制定任意多个复合的 URI。Failover transport 会⾃动选择其中的⼀个 URI 来尝试建⽴连接。如果没有成功,那
么会选择⼀个其它的 URI 来建⽴⼀个新的连接。
Discovery transport 是可靠的 tranport。它使⽤Discovery transport来定位⽤来连接的 URI 列表。
-->
<transportConnector name="tcp+nio"uri="tcp+nio://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"disc overyUri="multicast://default"/>
<transportConnector name="mqtt+nio"uri="mqtt+nio://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"di scoveryUri="multicast://default"/>
</transportConnectors>
<!-- blog.csdn/kimmking/article/details/8440150/ -->
<networkConnectors>
<networkConnector uri="multicast://default"
duplex="true"
dynamicOnly="true"
networkTTL="3"
prefetchSize="1"
decreaseNetworkConsumerPriority="true"
conduitSubscriptions="true"/>
</networkConnectors>
<shutdownHooks>
<bean xmlns="/schema/beans"class="org.apache.activemq.hooks.SpringContextHook"/>
</shutdownHooks>
<!-- 配置ActiveMQ 插件,具体可以参考Spring的bean配置 -->
<plugins>
<bean id="myPlugin"class="net.oschina.PluginClass"/>
</plugins>
</broker>
<import resource="l"/>
</beans>
优化

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。