SpringBoot整合RabbitMQ,简易的队列实例
在这个界⾯⾥⾯我们可以做些什么?
可以⼿动创建虚拟host,创建⽤户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等。
⾸先先介绍⼀个简单的⼀个消息推送到接收的流程,提供⼀个简单的图:
黄⾊的圈圈就是我们的消息推送服务,将消息推送到中间⽅框⾥⾯也就是 rabbitMq的服务器,然后经过服务器⾥⾯的交换机、队列等各种关系(后⾯会详细讲)将数据处理⼊列后,最终右边的蓝⾊圈圈消费者获取对应监听的消息。
rabbitMq简单编码 (实例:)
⾸先创建 rabbitmq-provider,
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.mq-amqp</groupId>
<artifactId>mq-amqp-client</artifactId>
<version>1.0.5</version>
</dependency>
## 配置rabbitMQ 信息
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 开启发送确认
publisher-confirms: true
# 开启发送失败退回
publisher-returns: true
# 消息 rabbitmq 的⾃定义相关配置
rabbit:
msg:
exchange:
fanout:
name: msg_fanout_exchange
topic:
name: msg_topic_exchange
alternate:
name: msg_alternate_exchange
dead:
name: msg_dead_exchange
queue:spring framework是什么框架的
sms:
name: msg.sms.send
dead:
name: msg.sms.dead.send
upstream:
name: msg.sms.upstream
alternate:
name: msg.alternate
route:
sms: msg.sms.send
upstream: msg.sms.upstream
消息 rabbitMq 属性配置配置交换机,并绑定
/**
* @desc:消息 rabbitMq 属性配置
* @author:
* @date: 2020/6/24 15:40
* @version: 3.0.0
* @since: 3.0.0
*/
@RefreshScope
@Component
public class RabbitMqMsgProperties {
// 扇形交换机名称
@Value("${hange.fanout.name}")
private String fanoutExchangeName;
// 备份换机名称
@Value("${hange.alternate.name}")
private String alternateExchangeName;
// TOPIC换机名称
@Value("${pic.name}")
private String topicExchangeName;
// 消息死信交换机名称
@Value("${hange.dead.name}")
private String deadExchangeName;
// 备份队列名称
@Value("${rabbit.msg.queue.alternate.name}")
private String alternateQueueName;
// 短信消息队列名称
@Value("${rabbit.msg.queue.sms.name}")
private String smsQueueName;
// 短信消息死信队列名称
@Value("${rabbit.msg.queue.sms.dead.name}")
private String smsDeadQueueName;
// 邮件消息队列名称
@Value("${rabbit.ail.name}")
private String emailQueueName;
/
/ 邮件消息死信队列名称
@Value("${rabbit.ail.dead.name}")
private String emailDeadQueueName;
// 上⾏消息队列名称
@Value("${rabbit.msg.queue.upstream.name}")
private String upstreamQueueName;
// 短信消息路由键
@Value("${ute.sms}")
private String smsRouteKey;
// 邮件消息路由键
@Value("${ail}")
private String emailRouteKey;
// 上⾏消息路由键
@Value("${ute.upstream}")
private String upstreamRouteKey;
// 消息队列名称
@Value("${rabbit.msg.queue.wx.name}")
private String wxQueueName;
// 消息路由键
@Value("${ute.wx}")
private String wxRouteKey;
// 消息死信队列名称
@Value("${rabbit.msg.queue.wx.dead.name}")
private String wxDeadQueueName;
public String getFanoutExchangeName() {
return fanoutExchangeName;
}
public void setFanoutExchangeName(String fanoutExchangeName) { this.fanoutExchangeName = fanoutExchangeName;
}
public String getSmsQueueName() {
return smsQueueName;
}
public void setSmsQueueName(String smsQueueName) {
this.smsQueueName = smsQueueName;
}
public String getEmailQueueName() {
return emailQueueName;
}
public void setEmailQueueName(String emailQueueName) {
}
public String getUpstreamQueueName() {
return upstreamQueueName;
}
public void setUpstreamQueueName(String upstreamQueueName) {
this.upstreamQueueName = upstreamQueueName;
}
public String getTopicExchangeName() {
return topicExchangeName;
}
public void setTopicExchangeName(String topicExchangeName) {
}
public String getSmsRouteKey() {
return smsRouteKey;
}
public void setSmsRouteKey(String smsRouteKey) {
this.smsRouteKey = smsRouteKey;
}
public String getEmailRouteKey() {
return emailRouteKey;
}
public void setEmailRouteKey(String emailRouteKey) {
}
public String getUpstreamRouteKey() {
return upstreamRouteKey;
}
public void setUpstreamRouteKey(String upstreamRouteKey) {
this.upstreamRouteKey = upstreamRouteKey;
}
public String getAlternateExchangeName() {
return alternateExchangeName;
}
public void setAlternateExchangeName(String alternateExchangeName) {
this.alternateExchangeName = alternateExchangeName;
}
public String getAlternateQueueName() {
return alternateQueueName;
}
public void setAlternateQueueName(String alternateQueueName) {
this.alternateQueueName = alternateQueueName;
}
public String getSmsDeadQueueName() {
return smsDeadQueueName;
}
public void setSmsDeadQueueName(String smsDeadQueueName) {
this.smsDeadQueueName = smsDeadQueueName;
}
public String getEmailDeadQueueName() {
return emailDeadQueueName;
}
public void setEmailDeadQueueName(String emailDeadQueueName) {
}
public String getDeadExchangeName() {
return deadExchangeName;
}
public void setDeadExchangeName(String deadExchangeName) {
this.deadExchangeName = deadExchangeName;
}
public String getWxQueueName() {
return wxQueueName;
}
public void setWxQueueName(String wxQueueName) {
this.wxQueueName = wxQueueName;
}
public String getWxRouteKey() {
return wxRouteKey;
}
public void setWxRouteKey(String wxRouteKey) {
this.wxRouteKey = wxRouteKey;
}
public String getWxDeadQueueName() {
return wxDeadQueueName;
}
public void setWxDeadQueueName(String wxDeadQueueName) {
this.wxDeadQueueName = wxDeadQueueName;
}
}
bps.f;
import org.*;
import org.springframework.tion.CachingConnectionFactory; import org.springframework.tion.ConnectionFactory;
import org.springframework.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.fig.ConfigurableBeanFactory;
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration; import org.springframework.dition.ConditionalOnBean; import org.springframework.dition.ConditionalOnMissingBean; import t.annotation.Bean;
import t.annotation.Configuration;
import t.annotation.Scope;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
* @desc:消息 rabbitmq 配置类
* @author: guanliang.xue
* @date: 2020/6/24 15:07
* @version: 3.0.0
* @since: 3.0.0
*/
@Configuration
@ConditionalOnBean(value = RabbitMqMsgProperties.class)
public class RabbitMqMsgConfig {
@Resource
private RabbitMqMsgProperties rabbitMqMsgProperties;
/**
* 定义备份交换机
* @return 备份交换机
*/
@Bean
public FanoutExchange alternateExchange(){
return new AlternateExchangeName());
}
/
**
* 定义扇形交换机
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchangeName());
}
/**
* 定义TOPIC交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
Map<String, Object> arguments = new HashMap<>();
return new TopicExchangeName(),true,false,arguments); }
/**
* 死信交换机
* @return
*/
@Bean
public DirectExchange deadExchange(){
return new DeadExchangeName(),true,false);
}
/**
* 备份队列
* @return 队列
*/
@Bean
public Queue alternateQueue(){
return new AlternateQueueName());
}
/**
* 短信队列
* @return
*/
@Bean
public Queue smsQueue(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机
// x-dead-letter-routing-key 死信路由key,默认使⽤原有路由key
args.put("x-dead-letter-exchange", DeadExchangeName());
return QueueBuilder.SmsQueueName())
.withArguments(args)
.
build();
}
/**
* 队列
* @return
*/
@Bean
public Queue wxQueue(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机
// x-dead-letter-routing-key 死信路由key,默认使⽤原有路由key
args.put("x-dead-letter-exchange", DeadExchangeName());
return QueueBuilder.WxQueueName())
.withArguments(args)
.build();
}
/**
* 邮件队列
* @return
*/
@Bean
public Queue emailQueue(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这⾥声明当前队列绑定的死信交换机
// x-dead-letter-routing-key 死信路由key,默认使⽤原有路由key
args.put("x-dead-letter-exchange", DeadExchangeName());
return QueueBuilder.EmailQueueName())
.withArguments(args)
.build();
}
/**
* sms 死信队列
* @return
*/
@Bean
public Queue smsDeadQueue(){
return new SmsDeadQueueName());
}
/**
* email 死信队列
* @return
*/
@Bean
public Queue emailDeadQueue(){
return new EmailDeadQueueName());
}
/**
* wx 死信队列
* @return
*/
@Bean
public Queue wxDeadQueue(){
return new WxDeadQueueName());
}
/**
* 服务商上⾏队列
* @return
*/
@Bean
public Queue upstreamQueue(){
return new UpstreamQueueName());
}
/**
* 绑定sms死信列到死信交换机
* @param queue 死信队列
* @return
*/
@Bean
public Binding bindSmsDead(@Qualifier("smsDeadQueue") Queue queue,
@Qualifier("deadExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).SmsRouteKey());
}
/**
* 绑定email死信列到死信交换机
* @param queue 死信队列
* @return
*/
@Bean
public Binding bindEmailDead(@Qualifier("emailDeadQueue") Queue queue,
@Qualifier("deadExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).EmailRouteKey());
}
/**
* 绑定wx死信列到死信交换机
* @param queue 死信队列
* @return
*/
@Bean
public Binding bindWxDead(@Qualifier("wxDeadQueue") Queue queue,
@Qualifier("deadExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).WxRouteKey());
}
/**
* 绑定备份列到备份交换机
* @param queue 备份队列
* @return
*/
@Bean
public Binding bindAlternate(@Qualifier("alternateQueue") Queue queue,
@Qualifier("alternateExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
/**
* 绑定短信队列到TOPIC交换机
* @param queue
* @param topicExchange
* @return
*/
@Bean
public Binding bindSms(@Qualifier("smsQueue") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).SmsRouteKey());
}
/**
* 绑定邮件队列
* @param queue
* @param topicExchange
* @return
*/
@Bean
public Binding bindEmail(@Qualifier("emailQueue") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).EmailRouteKey());
}
/**
* 绑定队列
* @param queue
* @param topicExchange
* @return
*/
@Bean
public Binding bindWx(@Qualifier("wxQueue") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).WxRouteKey());
}
/**
* 绑定上⾏消息队列
* @param queue
* @param topicExchange
* @return
*/
@Bean
public Binding bindUpstream(@Qualifier("upstreamQueue") Queue queue,
@Qualifier("topicExchange") TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).UpstreamRouteKey());
}
@Bean
@ConditionalOnMissingBean(AliyunAmqpConfig.class)
@ConditionalOnBean(RabbitAutoConfiguration.class)
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate(@Qualifier("rabbitConnectionFactory") CachingConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
template.setConfirmCallback(new RabbitMqMsgConfirmCallback());
template.setReturnCallback(new RabbitMqMsgReturnCallback());
return template;
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论