ActiveMq⾯试题
1. 常见的消息队列有哪些?
⽬前主流的消息队列有以下ActiveMQ RabbitMQ KafKa ZeroMQ MetaMQ RocketMQ
2. ⽤ActiveMQ举例,⽤java实现点对点(P2P)的流程是怎样的
⾸先是⽣产者需要new⼀个ActiveMQConnectionFactory,⽤这个factory来创建connection,然后通过connection创建session,⽽通过session创建Queue,⽣产者和TextMessage,最后通过⽣产者send到队列中去
//创建ConnectionFactory ⽤户名密码都是null
ConnectionFactory factory =new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616");
//创建Connectin
Connection ateConnection();
//创建Session 通过Connection创建的,不开启事务
Session session =ateSession(false,Session.AUTO_ACKNOWLEDGE);
//创建Destination(Queue Topic) 由Session创建出消息队列
Queue queue =ateQueue("hello");
//创建producer⽣产者
MessageProducer producer = ateProducer(queue);
//由session创建Message
TextMessage textMessage = ateTextMessage("第⼀条MQ消息'");
//使⽤Producer发送消息
producer.send(textMessage);
producer.close();
session.close();
connection.close();
System.out.println("发送完成");
}
⽽消费者这边,前⾯不变也是factory->connection->session,session也是需要创建Queue和Consumer–>开⼀个线程或者
去receive(),Consurmer这边的代码:
ActiveMQConnectionFactory connectionFactory =new ActiveMQConnectionFactory(null, null,"tcp://localhost:61616");
// connection
Connection connection = ateConnection();
connection.start();
Session session = ateSession(false,Session.AUTO_ACKNOWLEDGE);
//Session创建消息队列 --和⽣产者那边⼀模⼀样否则会创建⼀个新的队列
Queue queue = ateQueue("hello");
MessageConsumer consumer = ateConsumer(queue);
while(true){
Message receive = ive();
if(receive!=null){
TextMessage m =(TextMessage)receive;
System.out.Text().toString());
TimeUnit.SECONDS.sleep(1);
}
}
这就完成了点对点的消息传递
3. 以ActiveMQ为例,topic的消息传递的流程是怎样的呢?
流程也是ActiveMQConnectionFactory去创建Connection,Connection创建Session,Session创建Topic,⽣产者,和MapMessage,最后由⽣产者send到消息队列中,代码如下:
ActiveMQConnectionFactory connectionFactory =new ActiveMQConnectionFactory(null, null,"tcp://localhost:61616");
Connection connection = ateConnection();
Session session = ateSession(false,Session.CLIENT_ACKNOWLEDGE);
//创建了⼀个话题
Topic topic = ateTopic("topic");
//创建⽣产者
MessageProducer producer = ateProducer(topic);
MapMessage mapMessage =ateMapMessage();
mapMessage.setString("name","zs");
mapMessage.setInt("age",22);
producer.send(mapMessage);
producer.close();
session.close();
connection.close();
System.out.println("消息发送完成");
}
消费者⼤相径庭factory->connection->session->consumer,topic以及Listener
ActiveMQConnectionFactory connectionFactory =new ActiveMQConnectionFactory(null, null,"tcp://localhost:61616");
Connection connection = ateConnection();
connection.start();
Session session = ateSession(false, Session.CLIENT_ACKNOWLEDGE);
Topic topic = ateTopic("topic");
MessageConsumer consumer = ateConsumer(topic);
consumer.setMessageListener(message ->{
if(message!=null && message instanceof MapMessage){
MapMessage m=(MapMessage)message;
try{
System.out.Int("age")+m.getString("name"));
m.acknowledge();
}catch(JMSException e){
e.printStackTrace();
}
}
});
}
4. SpringMVC和SpringBoot如何整合ActiveMQ
使⽤JmsTemplate,ActiveMQFactory,CachingConnectionFactory,以及JmsTemplate,以SpringBoot为例,说说流程
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory(){
return new ActiveMQConnectionFactory(null,null,"tcp://localhost:61616");
}
@Bean
/*Spring⽤于管理真正的ConnectionFactory的ConnectionFactory*/
public CachingConnectionFactory connectionFactory(){
CachingConnectionFactory cachingConnectionFactory =new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(this.activeMQConnectionFactory());
cachingConnectionFactory.setSessionCacheSize(30);
return cachingConnectionFactory;
}
@Bean
/*Spring提供的JMS⼯具类,它可以进⾏消息发送、接收等*/
public JmsTemplate jmsTemplate(){
JmsTemplate jmsTemplate =new JmsTemplate();
//传⼊⼯⼚
jmsTemplate.tionFactory());
//开启服务质量控制
jmsTemplate.setExplicitQosEnabled(true);
//持久化------⽣产者
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
//客户端签收消息
jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return jmsTemplate;
}
Service调⽤并发布信息
//false为默认发送Queue true发Topic
jmsTemplate.setPubSubDomain(true);
jmsTemplate.send("email", session ->{
return  JSONString(email));
});
消费者:以下部署,注意是发布者是Queue还是Topic,对应好,消费者需要做的事情:ActiveMQConnectionFactory->CacheConnectionFactory->队列/订阅->LinstenerContainer以下我⽤SpringMVC部署去写 相当于把SpringBoot和SpringMVC都写了⼀遍吧
<bean id="mqConnectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory"
userName=""
password=""
brokerURL="tcp://localhost:61616"/>
<bean id="connectionFactory"class="org.tion.CachingConnectionFactory"
targetConnectionFactory-ref="mqConnectionFactory"
sessionCacheSize="30"/>
<bean id="emailTopic"class="org.apache.activemqmand.ActiveMQTopic">
<constructor-arg name="name"value="email"/>
</bean>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<!--CachingConnectionFactory-->
activemq和rocketmq的区别<property name="connectionFactory"ref="connectionFactory"/>
<!--队列-->
<property name="destination"ref="emailTopic"/>
<!--⾃⼰写的-->
<property name="messageListener"ref="emailMsgListener"/>
<!--签收⽅式 2表⽰⼿动签收 1表⽰⾃动签收-->
<property name="sessionAcknowledgeMode"value="2"/>
</bean>
这样就配置完毕了;最后只需要⼀个配置⼀个⽤onMessage⽅法拿值就可以了
最后textMessage.acknowledge();//签收消息

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。