SpringCloud之RabbitMQ消息队列原理及配置
本篇章讲解RabbitMQ的⽤途、原理以及配置,RabbitMQ的安装请查看
⼀、MQ⽤途
1、同步变异步消息
场景:⽤户下单完成后,发送邮件和短信通知。
运⽤消息队列之后,⽤户下单完之后,下单信息写⼊数据库,再写⼊消息队列,发送邮件和各⾃去消息队列进⾏读取,节省时间,提⾼效率。
2、应⽤解耦
场景:⽤户下单后,订单系统需要多渠道通知⽤户。
下单服务系统:⽤户使⽤下单服务后,将下单信息写⼊数据库,下单成功。
短信服务系统:⽤户下单后,将短信信息写⼊消息队列,以信息通知⽤户交易信息。
邮件服务系统:⽤户下单后,将邮件信息写⼊消息队列,以发送邮件信息通知⽤户交易信息。
这样,如果通知不能正常使⽤,也不影响⽤户下单,⽤户下单后,只⽤把下单通知信息写⼊消息队列,不⽤关⼼后续操作,实现了订单系统和通知系统的解耦。
3、流量削峰
⼀般在秒杀或者团购活动中使⽤。
场景:秒杀活动,⼀般会因为流量过⼤,导致流量暴增,应⽤挂掉。针对这个问题,⼀般需要在应⽤前端加⼊消息队列。
a.可以控制活动的⼈数
springcloud和springboot b.可以缓解短时间内⾼流量压垮应⽤
⽤户的请求,服务器接收后,⾸先写⼊消息队列,如果消息队列的数量⼤于最⼤的数量,则直接抛弃⽤户请求或者跳转错误页⾯。
⼆、RabbitMQ原理介绍
如图所⽰:
各组件意义如下:
三、RabbitMQ应⽤
RabbitMQ包依赖(spring-boot-starter-amqp):
<!-- rabbitMQ的依赖。rabbitmq已经被spring-boot做了整合访问实现。
spring cloud也对springboot做了整合逻辑。所以rabbitmq的依赖可以在spring cloud中直接使⽤。
-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1、Direct交换器
是⼀种点对点,实现发布/订阅标准的交换器。Producer发送消息到RabbitMQ中,MQ中的Direct交换器接受到消息后,会根据Routing Key来决定这个消息要发送到哪⼀个队列中。Consumer则负责注册⼀个队列,来监听队列的状态,当队列状态发⽣变化时,消费消息。注册队列监听需要提供交换器信息,队列信息和路由键信息。
这种交换器通常⽤于点对点消息传输的业务模型中。如电⼦邮箱。
如下图所⽰⽇志处理MQ⽰例:
Producer全局配置⽂件:
spring.application.name=direct-producer
server.port=8082
# 必要配置
# 配置rabbitmq链接相关信息。key都是固定的。是springboot要求的。
# rabbitmq安装位置
spring.rabbitmq.host=localhost
# rabbitmq的端⼝
spring.rabbitmq.port=5672
# rabbitmq的⽤户名
spring.rabbitmq.username=test
# rabbitmq的⽤户密码
spring.rabbitmq.password=123456
# 可选配置
# 配置producer中操作的Queue和Exchange相关信息的。key是⾃定义的。为了避免硬编码(代码中可以写死)。
# exchange的命名。交换器名称可以随意定义。
# 路由键,是定义某⼀个路由键。 info级别⽇志使⽤的queue的路由键。
# 路由键,error级别⽇志使⽤的queue的路由键。
Producer消息发送类:
/**
* 消息发送者 - Producer。
* @Component Producer类型的对象,必须交由Spring容器管理。
* 使⽤SpringBoot提供的AMQP启动器,来访问rabbitmq的时候,都是通过AmqpTemplate来实现的。
* 如果全局配置⽂件中,配置了rabbitmq相关内容,且⼯程依赖了starter-amqp,则spring容器⾃动创建AmqpTemplate对象。
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
//exchange 交换器名称
@Value("${hange}")
private String exchange;
//routingkey 路由键
@Value("${mq.config.uting.key}")
private String routingkey;
/
*
* 发送消息的⽅法
*/
public void send(LogMessage msg){
/**
* convertAndSend - 转换并发送消息的template⽅法。
* 是将传⼊的普通java对象,转换为rabbitmq中需要的message类型对象,并发送消息到rabbitmq中。
* 参数⼀:交换器名称。类型是String
* 参数⼆:路由键。类型是String
* 参数三:消息,是要发送的消息内容对象。类型是Object
*/
hange, utingkey, msg);
}
}
Producer实体类:
/**
* 消息内容载体,在rabbitmq中,存储的消息可以是任意的java类型的对象。
* 强制要求,作为消息数据载体的类型,必须是Serializable的。
* 如果消息数据载体类型未实现Serializable,在收发消息的时候,都会有异常发⽣。
*/
public class LogMessage implements Serializable {
private Long id;
private String msg;
private String logLevel;
private String serviceType;
private Date createTime;
private Long userId;
public LogMessage() {
super();
}
public LogMessage(Long id, String msg, String logLevel, String serviceType, Date createTime, Long userId) {
super();
this.id = id;
this.msg = msg;
this.logLevel = logLevel;
this.serviceType = serviceType;
this.userId = userId;
}
@Override
public String toString() {
return "LogMessage [id=" + id + ", msg=" + msg + ", logLevel=" + logLevel + ", serviceType=" + serviceType
+ ", createTime=" + createTime + ", userId=" + userId + "]";
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getLogLevel() {
return logLevel;
}
public void setLogLevel(String logLevel) {
this.logLevel = logLevel;
}
public String getServiceType() {
return serviceType;
}
public void setServiceType(String serviceType) {
this.serviceType = serviceType;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
}
Producer消息产⽣测试类:
/**
* Direct交换器
* Producer测试。
* 注意:
* 在rabbitmq中,consumer都是listener监听模式消费消息的。
* ⼀般来说,在开发的时候,都是先启动consumer,确定有什么exchange、queue、routing-key,然后再启动producer。 * 然后再启动producer发送消息,。
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes=SpringbootServerApplication.class)
public class QueueTest {
@Autowired
private Sender sender;
/*
* 测试消息队列
*/
@Test
public void testSend()throws Exception{
Long id = 1L;
while(true){
Thread.sleep(1000);
this.sender.send(new LogMessage(id,"test log", "info", "订单服务", new Date(), id));
id++;
}
}
}
Consumer全局配置:
spring.application.name=direct-consumer
server.port=8083
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456
# ⾃定义配置。配置交换器exchange、路由键routing-key、队列名称 queue name;在RabbitMQ中队列的⽣成
# 交换器名称
# info级别queue的名称
# info级别的路由键
# error级别queue的名称
<=
# error级别的路由键
Consumer消费者:
/**
* 消息接收者 - consumer
*
* @RabbitListener - 可以注解类和⽅法。
* 注解类,当表当前类的对象是⼀个rabbit listener。
* 监听逻辑明确,可以由更好的⽅法定义规范。
* 必须配合@RabbitHandler才能实现rabbit消息消费能⼒,⼀个类可以有多个⽅法,但是仅有⼀个⽅法注解@RabbitHandler。
* 注解⽅法,代表当前⽅法是⼀个rabbit listener处理逻辑。
* ⽅便开发,⼀个类中可以定义若⼲个listener逻辑。
* ⽅法定义规范可能不合理。如:⼀个⽅法的处理逻辑太多,造成⽅法的bad smell。
*
* @RabbitListener - 代表当前类型是⼀个rabbitmq的。
* bindings:绑定队列
* @QueueBinding - @RabbitListener.bindings属性的类型。绑定⼀个队列。
* value:绑定队列, Queue类型。
* exchange:配置交换器, Exchange类型。
* key:路由键,字符串类型。
*
* @Queue - 队列。
* value:队列名称
* autoDelete:是否是⼀个临时队列。
* true :当所有的consumer关闭后,⾃动删除queue。
* false:当任意⼀个consumer启动并创建queue后,如果queue中有消息未消费,⽆论是否有consumer继续执⾏,都保存queue。
*
* @Exchange - 交换器
* value:为交换器起个名称
* type:指定具体的交换器类型
*/
@Component
@RabbitListener(
bindings=@QueueBinding(
value=@Queue(value="${mq.}",autoDelete="false"),
exchange=@Exchange(value="${hange}",type=ExchangeTypes.DIRECT),
key="${mq.routing.key}"
)
)
public class ErrorReceiver {
/**
* 消费消息的⽅法。采⽤消息队列监听机制
* @RabbitHandler - 代表当前⽅法是监听队列状态的⽅法,就是队列状态发⽣变化后,执⾏的消费消息的⽅法。
* ⽅法参数。就是处理的消息的数据载体类型。
*/
@RabbitHandler
public void process(LogMessage msg){
System.out.println("iver: "+msg);
}
}
2、Topic交换器
主题交换器,也称为规则匹配交换器。是通过⾃定义的模糊匹配规则来决定消息存储在哪些队列中。当Producer发送消息到RabbitMQ中时,MQ中的交换器会根据路由键来决定消息应该发送到哪些队列中。
Consumer同样是注册⼀个到队列,监听队列状态,当队列状态发⽣变化时,消费消息。注册需要提供交换器信息,队列信息和路由键信息。
如下图所⽰⽇志处理MQ⽰例:
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论