springboot集成rabbitMQ
发消息
1 2 3 4 5 6 7 8 9
| @Resource private RabbitTemplate rabbitTemplate;
@Value("${my.exchange}") private String exchange;
public void send(Object obj) { rabbitTemplate.convertAndSend(exchange, "", JsonUtils.toJson(obj)); }
|
处理消息
1 2 3 4
| @RabbitListener(queues = "${msg.queue.my.exchange}") public void receive(String msg) { }
|
默认情况下,如果处理消息时抛出了异常,这个消息会一直重复消费(重复调用receive方法),直到没有抛异常。
消息接收/处理流程分析
在@RabbitListener注解的方法里打个断点,观察下调用栈。
末端是个实现了Runnable的内部类SimpleMessageListenerContainer$AsyncMessageProcessingConsumer
1 2 3 4 5 6 7 8 9 10 11 12
| @Override public void run() { while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) { try { boolean receivedOk = receiveAndExecute(this.consumer); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { Channel channel = consumer.getChannel(); for (int i = 0; i < this.txSize; i++) { logger.trace("Waiting for message from consumer."); Message message = consumer.nextMessage(this.receiveTimeout); if (message == null) {break;} try { executeListener(channel, message); } catch (ImmediateAcknowledgeAmqpException e) {} catch (Throwable ex) { if (causeChainHasImmediateAcknowledgeAmqpException(ex)) {} if (this.transactionManager != null) {} else { consumer.rollbackOnExceptionIfNecessary(ex); throw ex; } } } return consumer.commitIfNecessary(isChannelLocallyTransacted(channel));
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception { boolean ackRequired = !this.acknowledgeMode.isAutoAck() && !this.acknowledgeMode.isManual(); try { if (this.transactional) {} if (ackRequired) { boolean shouldRequeue = RabbitUtils.shouldRequeue(this.defaultRequeuRejected, ex, logger); for (Long deliveryTag : this.deliveryTags) { this.channel.basicReject(deliveryTag, shouldRequeue); } if (this.transactional) {} } } catch (Exception e) { logger.error("Application exception overridden by rollback exception", ex); throw e; } finally { this.deliveryTags.clear(); } }
|
由上可看出,导致无限重试的两个值:
- ackRequired: 和ack模式相关
- shouldRequeue: 和defaultRequeuRejected以及抛出的异常类型有关
根据需要修改acknowledgeMode或者defaultRequeuRejected即可,在配置文件中根据IDE补全提示可以找到这两个属性。
1 2
| spring.rabbitmq.listener.acknowledge-mode=none spring.rabbitmq.listener.default-requeue-rejected=false
|
如果只是不想异常时重试,直接在业务代码中try…catch全部代码不让抛异常也行。
PS:验证这两个配置的时候发现不生效,一番debug发现项目里面自定义了一个工厂bean,创建消费者相关对象时用的那个工厂bean,而不是根据配置属性生成。
1 2 3 4 5 6 7
| @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; }
|
注释掉这个bean那两个配置才生效,当然也可以在这个bean里设置那两个属性。