(转)关于ActiveMQ的配置
⽬前常⽤的消息队列组建⽆⾮就是MSMQ和ActiveMQ,⾄于他们的异同,这⾥不想做过多的⽐较。简单来说,MSMQ内置于微软操作系统之中,在部署上包含⼀个隐性条件:Server需要是微软操作系统。(对于这点我并去调研过MSMQ是否可以部署在⾮微软系统,⽐
如:Linux,只是拍脑袋想了想,感觉上是不可以)。对于ActiveMQ,微软系统和Linux都是可以部署的。从功能⽅⾯来说,⼀般最常⽤的就是:消息的收/发,感觉差异不⼤。从性能上来说,⼀般的说法是ActiveMQ略⾼。在稳定性上,个⼈感觉MSMQ更好。如果这两种常⽤队列都⽤过的同学,应该来说最⼤的差异在于:MSMQ如果要访问远程队列(⽐如机器A上的程序访问机器B上的队列),会⽐较恶⼼。在数据量⽐较⼤的情况之下,⼀般来说队列服务器会专门的⼀台或者多台(多台的话,⽤程序去做热备+负载⽐较⽅便,也不需要额外的硬件成本。简单来说做法可以这样:消息发送的时候随机向着多台队列服务器上发送消息;接受的时候开多个线程去分别监听;热备⽅⾯,可以维护⼀个带状态的队列连接池,如果消息收发失败那么将状态置为不可⽤,然后起⼀个线程去定时监测坏的连接是否可⽤,这个过程⼀般情况下可以不⽤加锁,为什么,⼤家根据各⾃需要去取舍吧)。最近搞完了短彩信的⽹关连接服务,这两种队列我均使⽤了。⼤致的过程是这样的:上层应⽤如果要发端彩信,那么将消息直接发送⾄ActiveMQ(⽬前⽤的就是上⾯说的多台热备+负载,因为实际中下⾏量⾮常⼤5千万条/天以上),然后端彩信⽹关连接服务部署多套,每套均依赖本机的MSMQ。为什么呢?⽤ActiveMQ的原因是:上层应⽤程序和⽹关连接服
务彼此独⽴,消息需要跨机访问。⽤MSMQ的原因是:ActiveMQ中的数据是⼀条不分省的⼤队列,⽹关连接服务需要按省流控,所以端彩信⽹关连接服务:⾸先把消息从ActiveMQ取出来,然后存⾄本机上的分省MSMQ,这样做另外的⼀个好处就是:ActiveMQ不⾄于过多挤压,他的数据会分摊到N台短彩信⽹关连接服务所部署的机器上的MSMQ之中,也就说MSMQ可以起到分摊数据和缓冲的作⽤。
在之前的随笔中,已经介绍过MSMQ,现在先介绍⼀下ActiveMQ⼀些配置,⽬前好像ActiveMQ配置上的介绍还⽐较少。以下是⾃⼰总结⼀些相关资料,贴出来给⼤家共享
⼀)问题分析和解决
1)KahaDb和AMQ Message Store两种持久⽅式如何选择?
官⽅:
From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store. The AMQ Message Store which although faster than Kah
aDB - does not scales as well as KahaDB and recovery times take longer.
⾮官⽅:
kaha⽂件系统实际上上是⼀个⽂件索引系统,有两部分组成,⼀个是数据⽂件系统,由⼀个个独⽴的⽂件组成,缺省⽂件⼤⼩是32M⼤(可配置),另外⼀个是索引⽂件系统,记录消息在数据⽂件中的位置信息以及数据⽂件中的空闲块信息。数据⽂件是存储到硬盘上的,索引⽂件是缓存在内存中的。所以这个存储系统对⼤消息存储有利,象我们的memberId之类的⽂本消息,实际上是浪费,索引⽐消息还⼤,哈。我⽅分析:
推荐: Amq持久⽅式
理由:虽然官⽅推荐使⽤KahaDB持久⽅式,但其提到的优势:可伸缩性和恢复性较好,对于我们实际的应⽤意义不⼤。从我们⾃⼰的使⽤经验来看,KahaDB持久⽅式,Data⽂件是⼀个⼤⽂件(感觉⽂件过⼤后,造成队列服务瘫死的可能性会增⼤),从官⽹的相关配置(附录1)也不到哪⾥可以设置数据的⽂件的最⼤Size。)⽽Amq持久⽅式可以设置Data⽂件最⼤Size,这样可以保证即时消息积压很多,Data ⽂件也不⾄于过⼤。
2)错误:
解决⽅法:
在建⽴连接的Uri中加⼊: wireFormat.maxInactivityDuration=0
参考资源:
You can do the following to fix the issues:
1) Append max inactivity duration to your Uri in the format below: wireFormat.maxInactivityDuration=0
2) Use the same Uri at the client side as well as at the server side
Regards,
如果不这样设置,对应的错误会出现:
2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener:
org.ansport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
org.ansport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
ActiveMQ的tcp url:tcp://localhost:61616后⾯要加⼊?wireFormat.maxInactivityDuration=0 这样的参数,否则当⼀段时间没有消息发送时会抛出 "Channel was inactive for too long"异常
3)错误:
解决⽅法:
1)关闭ActiveMqLog4j
打开:conf/log4j.propertiesactive transport
将:Logger=INFO, console, logfile
修改为:Logger=OFF
2)在建⽴连接的Uri中加⼊: maxInactivityDurationInitalDelay=30000
例如北京的测试环境连接Uri:
tcp://192.168.199.80:61616?wireFormat.maxInactivityDuration=0&maxInactivityDurationInitalDelay=30000&connection.AsyncSend=true 参考资源:
If you get exception like this,it can mean one of three things:
1. You're connecting to the port not used by ActiveMQ TCP transport
Make sure to check that you're connecting to the appropriate host:port
2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages
Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender
3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time
If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat.maxInactivityDurationInitalDelay property on the connection URL in your client.
tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=30000
will use 30 sec timeout.(貌似有问题)
4)错误:
解决⽅法:
1)设置Java最⼤内存限制为合适⼤⼩:
Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(默认是512)
2)l配置节:systemUsage/ systemUsage配置⼤⼩合适,并且特别注意:⼤于所有durable desitination设置的memoryUsage 之和。
备注:
1)尖括号:“>”代表通配符
2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=所有durable desitination设置之和
3)SystemUsage配置设置了⼀些系统内存和硬盘容量,当系统消耗超过这些容量设置时,amq会“slow down producer”,还是很重要的。
参考资料:
对于MQ的内容实⽤是可管理和可配置的。⾸先需要判断的是MQ的哪部分系统因内存不⾜⽽导致泄漏,是JVM,broker还是消费者、⽣产者?
⼀、内存管理
JVM内存管理:
1. ⽤bin/activemq命令在独⽴JVM中运⾏broker。⽤-Xmx和-Xss命令即可(activemq.bat⽂件中修改ACTIVEMQ_OPTS选项参数即可);
2. 默认情况下,MQ⽤512M的JVM;
broker内存管理:
1. broker使⽤的内存并不是由JVM的内存决定的。虽然受到JVM的限制,但broker确实独⽴管理器内存;
2. systemUsage和destination的内存限制与broker内存息息相关;
3. MQ中内存的关系是:JVM->Broker->broker features;
4. 所有destination的内存总量不能超过broker的总内存;
消费者:
1. 由于消息⼤⼩可以配置,prefetch limit往往是导致内存溢出的主要原因;
2. 减少prefetch limit的⼤⼩,会减少消费者内存中存储的消息数量;
⽣产者:
1. 除⾮消息数量超过了broker资源的限制,否则⽣产者不会导致内存溢出;
2. 当内存溢出后,⽣产者会收到broker的阻塞信息提⽰;
将消息缓冲之硬盘:
1. 只有当消息在内存中存储时,才允许消息的快速匹配与分发,⽽当消费者很慢或者离开时,内存可能会耗尽;
2. 当destination到达它的内存临界值时,broker会⽤消息游标来缓存⾮持久化的消息到硬盘。
3. 临界值在broker中通过memoryUsage和systemUsage两个属性配置,请参考l;
4. 对于缓慢的消费者,当尚未耗尽内存或者转变为⽣产者并发控制模式前,这个特性允许⽣产者继续发送消息到broker;
5. 当有多个destination的时候,默认的内存临界值可能被打破,⽽这种情况将消息缓存到硬盘就显得很
有意义;
6. precentUsage配置:使⽤百分⽐来控制内存使⽤情况;
多个线程:
1. 默认情况下,MQ每个destination都对应唯⼀的线程;
2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat⽂件中修改ACTIVEMQ_OPTS选项参数即可),⽤线程池来限制线程的数量,从⽽减少内存消耗;
⼤数据传输:
1. destination policies--maxPageSize:控制进⼊内存中的消息数量;lazyDispatch:增加控制使⽤当前消费者列表的预取值;
2. 使⽤blogMessage或者streamsMessage类型来进⾏⼤量⽂件的传输;
泄漏JMS资源:
1. 当session或者producer或者consumer⼤量存在⽽没有关闭的时候;
2. 使⽤PooledConnectionFactory;
5)采⽤failover⽅式连接导致卡死
解决⽅法:
不采⽤failover连接
分析:
采⽤failover⽅式连接,如果所要连接的服务器或者Activemq服务宕了,那么程序会⼀直处于等待状态,不超时,不报错。
⼆)附录
1)KahaDB
property name default
value
Comments
directory activemq-
data
the path to the directory to use to store the message store data and log files
indexWriteBatchSize1000number of indexes written in a batch indexCacheSize10000number of index pages cached in memory enableIndexWriteAsync false if set, will asynchronously write indexes journalMaxFileLength32mb a hint to set the maximum size of the message data logs
enableJournalDiskSyncs true ensure every non transactional journal write is followed by a disk sync (JMS durability requirement)
cleanupInterval30000time (ms) before checking for a discarding/moving message data logs that are no longer used
checkpointInterval5000time (ms) before checkpointing the journal
ignoreMissingJournalfiles false If enabled, will ignore a missing message log file checkForCorruptJournalFiles false If enabled, will check for corrupted Journal files on startup and try and recover them checksumJournalFiles false create a checksum for a journal file - to enable checking for corrupted journals Available since version 5.4:
archiveDataLogs false If enabled, will move a message data log to the archive directory instead of deleting it.
Define the directory to move data logs to when they all the messages they contain have
directoryArchive null Define the directory to move data logs to when they all the messages they contain have been consumed.
databaseLockedWaitDelay10000time (ms) before trying to get acquire a the database lock (used by shared master/slave)
maxAsyncJobs10000the maximum number of asynchronous messages that will be queued awaiting storage (should be the same as the number of concurrent MessageProducers)
concurrentStoreAndDispatchTopics false enable the dispatching of Topic messages to interested clients to happen concurrently with message storage
concurrentStoreAndDispatchQueuestrue enable the dispatching of Queue messages to interested clients to happen concurrently with message storage
2)AMQ
持久配置
property name default
value
Comments
directory activemq-
data
the path to the directory to use to store the message store data and log files
useNIO true use NIO to write messages to the data logs syncOnWrite false sync every write to disk
maxFileLength32mb a hint to set the maximum size of the message data logs
persistentIndex true use a persistent index for the message logs. If this is false, an in-memory structure is maintained
maxCheckpointMessageAddSize4kb the maximum number of messages to keep in a transaction before automatically committing cleanupInterval30000time (ms) before checking for a discarding/moving message data logs that are no longer used
indexBinSize1024default number of bins used by the index. The bigger the bin size - the better the relative performance of the index
indexKeySize96the size of the index key - the key is the message id
indexPageSize16kb the size of the index page - the bigger the page - the better the write performance of the index directoryArchive archive the path to the directory to use to store discarded data logs
archiveDataLogs false if true data logs are moved to the archive directory instead of being deleted
3)systemUsage
配置
property namedefault value Comments
memoryUsage20M amq使⽤内存⼤⼩,照amq论坛上说,这个值应该⼤于所有durable desitination设置的
memoryUsage之和,否则会导致硬盘swap,影响性能。
storeUsage1G kaha数据存储⼤⼩,如果设置不⾜,性能会下降到1个1个发
tempUsage100M⾮persistent的消息存储在temp区域
4
)其他配置
4.1
)
Option Name Default Description
transport.timeout-1Time that a send operation blocks before failing.
transport.initialReconnectDelay10Time in Milliseconds that the transport waits before attempting to reconnect the first time. transport.maxReconnectDelay30000The max time in Milliseconds that the transport will wait before attempting to reconnect.
transport.backOffMultiplier2The amount by which the reconnect delay will be multiplied by if useExpon
entialBackOff is enabled.
transport.useExponentialBackOff true Should the delay between connection attempt grow on each try up to the max reconnect delay.
transport.randomize true Should the Uri to connect to be chosen at random from the list of available Uris.
transport.maxReconnectAttempts0Maximum number of time the transport will attempt to reconnect before failing (0 means infinite retries)
transport.startupMaxReconnectAttempts0Maximum number of time the transport will attempt to reconnect before failing when there has never been a connection made. (0 means infinite retries) (included in NMS.ActiveMQ
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论