Spring+ActiveMQ配置
一、Broker配置
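1、broker.properties文件内容(原文此处内容缺失;以下键名取自后文XML配置中的占位符,取值仅为示例,请按实际环境修改):
p.local.url=tcp://localhost:61616
p.local.queue=demo.queue
p.local.name=localBroker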
2、Spring XML配置文件内容:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring bean definitions for an embedded ActiveMQ broker.
  Placeholders of the form ${p.local.*} are resolved from
  classpath:com/demo/broker.properties.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <!-- Resolves the ${...} placeholders used below. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:com/demo/broker.properties</value>
            </list>
        </property>
    </bean>

    <!-- Transport connector the broker listens on. -->
    <bean id="tcpConnector" class="org.apache.activemq.broker.TransportConnector">
        <property name="uri" value="${p.local.url}"></property>
    </bean>

    <!-- Queue registered with the broker through its "destinations" property. -->
    <bean id="tcpQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <property name="physicalName" value="${p.local.queue}"></property>
    </bean>

    <!-- Kaha store; this is the adapter brokerService actually references. -->
    <bean id="kahaPersistenceAdapter"
          class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
        <property name="persistentIndex" value="true"></property>
        <property name="maxDataFileLength" value="1048576"></property>
    </bean>

    <!--
      KahaDB store. Defined here but not referenced by brokerService;
      point the broker's persistenceAdapter at this bean to use it.
    -->
    <bean id="kahaDBPersistenceAdapter"
          class="org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter">
        <property name="checkForCorruptJournalFiles" value="true"></property>
        <property name="checkpointInterval" value="5000"></property>
        <property name="checksumJournalFiles" value="true"></property>
        <property name="cleanupInterval" value="30000"></property>
        <property name="directory" value="activemq-data"></property>
        <property name="enableIndexWriteAsync" value="true"></property>
        <property name="enableJournalDiskSyncs" value="true"></property>
        <property name="ignoreMissingJournalfiles" value="false"></property>
        <property name="indexCacheSize" value="100"></property>
        <property name="journalMaxFileLength" value="1048576"></property>
        <property name="journalMaxWriteBatchSize" value="1000"></property>
    </bean>

    <!-- The broker itself: started when the context starts, stopped when it closes. -->
    <bean id="brokerService" class="org.apache.activemq.broker.BrokerService"
          init-method="start" destroy-method="stop" scope="singleton">
        <property name="brokerName" value="${p.local.name}"></property>
        <property name="useJmx" value="false"></property>
        <property name="persistenceAdapter" ref="kahaPersistenceAdapter"></property>
        <property name="transportConnectors">
            <list>
                <ref local="tcpConnector" />
            </list>
        </property>
        <property name="destinations">
            <set>
                <ref local="tcpQueue" />
            </set>
        </property>
        <property name="plugins">
            <set>
                <ref local="loggingBrokerPlugin" />
                <ref local="destinationDotFilePlugin" />
                <ref local="statisticsBrokerPlugin" />
            </set>
        </property>
    </bean>

    <bean id="loggingBrokerPlugin" class="org.apache.activemq.broker.util.LoggingBrokerPlugin">
        <property name="logAll" value="true"></property>
    </bean>

    <!--
      The original configuration passed an empty file name, which cannot be
      opened for writing. NOTE(review): ActiveMQDestinations.dot is believed to
      be the plugin's own default file name; confirm or choose another path.
    -->
    <bean id="destinationDotFilePlugin"
          class="org.apache.activemq.broker.view.DestinationDotFilePlugin">
        <property name="file" value="ActiveMQDestinations.dot"></property>
    </bean>

    <bean id="statisticsBrokerPlugin" class="org.apache.activemq.plugin.StatisticsBrokerPlugin" />
</beans>
二、消息生产者配置
1、Spring XML配置文件内容:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring bean definitions for the message producer side.
  Placeholders of the form ${p.local.*} are resolved from
  classpath:com/demo/broker.properties.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <!-- Resolves the ${...} placeholders used below. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:com/demo/broker.properties</value>
            </list>
        </property>
    </bean>

    <!-- Raw ActiveMQ connection factory pointing at the broker URL. -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${p.local.url}"></property>
    </bean>

    <!--
      Reuse long-lived TCP connections so that each operation does not pay the
      extra cost of establishing a short-lived connection.
      destroy-method releases the pooled connections when the context closes.
    -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <constructor-arg ref="connectionFactory"></constructor-arg>
    </bean>

    <!-- Queue the producer sends to. -->
    <bean id="tcpQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <property name="physicalName" value="${p.local.queue}"></property>
    </bean>

    <!-- Template used for sending; wraps the pooled factory in a single shared connection. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
            </bean>
        </property>
        <property name="messageConverter" ref="messageConverter" />
        <property name="sessionTransacted" value="true"></property>
    </bean>

    <!-- Message conversion -->
    <bean id="messageConverter" class="com.demo.client.MyMessageConverter" />

    <!-- Message producer -->
    <bean id="messageProducer" class="com.demo.client.MyMessageProducer">
        <property name="template" ref="jmsTemplate" />
        <property name="destination" ref="tcpQueue" />
    </bean>
</beans>
2、消息转换类:
public class MyMessageConverter implements MessageConverter {
private static Logger log = Logger(MyMessageConverter.class);
@SuppressWarnings("unchecked")
public Object fromMessage(Message msg) throws JMSException,
MessageConversionException {
if (msg instanceof ObjectMessage) {
HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg)
.getObjectProperty("Map");
try {
ByteArrayInputStream bis = new ByteArrayInputStream(map
.get("MSG_ID"));
ObjectInputStream ois = new ObjectInputStream(bis);
Object o = adObject();
ois.close();
bis.close();
return o;
} catch (IOException e) {
<("failed to read object message: " + e.getMessage());
} catch (ClassNotFoundException e) {
<("failed to read object message: " + e.getMessage());
}
} else {
throw new JMSException("Message: [" + msg + "] is not a Map !");
}
return null;
}
public Message toMessage(Object obj, Session session) throws JMSException,
MessageConversionException {
if (obj instanceof MyMessage) {
ActiveMQObjectMessage o = (ActiveMQObjectMessage) session
.createObjectMessage();
Map<String, byte[]> map = new HashMap<String, byte[]>();
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
map.put("MSG_ID", ByteArray());
oos.close();
bos.close();
} catch (IOException e) {
<("failed to write object message: " + e.getMessage());
}
o.setObjectProperty("Map", map);
return o;
} else {
throw new JMSException("Object: [" + obj + "] is not a Message !");
}
}
}
3、消息生产者类:
public class MyMessageProducer {
private static Logger log = Logger(MyMessageProducer.class);
private JmsTemplate template;
private Queue destination;
public void setTemplate(JmsTemplate template) {
}
public void setDestination(Queue destination) {
this.destination = destination;
}
public void send(GrccMessage message) {
spring framework
log.info("生产消息 ==>\n" + message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
三、消息消费者配置:
1、Spring XML配置文件内容:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring bean definitions for the message consumer side.
  Placeholders of the form ${p.local.*} are resolved from
  classpath:com/demo/broker.properties.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">

    <!-- Resolves the ${...} placeholders used below. -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:com/demo/broker.properties</value>
            </list>
        </property>
    </bean>

    <!-- Raw ActiveMQ connection factory pointing at the broker URL. -->
    <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${p.local.url}"></property>
    </bean>

    <!--
      Reuse long-lived TCP connections so that each operation does not pay the
      extra cost of establishing a short-lived connection.
      destroy-method releases the pooled connections when the context closes.
    -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <constructor-arg ref="connectionFactory"></constructor-arg>
    </bean>

    <!-- Queue the listener container consumes from. -->
    <bean id="tcpQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <property name="physicalName" value="${p.local.queue}"></property>
    </bean>

    <!-- Defined for parity with the producer; no bean in this file references it. -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="pooledConnectionFactory" />
            </bean>
        </property>
        <property name="messageConverter" ref="messageConverter" />
        <property name="sessionTransacted" value="true"></property>
    </bean>

    <!-- Message conversion -->
    <bean id="messageConverter" class="com.demo.client.MyMessageConverter" />

    <!-- Adapter that converts each message and calls MyMessageConsumer.consume. -->
    <bean id="messageListener"
          class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="com.demo.client.MyMessageConsumer">
            </bean>
        </constructor-arg>
        <property name="defaultListenerMethod" value="consume" />
        <property name="messageConverter" ref="messageConverter" />
    </bean>

    <!-- Container that receives from tcpQueue and dispatches to messageListener. -->
    <bean id="listenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="tcpQueue" />
        <property name="messageListener" ref="messageListener" />
    </bean>
</beans>
2、消息消费者类:
/**
 * Message consumer whose {@link #consume(MyMessage)} method logs each message
 * it is given.
 */
public class MyMessageConsumer {
    private static final Logger log = Logger.getLogger(MyMessageConsumer.class);

    /**
     * Logs the received message.
     *
     * @param message the message to consume
     */
    public void consume(MyMessage message) {
        log.info("消费消息 <==\n" + message);
    }
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。