RabbitMQ的web页⾯介绍(三)
⼀、Virtual Hosts
每⼀个 RabbitMQ 服务器都能创建虚拟的消息服务器,我们称之为虚拟主机 (virtual host) ,简称为vhost。每⼀个 vhost 本质上是⼀个独⽴的⼩型RabbitMQ 服务器,拥有⾃⼰独⽴的队列、交换器及绑定关系等,井且它拥有⾃⼰独⽴的权限。vhost 就像是虚拟机与物理服务器⼀样,它们在各个实例间提供逻辑上的分离,为不同程序安全保密地运⾏数据,它既能将同⼀个RabbitMQ 中的众多客户区分开,⼜可以避免队列和交换器等命名冲突。vhost 之间是绝对隔离的,⽆法将 vhostl 中的交换器与 vhost2 中的队列进⾏绑定,这样既保证了安全性,⼜可以确保可移植性。如果在使⽤ RabbitMQ 达到⼀定规模的时候,建议⽤户对业务功能、场景进⾏归类区分,并为之分配独⽴的 vhost。
1.1、Virtual Hosts 的功能说明
vhost可以限制最⼤连接数和最⼤队列数,并且可以设置vhost下的⽤户资源权限和Topic权限,具体权限见下⽅说明。
在 Admin -> Limits 页⾯可以设置vhost的最⼤连接数和最⼤队列数,达到限制后,继续创建,将会报错。
⽤户资源权限是指RabbitMQ ⽤户在客户端执⾏AMQP操作命令时,拥有对资源的操作和使⽤权限。权限分为三个部分: configure、write、read ,见下⽅表格说明。参考:
AMQP 0-9-1 Operation configure write read
exchange.declare(passive=false)exchange
exchange.declare(passive=true)
exchange.declare(with [AE](ae.html))exchange exchange (AE)exchange
exchange.delete exchange
queue.declare(passive=false)queue
queue.declare(passive=true)
queue.declare(with [DLX](dlx.html))queue exchange (DLX)queue
queue.delete queue
exchange.bind exchange (destination)exchange (source)
exchange.unbind exchange (destination)exchange (source)
queue.bind queue exchange
queue.unbind queue exchange
basic.publish exchange
< queue
queue.purge queue
举例说明:
⽐如创建队列时,会调⽤ queue.declare ⽅法,此时会使⽤到 configure 权限,会校验队列名是否与 configure 的表达式匹配。
⽐如队列绑定交换器时,会调⽤ queue.bind ⽅法,此时会⽤到 write 和 read 权限,会检验队列名是否与 write 的表达式匹配,交换器名是否与 read 的表达式匹配。
Topic权限,参考:
Topic权限是RabbitMQ 针对STOMP和MQTT等协议实现的⼀种权限。由于这类协议都是基于Topic消费的,⽽AMQP是基
于Queue消费,所以AMQP的标准资源权限不适合⽤在这类协议中,⽽Topic权限也不适⽤于AMQP协议。所以,我们⼀般不会去使⽤它,只⽤在使⽤了MQTT这类的协议时才可能会⽤到。
2.2、vhost使⽤⽰例
1. 使⽤管理员⽤户登录Web管理界⾯。
2.页⾯添加⼀个名为 v1 的Virtual Hosts。(此时还需要为此vhost分配⽤户,添加⼀个新⽤户)
3.在 Admin -> Users 页⾯添加⼀个名为 order-user 的⽤户,并设置为 management ⾓⾊。
4. 从 Admin 进⼊ order-user 的⽤户设置界⾯,在 Permissions 中,为⽤户分配vhost为/v1,并为每种权限设置需要匹配的⽬标名称的正则表达
式。
字段名值说明
Virtual Host /v1 指定⽤户的vhost,以下权限都只限于 /v1 vhost中
Configure regexp eq-.*只能操作名称以eq-开头的exchange或queue;为空则不能操作任何exchange和queue
Write regexp .*能够发送消息到任意名称的exchange,并且能绑定到任意名称的队列和任意名称的⽬标交
换器(指交换器绑定到交换器),为空表⽰没有权限
Read regexp ^test$ 只能消费名为test队列上的消息,并且只能绑定到名为test的交换器
5.代码演⽰
public class Producer {
public static void main(String[] args) {
// 1、创建连接⼯⼚
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setUsername("order-user");
factory.setPassword("order-user");
factory.setVirtualHost("v1");
Connection connection = null;
Channel channel = null;
// 3、设置每个节点的链接地址和端⼝
Address[] addresses = new Address[]{
new Address("192.168.0.1", 5672),
new Address("192.168.0.2", 5672)
};
try {
// 开启/关闭连接⾃动恢复,默认是开启状态
factory.setAutomaticRecoveryEnabled(true);
/
/ 设置每100毫秒尝试恢复⼀次,默认是5秒:com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL
factory.setNetworkRecoveryInterval(100);
factory.setTopologyRecoveryEnabled(false);
// 4、使⽤连接集合⾥⾯的地址获取连接
connection = wConnection(addresses, "⽣产者");
// 添加重连
((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
/**
* 重连成功后的回调
* @param recoverable
*/
public void handleRecovery(Recoverable recoverable) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 已重新建⽴连接!");
}
/**
* 开始重连时的回调
* @param recoverable
*/
public void handleRecoveryStarted(Recoverable recoverable) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 开始尝试重连!");
}
});
/
/ 5、从链接中创建通道
channel = ateChannel();
/**
* 6、声明(创建)队列
* 如果队列不存在,才会创建
* RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
*
* queueDeclare参数说明:
* @param queue 队列名称
* @param durable 队列是否持久化
* @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭
时会⾃动删除,不受持久化和⾃动删除的属性控制            * @param autoDelete 是否⾃动删除,当最后⼀个消费者断开连接之后是否⾃动删除
* @param arguments 队列参数,设置队列的有效期、消息最⼤长度、队列中所有消息的⽣命周期等等
*/
channel.queueDeclare("queue1", false, false, false, null);
channel.queueBind("queue1", "test-exchange", "xxoo");
for (int i = 0; i < 100; i++) {
// 消息内容
String message = "Hello World " + i;
try {
// 7、发送消息
channel.basicPublish("test-exchange", "queue1", null, Bytes());
} catch (AlreadyClosedException e) {
// 可能连接已关闭,等待重连
System.out.println("消息 " + message + " 发送失败!");
TimeUnit.SECONDS.sleep(2);
continue;
}
System.out.println("消息 " + i + " 已发送!");
TimeUnit.SECONDS.sleep(2);
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 8、关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
正则匹配公司名称e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 9、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
public class VirtualHosts {
public static void main(String[] args) {
// 1、创建连接⼯⼚
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接属性
factory.setUsername("order-user");
factory.setPassword("order-user");
factory.setVirtualHost("v1");
Connection connection = null;
Channel prducerChannel = null;
Channel consumerChannel = null;
// 3、设置每个节点的链接地址和端⼝
Address[] addresses = new Address[]{
new Address("192.168.0.1", 5672),
new Address("192.168.0.2", 5672)
};
try {
/
/ 4、从连接⼯⼚获取连接
connection = wConnection(addresses, "消费者");
// 5、从链接中创建通道
prducerChannel = ateChannel();
prducerChannel.queueDeclare("queue1", false, false, true, null);
prducerChannel.queueBind("queue1", "test-exchange", "xxoo");
// 消息内容
String message = "Hello A";
prducerChannel.basicPublish("test-exchange", "c1", null, Bytes());
consumerChannel = ateChannel();
/
/ 创建⼀个消费者对象
Consumer consumer = new DefaultConsumer(consumerChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                    System.out.println("收到消息:" + new String(body, "UTF-8"));
}
};
consumerChannel.basicConsume("queue1", true, consumer);
System.out.println("等待接收消息");
ad();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
// 9、关闭通道
if (prducerChannel != null && prducerChannel.isOpen()) {
try {
prducerChannel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 10、关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2.3、集连接恢复
官⽅资料:;根据官⽅⽂档说明可知:
通过 factory.setAutomaticRecoveryEnabled(true); 可以设置连接⾃动恢复的开关,默认已开启
通过 factory.setNetworkRecoveryInterval(10000); 可以设置间隔多长时间尝试恢复⼀次,默认是5秒:
com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL
什么时候会触发连接恢复?
如果启⽤了⾃动连接恢复,将由以下事件触发:
连接的I/O循环中抛出IOExceiption
读取Socket套接字超时
检测不到服务器⼼跳
在连接的I/O循环中引发任何其他异常
如果客户端第⼀次连接失败,不会⾃动恢复连接。需要我们⾃⼰负责重试连接、记录失败的尝试、实现重试次数的限制等等。ConnectionFactory factory = new ConnectionFactory();
// configure various connection settings
try {
Connection conn = wConnection();
} catch (java.ConnectException e) {
Thread.sleep(5000);
// apply retry logic
}
如果程序中调⽤了 Connection.Close ,也不会⾃动恢复连接。
如果是 Channel-level 的异常,也不会⾃动恢复连接,因为这些异常通常是应⽤程序中存在语义问题(例如试图从不存在的队列消费)。
在Connection和Channel上,可以设置重新连接的,开始重连和重连成功时,会触发。添加和移除监听,需要将Connection或Channel强制转换成Recoverable接⼝。
((Recoverable) connection).addRecoveryListener()
((Recoverable) connection).removeRecoveryListener()

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。