Springboot集成RabbitMQ,注解式开发,包含死信队列,消费回调介绍RabbitMQ浅度学习
这⾥介绍注解式开发RabbitMQ
RabbitMQ是实现了⾼级消息队列协议(AMQP)的开源消息代理软件(亦称⾯向消息的中间件)。RabbitMQ服务器是⽤Erlang语⾔编写的,⽽集和故障转移是构建在开放电信平台框架上的。所有主要的编程语⾔均有与代理接⼝通讯的客户端库。 ---- 来⾃百度百科
Springboot集成RabbitMQ, 上代码
Springboot 2.2.2.RELEASE
JDK 1.8
*准备⼯作:
1: 在RabbitMQ中添加exchange, queue
2: 将exchange和queue进⾏绑定
3: 绑定规则, routingKey 设置为xxx.* 模糊后⼀位 Example: user.insert
不能匹配user.insert.demo, 如要模糊多位, 使⽤xxx.#
4. 路由规则也可精确匹配
也可不⽤⼿动创建exchange和queue, 以及他们的绑定关系 , 代码编写好后, 启动项⽬, 正常连接到RabbitMQ后相应的exchange和queue 会⾃动创建并且绑定好
展⽰关键依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ部分配置
spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
#默认的虚拟主机
spring.rabbitmq.virtual-host=my_vhost
#连接超时时间15s
tion-timeout=15000
#============rabbitmq消费者配置===========#
#并发数主页的channel会有五个
spring.rabbitmq.urrency=5
#最⼤并发数
#spring.rabbitmq.listener.simple.max-concurrency=10
#auto⾃动签收    manual⼿动签收,只要谈到消息可靠,基本都是⼿动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#限流同⼀时间只允许⼀条消息
spring.rabbitmq.listener.simple.prefetch=5
# 回调⽅式消息确认模式,消息发出后异步等待broker回送响应
# confirm机制,回调消息到达了exchange的消息
spring.rabbitmq.publisher-confirm-type=correlated
# return机制,回调消息没有到达queue的消息
spring.rabbitmq.publisher-returns=true
#配置为true,到不了queue会回调⽅法
plate.mandatory=true
配置⽂件完全可配置类替代, 看个⼈爱好, 书写习惯
发送端:
**
注意:
这⾥RabbitTemplate⽤构造器注⼊的⽽没有直接⽤@Autowired, 是因为我没有写RabbitMQConfig配置类,
准确说是我没有⽤配置类将RabbitTemplate变成⼀个⾮Spring实例化单例的Bean. B装完了, 说⽩了,
这⾥RabbitTemplate不能是单例的, Spring默认创建的Bean是单例的. 单例的RabbitTemplate只能回调⼀次,
⼀个RabbitTemplate只能回调⼀次. (这个说法可能不太准确, 官⽅⼀点我也不知道咋说…
如果⽤配置类来配置RabbitTemplate可以在返回RabbitTemplate的⽅法上加上@Scope(“prototype”),
这样⽤注解⽅式注⼊的RabbitTemplate就不是单例了
这个类⾥两个关键:
1. 多例的RabbitTemplate
2. 设置回调的类
import com.sunnyfe.rabbitmq.demo.callback.RabbitmqCallback;
import com.sunnyfe.ity.User;
log4j.Log4j2;
import org.springframework.amqp.AmqpException;
import org.springframework.tion.CorrelationData;
import org.springframework.RabbitTemplate;
import org.springframework.stereotype.Component;
/
**
* @author maple
*/
@Component
@Log4j2
public class InsertUserSender {
/**
* ⾮单例Bean, 构造器注⼊
*/
private final RabbitTemplate rabbitTemplate;
/**
* 构造⽅法注⼊rabbitTemplate成为⼀个多例的rabbitTemplate
* 如果是但⾥的rabbitmq, 只能回调⼀次 , 再次调⽤的时候会报错: ⼀个rabbitmqTemplate只能回调⼀次    */
public InsertUserSender(RabbitTemplate rabbitTemplate){
this.rabbitTemplate = rabbitTemplate;
// 设置回调地址消息到达exchange的回调
rabbitTemplate.setConfirmCallback(new RabbitmqCallback());
// 消息失败的回调没有到queue的回调
rabbitTemplate.setReturnCallback(new RabbitmqCallback());
}
public void sendInsertUser(User user){
CorrelationData correlationData =new CorrelationData();
correlationData.PrimaryKey());
try{
//exchange 交换机相应要在rabbitmq中添加  user-exchange
"liveExchange",
//routingKey 路由key    user.insert
"info",
//消息体内容
user,
//消息唯⼀id
correlationData
)
;
log.info("消息发送成功");
}catch(AmqpException e){
e.printStackTrace();
}
}
}
回调类
1. ⼤可不必另外写⼀个回调类, 可直接写在发送者中, 然后在设置回调类的时候⽤this
2. 要使这两个回调⽣效, 配置⽂件中的confirm机制和return机制⼀定得配
slf4j.Slf4j;
import org.Message;
import org.springframework.tion.CorrelationData;
import org.springframework.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* Detail 回调
*
* @author SunnyFon
* @date 2021年01⽉18⽇ 21:48
*/
@Slf4j
@Component
public class RabbitmqCallback  implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
/**
* 如果没有exchange,也会收到回调
*
* ConfirmCallback 只确认消息是否正确到达 Exchange 中
* 1. 如果消息没有到exchange,则confirm回调,ack=false
* 2. 如果消息到达exchange,则confirm回调,ack=true
*
* 配置参数
*  spring.rabbitmq.publisher-confirms=true
*
* @param correlationData  如果发送⽅没有传这个对象,则为null
* @param ack ack
* @param cause cause
* @see RabbitTemplate.ConfirmCallback#confirm
*/
@Override
public void confirm(CorrelationData correlationData,boolean ack, String cause){
log.info(" 回调id:"+ correlationData);
if(ack){
log.info("消息成功到达exchange");
}else{
<("消息未成功到达exchange:"+ cause);
}
}
/**
* Returned message callback. 到不了queue会回调到该⽅法
*
* @param message    the returned message.
* @param replyCode  the reply code.
* @param replyText  the reply text.
* @param exchange  the exchange.
* @param routingKey the routing key.
*/
@Override
public void returnedMessage(Message message,int replyCode, String replyText, String exchange, String routingKey){        System.out.println("未到queue的回调么");
}
}
消息接收:
注解绑定队列和交换机(exchange), 还有路由.
死信队列信息就是arguments参数, 不需要死信队列可以去掉整个arguments={}
有⼏个关键点:
1. 注解放的位置⼀定得⽅正确, 括号啥都不能乱, 否则脑袋破了, 不知道为啥不创建死信队列.
2. 配置⽂件签收⽅式⼀定设置为⼿动签收
3. 我这⾥发送的是User对象, 但是接收⽤Message接收, 有点不讲武德可以改成public void process(@Payload User user,
@Headers Map<String, Object> headers, Channel channel)
4. 也可⽤Body获取对象信息.
5. ⾄于argument中的name定义都是来源于RabbitMQ
6. 当消费消息出现异常时, 就会⾛catch代码, 就会执⾏ channel.basicNack(consumerTag, false, false); consumerTag是消息唯⼀
id, 第⼀个false是否批量执⾏, 第⼆个false是否重新进⼊队列basicNack⽅法后会将消息放⼊死信队列, 就可在死信队列中对消费失败的消息进⾏业务逻辑执⾏, 消费死信队列中的消息.
7. 死信队列 , 可以理解为⼀个普通的队列
package com.sunnyfe.rabbitmq.demo.rabbitmqtest;
import com.rabbitmq.client.Channel;
import org.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* Detail RabbitMQ消息接收
*
* @author SunnyFon
* @date 2021年01⽉18⽇ 17:39
*/
springboot是啥@Component
public class Receiver {
@RabbitListener(bindings ={
@QueueBinding(value =@Queue(value ="liveQueue", arguments =
{@Argument(name ="x-dead-letter-exchange", value ="deadExchange"),
@Argument(name ="x-dead-letter-routing-key", value ="deadKey")
,@Argument(name ="x-message-ttl", value ="10000", type ="java.lang.Integer")
// ,@Argument(name = "x-max-length",value = "5",type = "java.lang.Integer")队列最⼤长度
}),
exchange =@Exchange(value ="liveExchange"),
key ={"info","error","warning"}
)
})
//            设置成manual⼿动确认,⼀定要对消息做出应答,否则rabbit认为当前队列没有消费完成,将不再继续向该队列发送消息。
//            1.channel.basicAck(long,boolean); 确认收到消息,消息将被队列移除,false只确认当前consumer⼀个消息收到,true确认所有consumer获得的消息。
//            2.channel.basicNack(long,boolean,boolean); 确认否定消息,第⼀个boolean表⽰⼀个consumer还是所有,第⼆个boolean表⽰requeue是否重新回到队列,true重新⼊队。
//            3.channel.basicReject(long,boolean); 拒绝消息,requeue=false 表⽰不再重新⼊队,如果配置了死信队列则进⼊死信队列。
//            4.当消息回滚到消息队列时,这条消息不会回到队列尾部,⽽是仍是在队列头部,这时消费者会⼜接收到这条消息,如果想消息进⼊队尾,须确认消息后再次发送消息。
@RabbitHandler
public void onMessage(Message message, Channel channel){
long consumerTag = MessageProperties().getDeliveryTag();
try{
// int i = 1/0;  模拟消费消息, 回调接收ack是true 因为回调的ack根据exchange是否收到消息判断
//            int i = 1/0;
System.out.println("收到消息");
System.out.println(new Body()));
/*
* 配置⽂件中配置的是⼿动签收
* 如果注释ACK  也可以消费信息,  不过在控制台上消息仍然存在消息会回到队列
*
* 业务逻辑完成后⼀定更要告诉rabbitmq服务器消息消费完了

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。