JavaSpringBoot集成RabbitMq实战和总结
⽬录
交换器、队列、绑定的声明
关于消息序列化
同⼀个队列多消费类型
注解将消息和消息头注⼊消费者⽅法
关于消费者确认
关于发送者确认模式
消费消息、死信队列和RetryTemplate
RPC模式的消息(不常⽤)
关于消费模型
关于RabbitMq客户端的线程模型
在公司⾥⼀直在⽤RabbitMQ,由于api已经封装的很简单,关于RabbitMQ本⾝还有封装的实现没有了解,最近在看RabbitMQ实战这本书,结合⽹上的⼀些例⼦和spring⽂档,实现了RabbitMQ和spring的集成,对着⾃⼰平时的疑惑做了⼀些总结。
关于RabbitMQ基础不在详细讲解(本⽂不适合RabbitMq零基础),RabbitMQ实战的1,2,4三章讲的⾮常不错。因为书中讲的都是Python和Php 的例⼦,所以⾃⼰结合SpringBoot⽂档和朱⼩厮的博客做了⼀些总结,写了⼀些Springboot的例⼦。
交换器、队列、绑定的声明
SpringAMQP项⽬对RabbitMQ做了很好的封装,可以很⽅便的⼿动声明队列,交换器,绑定。如下:
/** * 队列 *@return*/@Bean@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE)Queuequeue()
{returnnewQueue(RabbitMQConstant.PROGRAMMATICALLY_QUEUE,false,false,true); }/** * 交换器
*@return*/@Bean@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE)TopicExchangeexchange() {returnnewTopicExchange(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE,false,true); }/** * 声明绑定关系
*@return*/@BeanBindingbinding(@Qualifier(RabbitMQConstant.PROGRAMMATICALLY_EXCHANGE)TopicExchange exchange, @Qualifier(RabbitMQConstant.PROGRAMMATICALLY_QUEUE)Queue queue)
{returnBindingBuilder.bind(queue).to(exchange).with(RabbitMQConstant.PROGRAMMATICALLY_KEY); }/** * 声明简单的消费者,接收到的都是原始的{@linkMessage} * *@paramconnectionFactory *
*@return*/@BeanSimpleMessageListenerContainersimpleContainer(ConnectionFactory connectionFactory){ SimpleMessageListenerContainer container =newSimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory); container.setMessageListener(message -> log.info("simple
receiver,message:{}", message));
container.setQueueNames(RabbitMQConstant.PROGRAMMATICALLY_QUEUE);returncontainer; }
消费者和⽣产者都可以声明,交换器这种⼀般经常创建,可以⼿动创建。需要注意对于没有路由到队列的消息会被丢弃。
如果是Spring的话还需要声明连接:
@BeanConnectionFactoryconnectionFactory(@Value("${spring.rabbitmq.port}")intport,
@Value("${spring.rabbitmq.host}")String host, @Value("${spring.rabbitmq.username}")String userName, @Value("${spring.rabbitmq.password}")String password,
@Value("${spring.rabbitmq.publisher-confirms}")booleanisConfirm, @Value("${spring.rabbitmq.virtual-
host}")String vhost){ CachingConnectionFactory connectionFactory =newCachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setVirtualHost(vhost); connectionFactory.setPort(port); connectionFactory.
setUsername(userName); connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(isConfirm); }
在配置类使⽤@EnableRabbit的情况下,也可以基于注解进⾏声明,在Bean的⽅法上加上@RabbitListener,如下:
/** * 可以直接通过注解声明交换器、绑定、队列。但是如果声明的和rabbitMq中已经存在的不⼀致的话 * 会报错便于测试,我这⾥都是不使⽤持久化,没有消费者之后⾃动删除 * {@linkRabbitListener}是可以重复的。并且声明队列绑定的key也可以有多个. *
*@paramheaders *@parammsg */@RabbitListener( bindings =@QueueBinding( exchange =@Exchange(value = RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC, durable =
RabbitMQConstant.FALSE_CONSTANT, autoDelete = ue_CONSTANT), value =@Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = ue_CONSTANT), key = DKEY ),//⼿动指明消费者的监听容器,默认
Spring为⾃动⽣成⼀个SimpleMessageListenerContainercontainerFactory ="container",//指定消费者的线程数量,⼀个线程会打开⼀个Channel,⼀个队列上的消息只会被消费⼀次(不考虑消息重新⼊队列的情况),下⾯的表⽰⾄少开启5个线程,最多10个。线程的数⽬需要根据你的任务来决定,如果是计算密集型,线程的数⽬就应该少⼀些concurrency ="5-10")publicvoidprocess(@Headers Map<String, Object> headers, @Payload ExampleEvent msg){ log.info("basic consumer receive message:{headers = ["+ headers +"], msg = ["+ msg +"]}"); }/** * {@linkQueue#ignoreDeclarationExceptions}声明队列会忽略错误不声明队列,这个消费者仍然是可⽤的 * *@paramheaders
*@parammsg */@RabbitListener(queuesToDeclare =@Queue(value = RabbitMQConstant.DEFAULT_QUEUE, ignoreDeclarationExceptions = ue_CONSTANT))publicvoidprocess2(@Headers Map<String, Object> headers, @Payload ExampleEvent msg){ log.info("basic2 consumer receive message:{headers = ["+ headers +"], msg = ["+ msg +"]}"); }
关于消息序列化
这个⽐较简单,默认采⽤了Java序列化,我们⼀般使⽤的Json格式,所以配置了Jackson,根据⾃⼰的情况来,直接贴代码:
@BeanMessageConvertermessageConverter(){returnnewJackson2JsonMessageConverter(); }
同⼀个队列多消费类型
如果是同⼀个队列多个消费类型那么就需要针对每种类型提供⼀个消费⽅法,否则不到匹配的⽅法会报错,如下:
@Component@Slf4j@RabbitListener( bindings =@QueueBinding( exchange =@Exchange(value =
RabbitMQConstant.MULTIPART_HANDLE_EXCHANGE, type = ExchangeTypes.TOPIC, durable =
RabbitMQConstant.FALSE_CONSTANT, autoDelete = ue_CONSTANT), value =@Queue(value = RabbitMQConstant.MULTIPART_HANDLE_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = ue_CONSTANT), key = RabbitMQConstant.MULTIPART_HANDLE_KEY
))@Profile(SpringConstant.MULTIPART_PROFILE)publicclassMultipartConsumer{/** * RabbitHandler⽤于有多个⽅法时但是参数类型不能⼀样,否则会报错 * *@parammsg */@Rabbi
tHandlerpublicvoidprocess(ExampleEvent msg){ log.info("param:{msg = ["+ msg +"]} info:"); }@RabbitHandlerpublicvoidprocessMessage2(ExampleEvent2 msg){ log.info("param:{msg2 = ["+ msg +"]} info:"); }/** * 下⾯的多个消费者,消费的类型不⼀样没事,不会被调⽤,但是如果缺了相应消息的处理Handler则会报错 *
*@parammsg */@RabbitHandlerpublicvoidprocessMessage3(ExampleEvent3 msg){ log.info("param:{msg3 = ["+ msg +"]} info:"); }}
注解将消息和消息头注⼊消费者⽅法
在上⾯也看到了@Payload等注解⽤于注⼊消息。这些注解有:
@Header 注⼊消息头的单个属性
@Payload 注⼊消息体到⼀个JavaBean中
@Headers 注⼊所有消息头到⼀个Map中
这⾥有⼀点主要注意,如果是com.rabbitmq.client.Channel,org.Message和
ssaging.Message这些类型,可以不加注解,直接可以注⼊。
如果不是这些类型,那么不加注解的参数将会被当做消息体。不能多于⼀个消息体。如下⽅法ExampleEvent就是默认的消息体:
publicvoidprocess2(@Headers Map<String, Object> headers,ExampleEvent msg);
关于消费者确认
RabbitMq消费者可以选择⼿动和⾃动确认两种模式,如果是⾃动,消息已到达队列,RabbitMq对⽆脑的将消息抛给消费者,⼀旦发送成功,他会认为消费者已经成功接收,在RabbitMq内部就把消息给删除了。另外⼀种就是⼿动模式,⼿动模式需要消费者对每条消息进⾏确认(也可以批量确认),RabbitMq发送完消息之后,会进⼊到⼀个待确认(unacked)的队列,如下图红框部分:
如果消费者发送了ack,RabbitMq将会把这条消息从待确认中删除。如果是nack并且指明不要重新⼊队列,那么该消息也会删除。但是如果是nack且指明了重新⼊队列那么这条消息将会⼊队列,然后重新发送给消费者,被重新投递的消息消息头amqp_redelivered属性会被设置成true,客户端可以依靠这点来判断消息是否被确认,可以好好利⽤这⼀点,如果每次都重新回队列会导致同⼀消息不停的被发送和拒绝。消费者在确认消息之前和RabbitMq失去了连接那么消息也会被重新投递。所以⼿动确认模式很⼤程度上提⾼可靠性。⾃动模式的消息可以提⾼吞吐量。
spring⼿动确认消息需要将SimpleRabbitListenerContainerFactory设置为⼿动模式:
simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
⼿动确认的消费者代码如下:
@SneakyThrows@RabbitListener(bindings =@QueueBinding( exchange =@Exchange(value =
RabbitMQConstant.CONFIRM_EXCHANGE, type = ExchangeTypes.TOPIC, durable =
RabbitMQConstant.FALSE_CONSTANT, autoDelete = ue_CONSTANT), value =@Queue(value = RabbitMQConstant.CONFIRM_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = ue_CONSTANT),
key = RabbitMQConstant.CONFIRM_KEY), containerFactory ="containerWithConfirm")publicvoidprocess(ExampleEvent msg, Channel channel, @Header(name
="amqp_deliveryTag")longdeliveryTag, @Header("amqp_redelivered")booleanredelivered, @Headers Map head) {try{ log.info("ConsumerWithConfirm receive message:{},header:{}", msg, head);
channel.basicAck(deliveryTag,false); }catch(Exception e) { ("consume confirm error!", e);//这⼀步千万不要忘记,不会会导致消息未确认,消息到达连接的qos之后便不能再接收新消息//⼀般重试肯定的有次数,这⾥简单的根据是否已经重发过来来决定重发。第⼆个参数表⽰是否重新分发channel.basicReject(deliveryTag, !redelivered);//这个⽅法我知道的是⽐上⾯多⼀个批量确认的参数// channel.basicNack(deliveryTag, false,!redelivered);} }
关于spring的AcknowledgeMode需要说明,他⼀共有三种模式:NONE,MANUAL,AUTO,默认是AUTO模式。这⽐RabbitMq原⽣多了⼀种。这⼀点很容易混淆,这⾥的NONE对应其实就是RabbitMq的⾃动确认,MANUAL是⼿动。⽽AUTO其实也是⼿动模式,只不过是Spring的⼀层封装,他根据你⽅法执⾏的结果⾃动帮你发送ack和nack。如果⽅法未抛出异常,则发送ack。如果⽅法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新⼊队列。如果抛出异常时Amqp
RejectAndDontRequeueException则发送nack不会重新⼊队列。我有⼀个例⼦专门测试NONE,见CunsumerWithNoneTest。
还有⼀点需要注意的是消费者有⼀个参数prefetch,它表⽰的是⼀个Channel(也就是SimpleMessageListenerContainer的⼀个线程)预取的消息数量,这个参数只会在⼿动确认的消费者才⽣效。可以客户端利⽤这个参数来提⾼性能和做流量控制。如果prefetch设置的是10,当这个Channel上unacked的消息数量到达10条时,RabbitMq便不会在向你发送消息,客户端如果处理的慢,便可以延迟确认在⽅法消息的接收。⾄于提⾼性能就⾮常容易理解,因为这个是批量获取消息,如果客户端处理的很快便不⽤⼀个⼀个去等着去新的消息。SpringAMQP2.0开始默认是250,这个参数应该已经⾜够了。注意之前的版本默认值是1所以有必要重新设置⼀下值。当然这个值也不能设置的太⼤,RabbitMq是通过round robin这个策略来做负载均衡的,如果设置的太⼤会导致消息不多时⼀下⼦积压到⼀台消费者,不能很好的均衡负载。另外如果消息数据量很⼤也应该适当减⼩这个值,这个值过⼤会导致客户端内存占⽤问题。如果你⽤到了事务的话也需要考虑这个值的影响,因为事务的⽤处不⼤,所以我也没做过多的深究。
关于发送者确认模式
考虑这样⼀个场景:你发送了⼀个消息给RabbitMq,RabbitMq接收了但是存⼊磁盘之前服务器就挂了,
消息也就丢了。为了保证消息的投递有两种解决⽅案,最保险的就是事务(和DB的事务没有太⼤的可⽐性), 但是因为事务会极⼤的降低性能,会导致⽣产者和RabbitMq之间产⽣同步(等待确认),这也违背了我们使⽤RabbitMq的初衷。所以⼀般很少采⽤,这就引⼊第⼆种⽅案:发送者确认模式。
发送者确认模式是指发送⽅发送的消息都带有⼀个id,RabbitMq会将消息持久化到磁盘之后通知⽣产者消息已经成功投递,如果因为RabbitMq 内部的错误会发送ack。注意这⾥的发送者和RabbitMq之间是异步的,所以相较于事务机制性能⼤⼤提⾼。其实很多操作都是不能保证绝对的百分之⼀百的成功,哪怕采⽤了事务也是如此,可靠性和性能很多时候需要做⼀些取舍,想很多互联⽹公司吹嘘的5个9,6个9也是⼀样的道理。如果不是重要的消息性能计数器,完全可以不采⽤发送者确认模式。
这⾥有⼀点我当时纠结了很久,我⼀直以为发送者确认模式的回调是客户端的ack触发的,这⾥是⼤⼤的误解!发送者确认模式和消费者没有⼀点关系,消费者确认也和发送者没有⼀点关系,两者都是在和RabbitMq打交道,发送者不会管消费者有没有收到,只要消息到了RabbitMq并且已经持久化便会通知⽣产者,这个ack是RabbitMq本⾝发出的,和消费者⽆关
发送者确认模式需要将Channel设置成Confirm模式,这样才会收到通知。Spring中需要将连接设置成Confirm模式:
connectionFactory.setPublisherConfirms(isConfirm);
然后在RabbitTemplate中设置确认的回调,correlationData是消息的id,如下(只是简单打印下):
// 设置RabbitTemplate每次发送消息都会回调这个⽅法rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> log.info("confirm callback id:{},ack:{},cause:{}", correlationData, ack, cause));
发送时需要给出唯⼀的标识(CorrelationData):
RabbitMQConstant.DEFAULT_KEY,newExampleEvent(i,"confirm message id:"+ i),String(i)));
还有⼀个参数需要说下:mandatory。这个参数为true表⽰如果发送消息到了RabbitMq,没有对应该消息的队列。那么会将消息返回给⽣产者,此时仍然会发送ack确认消息。
设置RabbitTemplate的回调如下:
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info("return callback message:{},code:{},text:{}", message, replyCode, replyText));
另外如果是RabbitMq内部的错误,不会调⽤该⽅法。所以如果消息特别重要,对于未确认的消息,⽣产者应该在内存⽤保存着,在确认时候根据返回的id删除该消息。如果是nack可以将该消息记录专门的⽇志或者转发到相应处理的逻辑进⾏后续补偿。RabbitTemplate也可以配置RetryTemplate,发送失败时直接进⾏重试,具体还是要结合业务。
最后关于发送者确认需要提的是spring,因为spring默认的Bean是单例的,所以针对不同的确认⽅案(其实有不同的确认⽅案是⽐较合理的,很多消息不需要确认,有些需要确认)需要配置不同的bean.
消费消息、死信队列和RetryTemplate
上⾯也提到了如果消费者抛出异常时默认的处理逻辑。另外我们还可以给消费者配置RetryTemplate,如果是采⽤SpringBoot的话,可以在l配置中配置如下:
spring: rabbitmq: listener: retry:# 重试次数 max-attempts:3# 开启重试机制 enabled:true
如上,如果消费者失败的话会进⾏重试,默认是3次。注意这⾥的重试机制RabbitMq是为感知的!到达3次之后会抛出异常调⽤MessageRecoverer。默认的实现为RejectAndDontRequeueRecoverer,也就是打印异常,发送nack,不会重新⼊队列。
我想既然配置了重试机制消息肯定是很重要的,消息肯定不能丢,仅仅是⽇志可能会因为⽇志滚动丢失⽽且信息不明显,所以我们要讲消息保存下来。可以有如下这些⽅案:
使⽤RepublishMessageRecoverer这个MessageRecoverer会发送发送消息到指定队列
给队列绑定死信队列,因为默认的RepublishMessageRecoverer会发送nack并且requeue为false。这样抛出⼀场是这种⽅式和上⾯的结果⼀样都是转发到了另外⼀个队列。详见DeadLetterConsumer
注册⾃⼰实现的MessageRecoverer
给MessageListenerContainer设置RecoveryCallback
对于⽅法⼿动捕获异常,进⾏处理
我⽐较推荐前两种。这⾥说下死信队列,死信队列其实就是普通的队列,只不过⼀个队列声明的时候指定的属性,会将死信转发到该交换器中。声明死信队列⽅法如下:
@RabbitListener( bindings =@QueueBinding( exchange =@Exchange(value =
RabbitMQConstant.DEFAULT_EXCHANGE, type = ExchangeTypes.TOPIC, durable =java调用python模型
RabbitMQConstant.FALSE_CONSTANT, autoDelete = ue_CONSTANT), value =@Queue(value = RabbitMQConstant.DEFAULT_QUEUE, durable = RabbitMQConstant.FALSE_CONSTANT, autoDelete = ue_CONSTANT, arguments = {@Argument(name = RabbitMQConstant.DEAD_LETTER_EXCHANGE, value = RabbitMQConstant.DEAD_EXCHANGE),@Argument(name = RabbitMQConstant.DEAD_LETTER_KEY, value = RabbitMQConstant.DEAD_KEY) }), key = RabbitMQConstant.DEFAULT_KEY ))
其实也就只是在声明的时候多加了两个参数x-dead-letter-exchange和x-dead-letter-routing-key。这⾥⼀开始踩了⼀个坑,因为
@QueueBinding注解中也有arguments属性,我⼀开始将参数声明到@QueueBinding中,导致⼀直没绑定成功。如果绑定成功可以在控制台看到queue的Featrues有DLX(死信队列交换器)和DLK(死信队列绑定)。如下:
关于消息进⼊死信的规则:
消息被拒绝(ject/basic.nack)并且requeue=false
消息TTL过期
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论