RabbitMQ ⼊门_05_多线程消费同⼀队列
消费⼀条消息往往⽐产⽣⼀条消息慢很多,为了防⽌消息积压,⼀般需要开启多个⼯作线程同时消费消息。在 RabbitMQ 中,我们可以创建多个 Consumer
消费同⼀队列。⽰意图如下:
gordon.study.rabbitmq.workqueue.Sender.java
public class Sender {
private static final String QUEUE_NAME = "tasks";
private String name;
public Sender(String name) {
this.name = name;
}
public void work() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = wConnection();
Channel channel = ateChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 10;) {
String message = "NO. " + ++i;
TimeUnit.MILLISECONDS.sleep(100);
channel.basicPublish("", QUEUE_NAME, null, Bytes("UTF-8"));
System.out.printf("(%1$s)[===>%2$s    ] %3$s\n", name, ":" + QUEUE_NAME, message);
}
channel.close();
connection.close();
}
}gordon.study.rabbitmq.workqueue.Receiver.java
A. 多线程消费同⼀队列
public class Receiver {
private static final String QUEUE_NAME = "tasks";
private String name;
private int sleepTime;
public Receiver(String name, int sleepTime) {
this.name = name;
this.sleepTime = sleepTime;
}
public void work() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = wConnection();
Channel channel = ateChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
}
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
gordon.study.rabbitmq.workqueue.Test01.java
public class Test01 {
public static void main(String[] args) throws Exception {
Receiver recv1 = new Receiver("A", 200);
recv1.work();
Receiver recv2 = new Receiver("B", 200);
recv2.work();
Sender sender = new Sender("S");
sender.work();
}
}
运⾏ Test01,发现 A、B 两个消费者轮流获取 S 发送的消息。
RabbitMQ 默认将消息顺序发送给下⼀个消费者,这样,每个消费者会得到相同数量的消息。即,轮询(round-robin)分发消息。
轮询很好,可是如果两个消费者消费能⼒不⼀样呢?
gordon.study.rabbitmq.workqueue.Test02SlowConsumer.java
public class Test02SlowConsumer {
public static void main(String[] args) throws Exception {
Receiver recv1 = new Receiver("A", 200);
recv1.work();
Receiver recv2 = new Receiver("B", 800);
recv2.work();
Sender sender = new Sender("S");
sender.work();
}
}
将消费者B 的消费时间提⾼到800毫秒,问题就出现了:B 依然分到了⼀半消息,需要运⾏很久才能处理完。
B. 公平分发(fair dispatch)
怎样才能做到按照每个消费者的能⼒分配消息呢?联合使⽤ Qos 和 Acknowledge 就可以做到。
gordon.study.rabbitmq.workqueue.QosAcknowledgeReceiver.java
public class QosAcknowledgeReceiver {
private static final String QUEUE_NAME = "tasks";
private String name;
private int sleepTime;
public QosAcknowledgeReceiver(String name, int sleepTime) {
this.name = name;
this.sleepTime = sleepTime;
}
public void work() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = wConnection();
final Channel channel = ateChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.printf(" [    %2$s<===](%1$s) %3$s\n", name, QUEUE_NAME, message);
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
}
channel.DeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
代码第22⾏的 basicQos ⽅法设置了当前信道最⼤预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视⾓去看,总是会有⼀批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是⽤来限制这批未确认消息数量的。设为1时,队列只有
在收到消费者发回的上⼀条消息 ack 确认后,才会向该消费者发送下⼀条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。
basicQos 中 prefetchSize 参数通过消息的总字节数来限制队列推送消息的速度
prefetchSize 与 prefetchCount 可以同时设置,达到任何⼀个限制,则队列暂停推送消息
global 参数表⽰前两个参数的作⽤域,true 表⽰限制是针对信道的,false 表⽰限制是针对消费者的(我还没试过⼀个信道⽀持多个消费者的例⼦,样例代码见下⽅)
可以对同⼀个信道同时设置 global 为 true 和 false 的 Qos,表⽰队列要考虑每个消费者的限制,同时还要考虑整个信道的限制
看起来API注释是错的,因为 global 默认是 false,所以第22⾏代码应该是把当前信道上每个消费者(当然,上⾯的例⼦中只有⼀个)的 prefetchCount 设为 1
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10, false); // Per consumer limit
channel.basicQos(15, true);  // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);
第37⾏代码将 autoAck 设为 false,向 Broker 发送 ack 响应的任务就交给开发⼈员了。
第34⾏代码在任务真正完成后,调⽤ basicAck ⽅法主动通知队列消息已成功消费。当队列收到 ack 确认后,会把下⼀条消息推送过来,并将该消息从队列中删除。
Qos ⽅案⽰意图如下:
gordon.study.rabbitmq.workqueue.Test03FairDispatch.java
public class Test03FairDispatch {
public static void main(String[] args) throws Exception {
QosAcknowledgeReceiver recv1 = new QosAcknowledgeReceiver("A", 200);
recv1.work();
QosAcknowledgeReceiver recv2 = new QosAcknowledgeReceiver("B", 800);
recv2.work();
Sender sender = new Sender("S");
sender.work();
java多线程入门}
}
运⾏Test03,可以看到 RabbitMQ 按照消费者的实际能⼒分配消息。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。