SpringbootRabbitmq使⽤Jackson2JsonMessageConver。。。Springboot为了应对⾼并发,接⼊了消息队列Rabbitmq,第⼀版验证时使⽤简单消费队列:
//发送端
AbstractOrder order =new Order();
//消费端
public void recieved(AbstractOrder order){
log.info("recieved order:"+order);
//处理逻辑
}
第⼆版为了应对可能出⾏的处理失败,使⽤了Rabbitmq的Ack
下⾯是最终版代码:
//发送端
//把订单加⼊队列
public void convertAndSendOrder(AbstractOrder order){
rabbitmqTemplate.setMandatory(true);
rabbitmqTemplate.setConfirmCallback(confirmCallback);
rabbitmqTemplate.setReturnCallback(returnCallback);
//全局唯⼀不然ReturnCallback ⽆效
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
}
//消费端逻辑
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "queue-order"),
exchange = @Exchange(value = "exchange-order"),
key = "rkey-order")},containerFactory="rabbitListenerContainerFactory")
public void recieved(Message messageorigin, Channel channel){
log.info("recieved message:"+messageorigin);
boolean success = false;
Order order =null;
try {
Jackson2JsonMessageConverter jackson2JsonMessageConverter =new Jackson2JsonMessageConverter();
order = (Order)jackson2JsonMessageConverter.fromMessage(messageorigin);
springframework和springboot}catch (Exception e){
log.info("get order fromBytes message exception"+e.getMessage());
}
//处理逻辑
//回调成功确认消息
if(success){
//成功确认消息
try{
channel.MessageProperties().getDeliveryTag(),false);
}catch (IOException e){
log.info("basicAck ex:"+e.getMessage());
try{
channel.MessageProperties().getDeliveryTag(),false);
}catch (IOException ee){
log.info("again basicAck ex:"+ee.getMessage());
}
}
}
注意:如果在received 中还像第⼀版直接转⾃定义对象,消息进程会报错
org.springframework.amqp.AmqpException: No method found for Order
解决⽅案是使⽤Jackson2JsonMessageConverter 。在发送消息时,它会先将⾃定义的消息类序列化成json格式,再转成byte构造 Message //发送设置Converter
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
//消费时指定Converter
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "queue-order"),
exchange = @Exchange(value = "exchange-order"),
key = "rkey-order")},containerFactory="rabbitListenerContainerFactory")
但是直接使⽤Jackson2JsonMessageConverter后,反序列化时要求发送的类和接受的类完全⼀样(字段,类名,包路径)。
查了下⽂档 docs.spring.io/spring-amqp/api/org/springframework/amqp/support/converter/Jackson2JsonMessageConverter.html 这⾥直接使⽤jackson2JsonMessageConverter.fromMessage的⽅法从Message拿出来。
ACK完成
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论