Java中简单使⽤RabbitMQ进⾏消息收发
⽂章⽬录
在项⽬中需要使⽤RabbitMQ作为消息队列,于是写了⼀个RabbitMQ的服务提供类,这个服务提供类包含RabbitMQ相关实例的初始化及建⽴连接、消息的接收以及消息的发送,想偷懒的童鞋拷过去改改配置什么的就可直接使⽤,以此做个记录,⽅便以后温故⽽知新。
Windows中安装RabbitMQ-Server
安装Erlang
因为RabbitMQ是使⽤Erlang开发的,所以要使⽤RabbitMQ-Server⾸先就需要安装Erlang的运⾏环境,可以在官⽹下载安装,安装后需要配置环境变量。
配置⽰例:
ERLANG_HOME:D:\software\erl10.1
Path末尾拼接:%ERLANG_HOME%\bin;
安装RabbitMQ-Server
可在官⽹⾃⾏下载安装,安装后需要配置环境变量。
配置⽰例:
RABBITMQ_SERVER:D:\software\RabbitMQ Server\rabbitmq_server-3.7.9
Path末尾拼接:%RABBITMQ_SERVER%\sbin;
配置完成后便可启动RabbitMQ-Server,初始⽤户名与密码是guest:guest。
其他具体的配置及命令,不详述,请⾃⾏查。
需要使⽤的依赖包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
RabbitMQ服务提供类,含消息接收与发送
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import urrent.TimeoutException;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
/**
* rabbitmq服务提供类
* @author dr
*/
@Component
public class RabbitMQProvider {
private Logger logger = Class()); /**是否启⽤RabbitMQ*/
@Value("${able}")
private boolean enable;
/**交换器名*/
@Value("${hange.name}")
private String exchangeName;
/**下⾏队列名*/
@Value("down.${push.setting.school}")
private String downQueueName;
@Value("down.${push.setting.school}")
private String downRoutingKey;
@Value("up.${push.setting.school}")
private String upQueueName;
@Value("up.${push.setting.school}")
private String upRoutingKey;
@Value("${rabbitmq.setting.username}")
private String username;
@Value("${rabbitmq.setting.password}")
private String password;
@Value("${rabbitmq.setting.host}")
private String host;
@Value("${rabbitmq.setting.port}")
private Integer port;
// 是否⾃动应答
private boolean autoAck =true;
private Channel channelDown;
private Channel channelUp;
private Connection upConnection;
private Connection downConnection;
@Value("${ssage-life-time}")
private Integer messageLifeTime;
/**处理接收到的消息的处理实例,即我们的业务代码*/
@Autowired
private MQDealer mqDealer;
// 初始化
@PostConstruct
private void init(){
if(!enable)return;
try{
// 创建连接
ConnectionFactory factory =new ConnectionFactory();
factory.setUsername(username);
factory.setPassword(password);
factory.setHost(host);
factory.setPort(port);
// 队列参数
Map<String, Object> args =new HashMap<>();
args.put("x-message-ttl", messageLifeTime*1000);// 消息过期时间// 创建上⾏连接
upConnection = wConnection();
// 创建上⾏通道
channelUp = ateChannel();
/
/ 声明创建配置上⾏队列
channelUp.queueDeclare(upQueueName,true,false,false, args);
// 将队列与交换器绑定,并设置路由码
channelUp.queueBind(upQueueName, exchangeName, upRoutingKey);
downConnection = wConnection();
channelDown = ateChannel();
channelDown.queueDeclare(downQueueName,true,false,false, args);
channelDown.queueBind(downQueueName, exchangeName, downRoutingKey);
receiveMessage();
}catch(Exception e){
<("启动MQ下⾏通道时出现异常!", e);
}
}
/**
* 持续监听队列以接收数据
* @throws IOException
* @throws TimeoutException
*/
private void receiveMessage()throws IOException, TimeoutException {
// 每次缓存5个消息在本地
channelDown.basicQos(5);
channelDown.basicConsume(downQueueName, autoAck,"myConsumerTag",new DefaultConsumer(channelDown){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body)throws IOException {
String message =new String(body,"UTF-8");
logger.debug(
downQueueName +" Received '"+ message +"'"+", routingKey: "+ RoutingKey());
// 处理接收到的消息
mqDealer.deal(message);
// 持续监听
channelDown.basicConsume(downQueueName, autoAck,"myConsumerTag",this);
channelDown.DeliveryTag(),true);
}
});
}
/**
* 向上⾏消息队列发送⼀条消息java环境变量自动配置
* @param message
* @throws IOException
* @throws TimeoutException
*/
public void sendMessage(String message)throws IOException, TimeoutException {
channelUp.basicPublish(exchangeName, upRoutingKey,true, MessageProperties.TEXT_PLAIN, Bytes());
logger.debug("send message to "+ upQueueName +": "+ message);
}
}
解析:
因为⽹友反映使⽤单连接单通道同时发送和接收消息容易导致队列阻塞,所以这⾥采⽤了双连接双通道,分别负责接收和发送;
这⾥使⽤rabbitMQ的⽅式是直接使⽤的rabbitMQ提供的开发包,⽽⾮Spring整合过的;
建议设置消息过期时间,因为太多消息积压在队列中⽽没有消费的话,可能会导致队列的阻塞,我在开发中已经碰到了⼀次;
在声明队列时,如果该队列不存在则会创建,如果已经存在则直接使⽤现有的,需要注意的是,如果已经有现存队列,那么再次声明时,需要属性和现存的完全⼀致,否则会出现异常;
这段代码中属性的加载,连接、通道等实例的初始化使⽤了Spring的IoC,如果项⽬没有使⽤Spring,需要⾃⾏加载和初始执⾏;
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论