springboot整合RabbitMQyml配置⽂件配置交换机队列信息1.配置⽂件
spring怎么读多个文件>>>>>>>>>>##
>##  Rabbit MQ Exchange Queue Config  >>
>>>>>>>>>>##
rabbit:
# 交换机
exchanges:
#    # ⾃定义-延迟
#    - name: de
#      type: CUSTOM
#      custom-type: x-delayed-message
#      arguments:
#        x-delayed-type: direct
#    # 通配符订阅
#    - name: de
#      type: TOPIC
#    # ⼴播
#    - name: de
#      type: FANOUT
#    # 消息头
#    - name: de
#      type: HEADERS
# 直连交换机
- name: centerDeliverExchange
type: DIRECT
# 队列
queues:
# 直连队列
- name: queue-PLUS2-9002
routing-key: route-PLUS2-9002
exchange-name: centerDeliverExchange
- name: queue-PLUS2-9003
routing-key: route-PLUS2-9003
exchange-name: centerDeliverExchange
#    # 队列2
#    - name: queue.2
#      routing-key: queue.*
#      exchange-name: de
#    # 延迟队列
#    - name: delay.queue
#      routing-key: delay.queue
#      exchange-name: de
将以上配置⽂件引⼊l
spring:
profiles:
include: rabbit
注⼊配置⽂件并定义交换机队列
SpringBeanUtils.java
batis.plus.utils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.fig.BeanFactoryPostProcessor;
import org.springframework.fig.ConfigurableListableBeanFactory;
import org.springframework.stereotype.Component;
/
**
* spring⼯具类⽅便在⾮spring管理环境中获取bean
*
* @author gch
*/
@Component
public final class SpringBeanUtils implements BeanFactoryPostProcessor {
/**
* Spring应⽤上下⽂环境
*/
private static ConfigurableListableBeanFactory beanFactory;
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
SpringBeanUtils.beanFactory = beanFactory;
}
/**
* 获取对象
*
* @param name
* @return Object ⼀个以所给名字注册的bean的实例
* @throws BeansException
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) throws BeansException {
return (T) Bean(name);
}
/**
* 获取类型为requiredType的对象
*
* @param clz
* @return
* @throws BeansException
*/
public static <T> T getBean(Class<T> clz) throws BeansException {
T result = (T) Bean(clz);
return result;
}
public static <T> T getBean(String name, Class<T> clz) throws BeansException {
T result = (T) Bean(name, clz);
return result;
}
/**
* 如果BeanFactory包含⼀个与所给名称匹配的bean定义,则返回true
* 如果BeanFactory包含⼀个与所给名称匹配的bean定义,则返回true
*
* @param name
* @return boolean
*/
public static boolean containsBean(String name) {
ainsBean(name);
}
/**
* 判断以给定名字注册的bean定义是⼀个singleton还是⼀个prototype。如果与给定名字相应的bean定义没有被到,将会抛出⼀个异常(NoSuchBeanDefinitionException)    *
* @param name
* @return boolean
* @throws NoSuchBeanDefinitionException
*/
public static boolean isSingleton(String name) throws NoSuchBeanDefinitionException {
return beanFactory.isSingleton(name);
}
/**
* @param name
* @return Class 注册对象的类型
* @throws NoSuchBeanDefinitionException
*/
public static Class<?> getType(String name) throws NoSuchBeanDefinitionException {
Type(name);
}
/**
* 如果给定的bean名字在bean定义中有别名,则返回这些别名
*
* @param name
* @return
* @throws NoSuchBeanDefinitionException
*/
public static String[] getAliases(String name) throws NoSuchBeanDefinitionException {
Aliases(name);
}
/**
* 将bean对象注册到bean⼯⼚
*
* @param beanName
* @param bean
* @param <T>
* @return
*/
public static <T> boolean registerBean(String beanName, T bean) {
// 将bean对象注册到bean⼯⼚
return true;
}
}
RabbitMqProperties.java
fig.mq;
import lkit.StringPool;
batis.plus.utils.SpringBeanUtils;
import lombok.Data;
slf4j.Slf4j;
import llections.CollectionUtils;
import org.*;
import org.t.properties.ConfigurationProperties;
import t.annotation.Bean;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* rabbitmq 消息队列和交换机配置⽂件
*
* @author gch
*/
@Slf4j
@Data
@ConfigurationProperties(
prefix = "rabbit"
)
public class RabbitMqProperties {
/**
* 装载⾃定义配置交换机
*/
private List<ExchangeConfig> exchanges = new ArrayList<>();
/**
* 装载⾃定义配置队列
*/
private List<QueueConfig> queues = new ArrayList<>();
@Data
public static class QueueConfig {
/**
* 队列名(每个队列的名称应该唯⼀)
* 必须*
*/
private String name;
/**
* 指定绑定交互机,可绑定多个(逗号分隔)
* 必须*
*/
private String exchangeName;
/**
/**
* 队列路由键(队列绑定交换机的匹配键,例如:“user” 将会匹配到指定路由器下路由键为:【*.user、#.user】的队列)        */
private String routingKey;
/**
* 是否为持久队列(该队列将在服务器重启后保留下来)
*/
private Boolean durable = Boolean.TRUE;
/**
* 是否为排它队列
*/
private Boolean exclusive = Boolean.FALSE;
/**
* 如果队列为空是否删除(如果服务器在不使⽤队列时是否删除队列)
*/
private Boolean autoDelete = Boolean.FALSE;
/
**
* 头队列是否全部匹配
* 默认:是
*/
private Boolean whereAll = Boolean.TRUE;
/**
* 参数
*/
private Map<String, Object> args;
/**
* 消息头
*/
private Map<String, Object> headers;
}
@Data
public static class ExchangeConfig {
/**
* 交换机名
*/
private String name;
/**
* 交换机类型
*/
private ExchangeType type;
/**
* ⾃定义交换机类型
*/
private String customType;
/**
* 交换机参数(⾃定义交换机)
*/
private Map<String, Object> arguments;
}
public enum ExchangeType {
/**
* ⾃定义交换机
*/
CUSTOM,
/**
* 直连交换机(全⽂匹配)
*/
DIRECT,
/**
* 通配符交换机(两种通配符:*只能匹配⼀个单词,#可以匹配零个或多个)
*/
TOPIC,
/**
* 头交换机(⾃定义键值对匹配,根据发送消息内容中的headers属性进⾏匹配)
*/
HEADERS,
/**
* 扇形(⼴播)交换机(将消息转发到所有与该交互机绑定的队列上)
*/
FANOUT;
}
public ExchangeConfig getExchangeConfig(String name) {
Map<String, ExchangeConfig> collect = exchanges.stream().Map(e -> e.getName(), e -> e));
(name);
}
/**
* 动态创建交换机
*
* @return
*/
@Bean
public Object createExchange() {
List<ExchangeConfig> exchanges = getExchanges();
if (!CollectionUtils.isEmpty(exchanges)) {
exchanges.forEach(e -> {
// 声明交换机
Exchange exchange = null;
switch (e.getType()) {
case DIRECT:
exchange = new Name());
break;
case TOPIC:
exchange = new Name());
exchange = new Name());
break;
case HEADERS:
exchange = new Name());
break;
case FANOUT:
exchange = new Name());
break;
case CUSTOM:
exchange = new Name(), e.getCustomType(), true, false, e.getArguments());
break;
default:
break;
}
// 将交换机注册到spring bean⼯⼚让spring实现交换机的管理
if (exchange != null) {
}
});
}
return null;
}
/
**
* 动态绑定队列和交换机
*
* @return
*/
@Bean
public Object bindingQueueToExchange() {
List<QueueConfig> queues = getQueues();
if (!CollectionUtils.isEmpty(queues)) {
queues.forEach(q -> {
// 创建队列
Queue queue = new Name(), q.getDurable(),
// 注⼊队列bean
// 获取队列绑定交换机名
List<String> exchangeNameList;
if (q.getExchangeName().indexOf(StringPool.COMMA) != -1) {
String[] split = q.getExchangeName().split(StringPool.COMMA);
exchangeNameList = Arrays.asList(split);
} else {
exchangeNameList = Arrays.ExchangeName());
}
exchangeNameList.forEach(name -> {
// 获取交换机配置参数
ExchangeConfig exchangeConfig = getExchangeConfig(name);
Binding binding = bindingBuilder(queue, q, exchangeConfig);
// 将绑定关系注册到spring bean⼯⼚让spring实现绑定关系的管理
if (binding != null) {
log.debug("queue [{}] binding exchange [{}] success!", q.getName(), Name());
}
});
});
}
return null;
}
public Binding bindingBuilder(Queue queue, QueueConfig q, ExchangeConfig exchangeConfig) {
// 声明绑定关系
Binding binding = null;
// 根据不同的交换机模式获取不同的交换机对象(注意:刚才注册时使⽤的是⽗类Exchange,这⾥获取的时候将类型获取成相应的⼦类)⽣成不同的绑定规则switch (Type()) {
case TOPIC:
binding = BindingBuilder.bind(queue)
.Name(), TopicExchange.class))
.RoutingKey());
break;
case DIRECT:
binding = BindingBuilder.bind(queue)
.Name(), DirectExchange.class))
.RoutingKey());
break;
case HEADERS:
if (q.getWhereAll()) {
binding = BindingBuilder.bind(queue)
.Name(), HeadersExchange.class))
.
Headers()).match();
} else {
binding = BindingBuilder.bind(queue)
.Name(), HeadersExchange.class))
.Headers()).match();
}
break;
case FANOUT:
binding = BindingBuilder.bind(queue)
.Name(), FanoutExchange.class));
break;
case CUSTOM:
binding = BindingBuilder.bind(queue)
.Name(), CustomExchange.class))
.RoutingKey()).noargs();
break;
default:
log.warn("queue [{}] config unspecified exchange!", q.getName());
break;
}
return binding;
}
}
添加rabbitMQ配置
fig.mq;
slf4j.Slf4j;
import org.Message;
import org.springframework.tion.CachingConnectionFactory;
import org.springframework.tion.ConnectionFactory;
import org.springframework.tion.CorrelationData;
import org.springframework.RabbitTemplate;
import org.t.properties.EnableConfigurationProperties;
import t.annotation.Bean;
import t.annotation.Configuration;
import t.annotation.PropertySource;
/**
* RabbitMQ配置
*
* @author gch
*/
@Slf4j
@Configuration
@EnableConfigurationProperties(RabbitMqProperties.class)
//@PropertySource(value = "l")
public class RabbitConfiguration {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,⽆论消息推送结果怎么样都强制调⽤回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback:    "+"相关数据:"+correlationData);
System.out.println("ConfirmCallback:    "+"确认情况:"+ack);
System.out.println("ConfirmCallback:    "+"原因:"+cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback:    "+"消息:"+message);
System.out.println("ReturnCallback:    "+"回应码:"+replyCode);
System.out.println("ReturnCallback:    "+"回应信息:"+replyText);
System.out.println("ReturnCallback:    "+"交换机:"+exchange);
System.out.println("ReturnCallback:    "+"路由键:"+routingKey);
}
});
return rabbitTemplate;
}
}
配置⽂件注意两点
1.
@EnableConfigurationProperties(RabbitMqProperties.class)
2.就是监听需要配置⽂件配置:
spring:
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#确认消息已发送到交换机(Exchange)
#publisher-confirms: true
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
经调试:springBoot 版本  2.1.6.RELEASE 使⽤  publisher-confirm-type: correlated ;  2.1.6.RELEASE 版本使⽤  publisher-confirms: true
接下来就可以愉快的⽣产消息了
贴个消费的⽅式
MessageListenerConfig
fig.mq;
import org.AcknowledgeMode;
import org.springframework.tion.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import t.annotation.Bean;
import t.annotation.Configuration;
/**
* @Author : JCccc
* @CreateTime : 2019/9/4
* @Description :
**/
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private EurekaReceiver eurekaReceiver;//消息接收处理类
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。

发表评论