微服务架构集成RabbitMQ给⽤户推送消息(,发送邮件,发送站内信
息)
因为是分布式微服务项⽬,所以发送⽅在⼀个微服务,接收⽅在另外的⼀个微服务,在发送⽅,导⼊RabbitMQ依赖包
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在yml配置RabbitMQ的ip端⼝号名字密码等
server:
port: 80
#===mq start===
spring:
application:
name: course-server #服务名
rabbitmq:
host: 192.168.0.145
port: 5672
username: yz
password: cgm888666
addresses: 192.168.0.145
listener:
direct:
acknowledge-mode: manual #⼿动签收
template:
receive-timeout: 30000
reply-timeout: 30000
virtual-host: /
#===mq end===
写配置类:创建队列交换机,并将队列绑定到交换机
注意打上注解让spring扫描到配置,还有导包正确
import org.*;
import org.springframework.beans.factory.annotation.Qualifier;
import t.annotation.Bean;
import t.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//三个队列名字短信邮件站内信
//短信
private static final String QUEUE_SMS ="queue_sms";
//邮件
private static final String QUEUE_EMAIL ="queue_email";
//站内信
private static final String QUEUE_SYSTEM ="queue_system";
/
/交换机名称
public static final String EXCHANGE_TOPIC ="exchange_topic";
//定义交换机
@Bean(EXCHANGE_TOPIC)
public Exchange exchange(){
picExchange(EXCHANGE_TOPIC).durable(true).build();
}
//定义邮件队列
@Bean(QUEUE_EMAIL)
public Queue queueEmail(){
return new Queue(QUEUE_EMAIL, true);
}
//定义短信队列
@Bean(QUEUE_SMS)
public Queue queueSMS(){
return new Queue(QUEUE_SMS, true);
}
//定义队列
@Bean(QUEUE_SYSTEM)
public Queue queueSystem(){
return new Queue(QUEUE_SYSTEM, true);
}
/
/将队列绑定到交换机
@Bean
public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,@Qualifier(EXCHANGE_TOPIC) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("message_Email").noargs();
}
@Bean
public Binding bindingSms(@Qualifier(QUEUE_SMS) Queue queue,@Qualifier(EXCHANGE_TOPIC) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("message_Sms").noargs();
}
@Bean
public Binding bindingSystem(@Qualifier(QUEUE_SYSTEM) Queue queue,@Qualifier(EXCHANGE_TOPIC) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("message_System").noargs();
}
}
发送消息:然后在课程服务层,课程上线以后通知⽤户(不同的业务需求,可以在不同的情况下去通知⽤户)
/**
* 课程上线
* @param id课程id
*/
@Override
public void onLineCourse(Long id){
//根据id查询课程
Course course = baseMapper.selectById(id);
//判断课程是否是下线状态如果不是就直接结束
Status()==Course.ONLINE_STATUS||Status()==null){
<("课程已经上线!");
return;
}
//修改数据库课程状态为上线
course.setStatus(Course.ONLINE_STATUS);
baseMapper.updateById(course);
/
/存到es
//写es控制层和feign接⼝
CourseDetail courseDetail = courseDetailMapper.selectById(id);
CourseMarket courseMarket = courseMarketMapper.selectById(id);
//将course装转成doc对象
CourseDoc courseDoc=new CourseDoc();
courseDoc.Name()+Description());
courseDoc.setPic(Pic());
System.out.Pic());
AjaxResult ajaxResult = commonFeignClient.save(courseDoc);
System.out.println(courseDetail);
System.out.println(courseMarket);
System.out.println(courseDoc);
if(!ajaxResult.isSuccess()){
<("存⼊Es失败!");
}
pushMessage(course);
}
/**
* 课程上线成功以后发送消息通知⽤户
* @param course
*/
private void pushMessage(Course course){
//发送消息
"message_Email",Name());//message_Email:队列绑定在交换机时设置的routingKey,发送的消息内容:Name() vertAndSend(RabbitMQConfig.EXCHANGE_TOPIC,
"message_Sms",Name());
"message_System",Name());
}
在接收⽅导⼊RabbitMQ依赖包
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在yml配置RabbitMQ
spring:
application:
name: course-server #服务名
##配置rabbitmq
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
listener:
simple:
acknowledge-mode: manual #⼿动签收
写消息处理类,监听队列,然后对⽤户发送邮件短信,以及站内信等
import com.rabbitmq.client.Channel;
import org.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MQHandler {
//短信
private static final String QUEUE_SMS ="queue_sms";
//邮件
private static final String QUEUE_EMAIL ="queue_email";
//站内信
private static final String QUEUE_SYSTEM ="queue_system";
//监听队列接收消息
@RabbitListener(queues = QUEUE_SMS)
分布式和微服务的关系public void message_Sms(String msg, Message message, Channel channel){ //获取⽤户电话给⽤户
System.out.println("成功:"+msg);
}
@RabbitListener(queues = QUEUE_EMAIL)
public void message_Email(String msg, Message message,Channel channel){ //获取⽤户的邮箱⽤户id 使⽤丰富邮箱发送超链接
System.out.println("邮箱发送成功:"+msg);
}
@RabbitListener(queues = QUEUE_SYSTEM)
public void meassge_System(String msg,Message message,Channel channel){ //站内消息存⼊表中,状态未处理
System.out.println("站内消息发送成功:"+msg);
}
}
注意点:MQ配置必须正确,@Bean注解 不要忘了。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论