springboot+rabbitmq+websocket⼴播模式进⾏消息实时推送如何安装rabbitmq在此就不再赘述了,直接上代码,使⽤的direct队列模式。
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
⽣产者
application.properties
#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#开启消息发送确认
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
#返回⽆法插⼊队列的消息
plate.mandatory=true
准备消息队列与路由key
public class RabbitConstant {
//交换机名称
public final static String EXCHANGE = "exchange_test";
//队列
public final static String QUEUE_TRANSACTION = "queue_transaction";
public final static String QUEUE_CONTRACT = "queue_contract";
public final static String QUEUE_QUALIFICATION = "queue_qualification";
//路由key
public final static String RK_TRANSACTION = "transaction";
public final static String RK_CONTRACT = "contract";
public final static String RK_QUALIFICATION = "qualification";
}
配置
@Configuration
public class RabbitMqConfig {
/**
* 声明队列
*
* @return
*/
@Bean
public Queue queueTransaction() {
public Queue queueTransaction() {
// true表⽰持久化该队列
return new Queue(RabbitConstant.QUEUE_TRANSACTION, true);
}
@Bean
public Queue queueContract() {
// true表⽰持久化该队列
return new Queue(RabbitConstant.QUEUE_CONTRACT, true);
}
@Bean
public Queue queueQualification() {
// true表⽰持久化该队列
return new Queue(RabbitConstant.QUEUE_QUALIFICATION, true);
}
/**
* 声明交互器
*
* @return
*/
@Bean
DirectExchange directExchange() {
return new DirectExchange(RabbitConstant.EXCHANGE);
}
/**
* 绑定
*
* @return
*/
@Bean
public Binding bindingTransaction() {
return BindingBuilder.bind(queueTransaction()).to(directExchange()).with(RabbitConstant.RK_TRANSACTION); }
/**
* 绑定
*
* @return
*/
@Bean
public Binding bindingContract() {
return BindingBuilder.bind(queueContract()).to(directExchange()).with(RabbitConstant.RK_CONTRACT);
}
/**
* 绑定
*
* @return
*/
@Bean
public Binding bindingQualification() {
return BindingBuilder.bind(queueQualification()).to(directExchange()).with(RabbitConstant.RK_QUALIFICATION); }
}
发布者实现接⼝
@Component
@Slf4j
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
//消息发送确认回调⽅法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:" + correlationData);
}
//消息发送失败回调⽅法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
<("消息发送失败:" + new Body()));
}
/**
* 发送消息,不需要实现任何接⼝,供外部调⽤
*
* @param messageVo
*/
public void send(MessageVo messageVo) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
}
Ok,只要调⽤send⽅法就可⼀把消息发送到指定的队列中了,发送后会调⽤confirm或者returnedMessage⽅法来返回结果,
下⾯开始讲消费者。
消费者
application.properties
#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
消费者配置
import com.rabbitmq.client.Channel;
d.rabbitmq.web.websocket.WebSocketServerEndpoint;
slf4j.Slf4j;
import org.AcknowledgeMode;
import org.Message;
import org.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.tion.ConnectionFactory;
import org.springframework.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import t.annotation.Bean;
import t.annotation.Configuration;
import verter.MappingJackson2MessageConverter;
import ssaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;websocket和socket
import java.io.IOException;
@Configuration
@EnableRabbit
@Slf4j
public class ConsumerConfig implements RabbitListenerConfigurer {
@Autowired
private ConnectionFactory connectionFactory;
@Autowired
private WebSocketServerEndpoint webSocketServerEndpoint;
@Bean
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(new MappingJackson2MessageConverter());
return factory;
}
@Bean
public SimpleMessageListenerContainer messageContainer(@Qualifier("queueTransaction") Queue transaction, @Qualifier("queueContract") Queue contract, SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(transaction, contract, qualification);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式⼿⼯确认
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
byte[] body = Body();
log.info("receive msg : " + new String(body));
try {
webSocketServerEndpoint.sendMessageToAll(new String(body));
channel.MessageProperties().getDeliveryTag(), false);//确认消息成功消费
} catch (IOException e) {
<("消息推送前台出错:" + e.getMessage() + "/r/n重新发送");
channel.MessageProperties().getDeliveryTag(), false, true); //重新发送
}
}
});
return container;
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
}
}
配置完后会⾃动监听消息队列,AcknowledgeMode设置为⼿动模式可以⾃⼰来管理是否确认消费成功或者重新发送,当然我这⾥重新发送会不断的发送,到时候根据具体业务进⾏处理就⾏了。
websocket
配置
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
socket服务
d.rabbitmq.web.websocket;
import com.alibaba.fastjson.JSONObject;
del.MessageVo;
slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import urrent.ConcurrentHashMap;
/**
* ServerEndpoint
* <p>
* 使⽤springboot的唯⼀区别是要@Component声明下,⽽使⽤独⽴容器是由容器⾃⼰管理websocket的,但在springboot中连容器都是spring管理的。
* <p>
* 虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化⼀个bean,所以可以⽤⼀个静态set保存起来。
*/
@ServerEndpoint("/ws/yxd/{userId}") //WebSocket客户端建⽴连接的地址
@Component
@Slf4j
public class WebSocketServerEndpoint {
/**
* 存活的session集合(使⽤线程安全的map保存)
*/
private static Map<String, Session> livingSessions = new ConcurrentHashMap<>();
/**
* 建⽴连接的回调⽅法
*
* @param session 与客户端的WebSocket连接会话
* @param userId ⽤户名,WebSocket⽀持路径参数
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
livingSessions.Id(), session);
log.info(userId + "进⼊连接");
}
@OnMessage
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论