弹性:从错误和代理故障中恢复
Spring AMQP 提供的一些关键(也是最流行的)高级功能与恢复和自动重新连接有关,这些功能在协议错误或代理故障时会发生。我们已经在本指南中看到了所有相关的组件,但将它们全部放在一起并单独列出功能和恢复场景应该会有所帮助。
主要重连功能由CachingConnectionFactory
本身启用。使用RabbitAdmin
的自动声明功能通常也有益。此外,如果您关心保证交付,您可能还需要在RabbitTemplate
和SimpleMessageListenerContainer
中使用channelTransacted
标志,并在SimpleMessageListenerContainer
中使用AcknowledgeMode.AUTO
(如果您自己执行确认,则为手动)。
自动声明交换机、队列和绑定
RabbitAdmin
组件可以在启动时声明交换机、队列和绑定。它通过ConnectionListener
延迟地执行此操作。因此,如果代理在启动时不存在,则无关紧要。第一次使用Connection
(例如,通过发送消息)时,监听器会触发,并应用管理功能。在监听器中执行自动声明的另一个好处是,如果连接因任何原因断开(例如,代理死亡、网络故障等),则在重新建立连接时会再次应用它们。
以这种方式声明的队列必须具有固定名称——要么显式声明,要么由框架为AnonymousQueue 实例生成。匿名队列是非持久的、排他的,并且自动删除。
|
仅当CachingConnectionFactory 缓存模式为CHANNEL (默认值)时,才会执行自动声明。此限制存在是因为排他性和自动删除队列绑定到连接。
|
从 2.2.2 版本开始,RabbitAdmin
将检测类型为DeclarableCustomizer
的 Bean,并在实际处理声明之前应用该函数。例如,这对于在框架中具有首选支持之前设置新参数(属性)很有用。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
它在不提供对Declarable
Bean 定义的直接访问的项目中也很有用。
另请参阅 RabbitMQ 自动连接/拓扑恢复。
同步操作中的故障和重试选项
如果您在使用RabbitTemplate
时在同步序列中丢失了与代理的连接(例如),Spring AMQP 会抛出AmqpException
(通常,但并非总是AmqpIOException
)。我们不会试图隐藏存在问题的事实,因此您必须能够捕获并响应异常。如果您怀疑连接已丢失(并且不是您的错),最简单的方法是再次尝试操作。您可以手动执行此操作,或者您可以考虑使用 Spring Retry 来处理重试(命令式或声明式)。
Spring Retry 提供了一些 AOP 拦截器,以及高度灵活的配置选项,用于指定重试参数(尝试次数、异常类型、回退算法等)。Spring AMQP 还提供了一些便捷的工厂 Bean,用于以适合 AMQP 使用场景的便捷形式创建 Spring Retry 拦截器,并提供强类型回调接口,可用于实现自定义恢复逻辑。有关更多详细信息,请参阅 StatefulRetryOperationsInterceptor
和 StatelessRetryOperationsInterceptor
的 Javadoc 和属性。如果不存在事务或在重试回调中启动事务,则无状态重试是合适的。请注意,无状态重试比有状态重试配置和分析更简单,但如果存在必须回滚或肯定会回滚的正在进行的事务,则通常不适合。在事务中间断开连接应与回滚具有相同的效果。因此,对于在堆栈中更高位置启动事务的重新连接,有状态重试通常是最佳选择。有状态重试需要一种机制来唯一标识消息。最简单的方法是让发送方在 MessageId
消息属性中放置一个唯一值。提供的消息转换器提供了一个执行此操作的选项:您可以将 createMessageIds
设置为 true
。否则,您可以将 MessageKeyGenerator
实现注入拦截器。密钥生成器必须为每条消息返回一个唯一的密钥。在 2.0 版之前的版本中,提供了 MissingMessageIdAdvice
。它允许没有 messageId
属性的消息重试一次(忽略重试设置)。此建议不再提供,因为随着 spring-retry
1.2 版的发布,其功能已内置到拦截器和消息监听器容器中。
为了向后兼容,默认情况下,具有空消息 ID 的消息被视为对消费者的致命错误(消费者停止)(重试一次后)。要复制 MissingMessageIdAdvice 提供的功能,您可以在监听器容器上将 statefulRetryFatalWithNullMessageId 属性设置为 false 。使用此设置,消费者将继续运行,并且消息将被拒绝(重试一次后)。它将被丢弃或路由到死信队列(如果已配置)。
|
从 1.3 版开始,提供了一个构建器 API,以帮助使用 Java(在 @Configuration
类中)组装这些拦截器。以下示例展示了如何执行此操作
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只能以这种方式配置重试功能的子集。更高级的功能需要将 RetryTemplate
配置为 Spring Bean。有关可用策略及其配置的完整信息,请参阅 Spring Retry Javadoc。
使用批处理监听器进行重试
除非批次是由生产者在单个记录中创建的,否则不建议为批次监听器配置重试。有关消费者和生产者创建的批次的更多信息,请参见批次消息。对于消费者创建的批次,框架不知道批次中哪个消息导致了失败,因此在重试耗尽后无法恢复。对于生产者创建的批次,由于只有一个消息实际失败,因此可以恢复整个消息。应用程序可能希望告知自定义恢复器批次中失败的位置,例如通过设置抛出异常的索引属性。
批次监听器的重试恢复器必须实现MessageBatchRecoverer
。
消息监听器和异步情况
如果MessageListener
由于业务异常而失败,则异常将由消息监听器容器处理,然后容器将继续监听另一个消息。如果失败是由连接断开(而不是业务异常)引起的,则收集消息以供监听器使用的消费者必须被取消并重新启动。SimpleMessageListenerContainer
无缝地处理此问题,并留下日志以说明监听器正在重新启动。实际上,它会无限循环,尝试重新启动消费者。只有当消费者行为非常糟糕时,它才会放弃。一个副作用是,如果容器启动时代理已关闭,它会一直尝试,直到建立连接。
与协议错误和连接断开相比,业务异常处理可能需要更多思考和一些自定义配置,尤其是在使用事务或容器确认的情况下。在 2.8.x 之前,RabbitMQ 没有定义死信行为。因此,默认情况下,由于业务异常而被拒绝或回滚的消息可以无限期地重新传递。为了限制客户端的重新传递次数,一种选择是在监听器的建议链中使用StatefulRetryOperationsInterceptor
。拦截器可以具有一个恢复回调,该回调实现自定义死信操作——无论对您的特定环境最合适。
另一种选择是将容器的defaultRequeueRejected
属性设置为false
。这会导致所有失败的消息被丢弃。当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息传递到死信交换机。
或者,您可以抛出AmqpRejectAndDontRequeueException
。这样做可以防止消息重新排队,无论defaultRequeueRejected
属性的设置如何。
从 2.1 版本开始,引入了 ImmediateRequeueAmqpException
来执行完全相反的逻辑:无论 defaultRequeueRejected
属性的设置如何,消息都将被重新排队。
通常,两种技术都会结合使用。您可以在建议链中使用 StatefulRetryOperationsInterceptor
,并使用一个抛出 AmqpRejectAndDontRequeueException
的 MessageRecoverer
。当所有重试都已耗尽时,将调用 MessageRecover
。RejectAndDontRequeueRecoverer
正如其名,会拒绝消息并不会重新排队。默认的 MessageRecoverer
会消费错误消息并发出 WARN
消息。
从 1.3 版本开始,提供了一个新的 RepublishMessageRecoverer
,允许在重试耗尽后发布失败的消息。
当恢复器消费最终异常时,消息将被确认,并且如果配置了,不会被发送到代理的死信交换机。
当在消费者端使用 RepublishMessageRecoverer 时,接收到的消息在 receivedDeliveryMode 消息属性中具有 deliveryMode 。在这种情况下,deliveryMode 为 null 。这意味着代理上的 NON_PERSISTENT 传递模式。从 2.0 版本开始,您可以为 RepublishMessageRecoverer 配置 deliveryMode ,以便在 null 时将其设置为要重新发布的消息。默认情况下,它使用 MessageProperties 的默认值 - MessageDeliveryMode.PERSISTENT 。
|
以下示例展示了如何将 RepublishMessageRecoverer
设置为恢复器
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
RepublishMessageRecoverer
会将消息发布到消息头中,其中包含其他信息,例如异常消息、堆栈跟踪、原始交换机和路由键。可以通过创建子类并覆盖 additionalHeaders()
来添加其他头。deliveryMode
(或任何其他属性)也可以在 additionalHeaders()
中更改,如下例所示
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
从 2.0.5 版本开始,如果堆栈跟踪过大,则可能会被截断;这是因为所有头都必须放在一个帧中。默认情况下,如果堆栈跟踪会导致少于 20,000 字节(“余量”)可用于其他头,则它将被截断。如果您需要更多或更少的空间用于其他头,可以通过设置恢复器的 frameMaxHeadroom
属性来调整。从 2.1.13、2.2.3 版本开始,异常消息包含在此计算中,并且将使用以下算法最大化堆栈跟踪量
-
如果仅堆栈跟踪本身就超过限制,则异常消息头将被截断为 97 字节加上
…
,并且堆栈跟踪也被截断。 -
如果堆栈跟踪很小,消息将被截断(加上
…
)以适应可用字节(但堆栈跟踪本身的消息将被截断为 97 字节加上…
)。
无论何时发生任何类型的截断,原始异常都将被记录以保留完整的信息。评估在增强标题后执行,以便可以使用诸如异常类型之类的信息在表达式中。
从版本 2.4.8 开始,错误交换和路由键可以作为 SpEL 表达式提供,其中Message
是评估的根对象。
从版本 2.3.3 开始,提供了一个新的子类RepublishMessageRecovererWithConfirms
;它支持两种发布确认样式,并将等待确认后返回(或者如果未确认或消息被返回则抛出异常)。
如果确认类型为CORRELATED
,子类还将检测消息是否被返回并抛出AmqpMessageReturnedException
;如果发布被否定确认,它将抛出AmqpNackReceivedException
。
如果确认类型为SIMPLE
,子类将调用通道上的waitForConfirmsOrDie
方法。
有关确认和返回的更多信息,请参见发布者确认和返回。
从版本 2.1 开始,添加了ImmediateRequeueMessageRecoverer
以抛出ImmediateRequeueAmqpException
,它通知侦听器容器重新排队当前失败的消息。
Spring Retry 的异常分类
Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。默认配置对所有异常进行重试。鉴于用户异常被包装在ListenerExecutionFailedException
中,我们需要确保分类检查异常原因。默认分类器只查看顶级异常。
从 Spring Retry 1.0.3 开始,BinaryExceptionClassifier
具有一个名为traverseCauses
(默认值:false
)的属性。当为true
时,它会遍历异常原因,直到找到匹配项或没有原因。
要将此分类器用于重试,可以使用使用构造函数创建的SimpleRetryPolicy
,该构造函数接受最大尝试次数、Exception
实例的Map
和布尔值(traverseCauses
),并将此策略注入RetryTemplate
中。