Springboot集成RabbitMQ实现消息的发送和消费
⼀、Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,⽬前共四种类型:direct、fanout、topic、headers 。
direct:⼀对⼀。消息中的路由键(routing key)如果和 Binding 中的 binding key ⼀致,交换器就将消息发到对应的队列中。
topic:⼀对多。通过模式匹配分配消息的路由键属性,将路由键和某个模式进⾏匹配,此时队列需要绑定到⼀个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间⽤点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,“*”匹配不多不少⼀个单词。
fanout:⼴播。每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去,连routing key都不需要。
headers:匹配 AMQP 消息的 header ⽽不是路由键,headers 交换器和 direct 交换器完全⼀致,但性能差很多,⽬前⼏乎⽤不到了。
springboot结构过程:
项⽬结构:
⼆、搭建⽗⼯程rabbitMQ-parent
依赖包l
<packaging>pom</packaging>
<parent>
<artifactId>spring-boot-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.1.16.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.1.1</version>
</dependency>
</dependencies>
三、搭建消息发送者send
1.结构:
2.依赖l
<dependency>
<groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-amqp</artifactId>        </dependency>
3.配置⽂件l
server:
port: 7070
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
4.启动类SendApp
@SpringBootApplication
public class SendApp {
public static void main(String[] args) {
SpringApplication.run(SendApp.class,args);
}
}
四、搭建消息消费者receive
1.结构
2.依赖l
<dependency>
<groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.配置⽂件l
server:
port: 7070
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
4.启动类ReceiveApp
@SpringBootApplication
public class ReceiveApp {
public static void main(String[] args) {
SpringApplication.run(ReceiveApp.class,args);
}
}
五、发送消息
1.send中创建配置类AmqpSendConfig
@Configuration
public class AmqpSendConfig {
@Bean //模拟<bean>标签
public DirectExchange getDirectExchange(){
return new DirectExchange("myDirectExchange");
}
@Bean
public FanoutExchange getFanoutExchange(){
return new FanoutExchange("myFanoutExchange");
}
@Bean
public TopicExchange getTopicExchange(){
return new TopicExchange("myTopicExchange");
}
}
2.SendService
@Service
public class SendService {
//⾃动注⼊amqpTemplate模板对象
@Resource
private AmqpTemplate amqpTemplate;
//direct⽅式
public  String driectSend(){
/**
*参数⼀:交换机名称,需要事先在config创建
*参数⼆:routing Key  根据这个Key发送消息到指定地⽅
*参数三:消息的具体内容
**/
return "direct发送成功了";
}
//fanout⽅式
public String fanoutSend(){
int []arr={1,2,3,4};
//fanout为⼴播,不需要rountkey
vertAndSend("myFanoutExchange","",arr);
return "fanout发送成功了";
}
//topic⽅式
public String topicSend() {
vertAndSend("myTopicExchange","abc.def","1111115555555");
return "topic发送成功了";
}
}
3.SendController
@RestController
public class SendController {
@Autowired
private SendService sendService;
@RequestMapping("/direct")
public String directExchange(){
return this.sendService.driectSend();
}
@RequestMapping("/fanout")
public String fanoutExchange(){
return this.sendService.fanoutSend();
}
@RequestMapping("/topic")
public String topicExchange() {
return picSend();
}
}
六、接收消息
direct⽅式配置类DirectReceiveConfig
@Configuration
public class DirectReceiveConfig {
//1.创建⼀个名字为myDirectQueue的队列
@Bean
public Queue getQueue() {
return new Queue("myDirectQueue");
}
//2.创建名字为myDirectExchange的交换机
@Bean
public Exchange getExchange() {
return new DirectExchange("myDirectExchange");
}
//3.将队列绑定到交换机
@Bean("binding")
public Binding binding(Queue getQueue,Exchange getExchange){
Binding myBinding = BindingBuilder.bind(getQueue)
.to(getExchange)
.with("directKey")
.noargs();
return myBinding;
}
}
fanout⽅式配置类FanoutReceiveConfig
@Configuration
public class FanoutReceiveConfig {
@Bean
public Queue fanoutQueue(){
return new Queue("myFanoutQueue");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("myFanoutExchange");
}
@Bean
public Binding fanoutBinding(Queue fanoutQueue,FanoutExchange fanoutExchange){ return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}
}
topic⽅式配置类TopicReceiveConfig
@Configuration
public class TopicReceiveConfig {
//队列⼀
@Bean
public Queue topicQueue(){
return new Queue("myTopicQueue");
}
//队列⼆
@Bean
public Queue topicQueue2(){
return new Queue("myTopicQueue2");
}
//交换机
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("myTopicExchange");
}
//队列⼀绑定到交换机
@Bean
public Binding binding1(Queue topicQueue,TopicExchange topicExchange){
return BindingBuilder.bind(topicQueue)
.to(topicExchange)
.with("abc.#");//routingKey以abc开头的都可以接受到消息
}
//队列⼀绑定到交换机
@Bean
public Binding binding2(Queue topicQueue2,TopicExchange topicExchange){
return BindingBuilder.bind(topicQueue2)
.to(topicExchange)
.with("#.def");//routingKey以def结尾的都可以接受到消息
}
}
2.ReceiveService
@Service
public class ReceiveService {
//direct⽅式
//@RabbitListener注解⽤于标记当前⽅法为消息监听⽅法,可以监听某个队列,当队列中有新消息则⾃动完成接收,    @RabbitListener(queues = "myDirectQueue")
public void receive(String message){
System.out.println("接收:"+message);
}
//fanout⽅式
@RabbitListener(queues = "myFanoutQueue")
public void fanoutReceive(String o){
System.out.println("fanout接收:"+ o);
}
//topic⽅式
@RabbitListener(queues = "myTopicQueue")
public void topicReceive1(String oi) {
System.out.println("topic1接收:"+ oi);
}
@RabbitListener(queues = "myTopicQueue2")
public void topicReceive2(String oi) {
System.out.println("topic2接收:"+ oi);
}
}
七、访问测试
1.启动RabbitMQ服务器
2.启动发送⽅
3.启动接收⽅
4.浏览器访问查看是否发送成功

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。