SpringBoot+RabbitMQ(保证消息100%投递成功并被消费)
点击上⽅蓝⾊字体,选择“标星”
优质⽂章,第⼀时间送达
▊ ⽼赵推荐(戳下⽅标题)
转⾃:简书,作者:wangzaiplus
⼀、先扔⼀张图
说明:
本⽂涵盖了关于RabbitMQ很多⽅⾯的知识点, 如:
消息发送确认机制
消费确认机制
消息的重新投递
消费幂等性, 等等
这些都是围绕上⾯那张整体流程图展开的, 所以有必要先贴出来, 见图知意。
⼆、实现思路
简略介绍163邮箱授权码的获取
编写发送邮件⼯具类
编写RabbitMQ配置⽂件
⽣产者发起调⽤
消费者发送邮件
定时任务定时拉取投递失败的消息, 重新投递
各种异常情况的测试验证
拓展: 使⽤动态代理实现消费端幂等性验证和消息确认(ack)。
三、项⽬介绍
springboot版本2.1.5.RELEASE, 旧版本可能有些配置属性不能使⽤, 需要以代码形式进⾏配置
RabbitMQ版本3.7.15
MailUtil: 发送邮件⼯具类
RabbitConfig: rabbitmq相关配置
TestServiceImpl: ⽣产者, 发送消息
MailConsumer: 消费者, 消费消息, 发送邮件
ResendMsg: 定时任务, 重新投递发送失败的消息
说明: 上⾯是核⼼代码, MsgLogService mapper xml等均未贴出, 完整代码可以参考GitHub上的源码,地址在⽂末。
四、代码实现
1、163邮箱授权码的获取, 如图:
该授权码就是配置⽂件spring.mail.password需要的密码。
2、pom
<!--mq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--mail-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
3、rabbitmq、邮箱配置
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启confirms回调 P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 开启returnedMessage回调 Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 设置⼿动确认(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
# mail
spring.mail.host=smtp.163
spring.mail.username=186********@163
spring.mail.password=123456wangzai
spring.mail.from=186********@163
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.able=true
spring.mail.properties.mail.quired=true
说明: password即授权码, username和from要⼀致。
4、表结构
CREATE TABLE `msg_log` (
`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯⼀标识',
`msg` text COMMENT '消息体, json格式化',
`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机',
`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键',springboot aop
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费',  `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_try_time` datetime DEFAULT NULL COMMENT '下⼀次重试时间',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`msg_id`),
UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递⽇志';
说明: exchange routing_key字段是在定时任务重新投递消息时需要⽤到的。
5、MailUtil
public class MailUtil {
@Value("${spring.mail.from}")
private String from;
@Autowired
private JavaMailSender mailSender;
/**
* 发送简单邮件
*
* @param mail
*/
public boolean send(Mail mail) {
String to = To();// ⽬标邮箱
String title = Title();// 邮件标题
String content = Content();// 邮件正⽂
SimpleMailMessage message = new SimpleMailMessage();        message.setFrom(from);
message.setTo(to);
message.setSubject(title);
message.setText(content);
try {
mailSender.send(message);
log.info("邮件发送成功");
return true;
} catch (MailException e) {
<("邮件发送失败, to: {}, title: {}", to, title, e);
return false;
}
}
}
6、RabbitConfig
public class RabbitConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private MsgLogService msgLogService;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(converter());
// 消息是否成功发送到Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功发送到Exchange");
String msgId = Id();
msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
} else {
log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
}
});
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有到Queue就会丢弃掉消息, ⽽不会触发回调
rabbitTemplate.setMandatory(true);
// 消息是否从Exchange路由到Queue, 注意: 这是⼀个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个⽅法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText        });
return rabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter converter() {
return new Jackson2JsonMessageConverter();
}
// 发送邮件
public static final String MAIL_QUEUE_NAME = "mail.queue";
public static final String MAIL_EXCHANGE_NAME = "hange";
public static final String MAIL_ROUTING_KEY_NAME = "uting.key";
@Bean
public Queue mailQueue() {
return new Queue(MAIL_QUEUE_NAME, true);
}
@Bean
public DirectExchange mailExchange() {
return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
}
@Bean
public Binding mailBinding() {
return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
}
}

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。