弹性:从错误和代理故障中恢复

Spring AMQP 提供的一些关键(也是最受欢迎的)高级功能与恢复和自动重新连接相关,这些功能会在协议错误或代理故障时触发。在本指南中,我们已经了解了所有相关的组件,但在这里将它们整合在一起,并单独列出这些功能和恢复场景,应该会有所帮助。

主要的重新连接功能由 CachingConnectionFactory 本身启用。使用 RabbitAdmin 的自动声明功能通常也很有益。此外,如果您关心保证投递,则可能还需要在 RabbitTemplateSimpleMessageListenerContainer 中使用 channelTransacted 标志,并在 SimpleMessageListenerContainer 中使用 AcknowledgeMode.AUTO(或者如果您自己执行确认,则使用手动确认)。

交换机、队列和绑定的自动声明

RabbitAdmin 组件可以在启动时声明交换机、队列和绑定。它通过 ConnectionListener 以延迟的方式执行此操作。因此,如果启动时代理不存在,则无关紧要。第一次使用 Connection(例如,通过发送消息)时,监听器会触发,并应用 admin 功能。在监听器中执行自动声明的另一个好处是,如果连接因任何原因断开(例如,代理死亡、网络故障等),则在重新建立连接时会再次应用它们。

以这种方式声明的队列必须具有固定的名称——要么显式声明,要么由框架为 AnonymousQueue 实例生成。匿名队列是非持久化的、独占的和自动删除的。
仅当 CachingConnectionFactory 的缓存模式为 CHANNEL(默认值)时,才会执行自动声明。此限制存在的原因是独占和自动删除的队列绑定到连接。

从 2.2.2 版本开始,RabbitAdmin 将检测类型为 DeclarableCustomizer 的 bean,并在实际处理声明之前应用该函数。例如,这对于在框架中获得一等支持之前设置新参数(属性)非常有用。

@Bean
public DeclarableCustomizer customizer() {
    return dec -> {
        if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
            dec.addArgument("some.new.queue.argument", true);
        }
        return dec;
    };
}

它在不提供对 Declarable bean 定义的直接访问的项目中也很有用。

同步操作中的故障和重试选项

如果您在使用 RabbitTemplate 时在同步序列中丢失了与代理的连接(例如),Spring AMQP 将抛出 AmqpException(通常是 AmqpIOException,但并非总是如此)。我们不试图隐藏存在问题的事实,因此您必须能够捕获并响应异常。如果您怀疑连接已断开(并且这不是您的错误),最简单的方法是再次尝试该操作。您可以手动执行此操作,或者您可以考虑使用 Spring Retry 来处理重试(命令式或声明式)。

Spring Retry 提供了一些 AOP 拦截器,并提供了极大的灵活性来指定重试的参数(尝试次数、异常类型、回退算法等)。Spring AMQP 还提供了一些便捷的工厂 Bean,用于以方便的形式为 AMQP 使用场景创建 Spring Retry 拦截器,并提供了强类型回调接口,您可以使用这些接口来实现自定义恢复逻辑。有关更多详细信息,请参阅 StatefulRetryOperationsInterceptorStatelessRetryOperationsInterceptor 的 Javadoc 和属性。如果不存在事务或在重试回调内部启动了事务,则无状态重试是合适的。请注意,无状态重试比有状态重试更易于配置和分析,但如果存在必须回滚或肯定要回滚的正在进行的事务,则通常不适合使用。在事务中间断开的连接应与回滚具有相同的效果。因此,对于在堆栈较高层启动事务的重新连接,有状态重试通常是最佳选择。有状态重试需要一种机制来唯一标识消息。最简单的方法是让发送方在 MessageId 消息属性中放入唯一值。提供的消息转换器提供了执行此操作的选项:您可以将 createMessageIds 设置为 true。否则,您可以将 MessageKeyGenerator 实现注入拦截器。密钥生成器必须为每条消息返回一个唯一的密钥。在 2.0 版之前的版本中,提供了 MissingMessageIdAdvice。它允许没有 messageId 属性的消息精确重试一次(忽略重试设置)。此建议不再提供,因为随着 spring-retry 1.2 版,其功能已内置到拦截器和消息侦听器容器中。

为了向后兼容,默认情况下,具有空消息 ID 的消息被视为对消费者致命(消费者停止)(重试一次后)。要复制 MissingMessageIdAdvice 提供的功能,您可以在侦听器容器上将 statefulRetryFatalWithNullMessageId 属性设置为 false。使用此设置,消费者将继续运行,并且消息将被拒绝(重试一次后)。它将被丢弃或路由到死信队列(如果配置了)。

从 1.3 版开始,提供了一个构建器 API,以帮助使用 Java(在 @Configuration 类中)组装这些拦截器。以下示例显示了如何执行此操作

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();
}

只能以这种方式配置一部分重试功能。更高级的功能需要将 RetryTemplate 配置为 Spring Bean。有关可用策略及其配置的完整信息,请参阅 Spring Retry Javadoc

批量侦听器的重试

不建议将重试与批量侦听器一起配置,除非批量是由生产者在单个记录中创建的。有关消费者和生产者创建的批量的更多信息,请参阅 批量消息。对于消费者创建的批次,框架不知道批次中的哪条消息导致了故障,因此在重试耗尽后无法恢复。对于生产者创建的批次,由于实际上只有一条消息失败,因此可以恢复整个消息。应用程序可能希望通知自定义恢复程序故障发生在批次中的哪个位置,也许可以通过设置抛出异常的索引属性。

批量侦听器的重试恢复程序必须实现 MessageBatchRecoverer

消息侦听器和异步情况

如果 MessageListener 由于业务异常而失败,则异常将由消息侦听器容器处理,然后容器返回以侦听另一条消息。如果故障是由连接断开(不是业务异常)引起的,则必须取消并重新启动为侦听器收集消息的消费者。SimpleMessageListenerContainer 无缝处理此问题,并且会留下日志以说明侦听器正在重新启动。实际上,它会无限循环,尝试重新启动使用者。只有当使用者行为非常糟糕时,它才会放弃。一个副作用是,如果在容器启动时代理已关闭,它会一直尝试,直到建立连接。

与协议错误和连接断开相比,业务异常处理可能需要更多考虑和一些自定义配置,尤其是在使用事务或容器确认时。在 2.8.x 之前,RabbitMQ 没有定义死信行为。因此,默认情况下,由于业务异常而被拒绝或回滚的消息可以无限期地重新传递。为了限制客户端上的重新传递次数,一种选择是在侦听器的建议链中使用 StatefulRetryOperationsInterceptor。拦截器可以具有一个恢复回调,该回调实现自定义死信操作——无论对您的特定环境最合适。

另一种方法是将容器的 defaultRequeueRejected 属性设置为 false。这会导致所有失败的消息都被丢弃。当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息传递到死信交换机。

或者,您可以抛出 AmqpRejectAndDontRequeueException。这样做可以防止消息重新入队,而不管 defaultRequeueRejected 属性的设置如何。

从 2.1 版开始,引入了 ImmediateRequeueAmqpException 以执行完全相反的逻辑:消息将重新入队,而不管 defaultRequeueRejected 属性的设置如何。

通常,两种技术都结合使用。您可以在建议链中使用 StatefulRetryOperationsInterceptor 和一个抛出 AmqpRejectAndDontRequeueExceptionMessageRecoverer。当所有重试都耗尽时,将调用 MessageRecoverRejectAndDontRequeueRecoverer 正是这样做的。默认的 MessageRecoverer 使用错误消息并发出 WARN 消息。

从 1.3 版开始,提供了一个新的 RepublishMessageRecoverer,以允许在重试耗尽后发布失败的消息。

当恢复程序使用最终异常时,消息将被确认,并且如果已配置,则不会由代理发送到死信交换机。

当在消费者端使用 RepublishMessageRecoverer 时,接收到的消息在 receivedDeliveryMode 消息属性中具有 deliveryMode。在这种情况下,deliveryModenull。这意味着代理上的 NON_PERSISTENT 传输模式。从 2.0 版开始,您可以配置 RepublishMessageRecoverer 以设置要重新发布的消息中的 deliveryMode(如果为 null)。默认情况下,它使用 MessageProperties 默认值 - MessageDeliveryMode.PERSISTENT

以下示例显示了如何将 RepublishMessageRecoverer 设置为恢复程序

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
            .build();
}

RepublishMessageRecoverer 使用消息头中的其他信息发布消息,例如异常消息、堆栈跟踪、原始交换机和路由密钥。可以通过创建子类并覆盖 additionalHeaders() 来添加其他标头。还可以像以下示例所示在 additionalHeaders() 中更改 deliveryMode(或任何其他属性)

RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

    protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
        message.getMessageProperties()
            .setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
        return null;
    }

};

从 2.0.5 版开始,如果堆栈跟踪过大,则可能会被截断;这是因为所有标头都必须适合单个帧。默认情况下,如果堆栈跟踪会导致少于 20,000 字节(“剩余空间”)可用于其他标头,则它将被截断。如果您需要更多或更少的空间用于其他标头,可以通过设置恢复程序的 frameMaxHeadroom 属性来调整此设置。从 2.1.13 版、2.2.3 版开始,异常消息包含在此计算中,并且将使用以下算法最大化堆栈跟踪量

  • 如果仅堆栈跟踪超过限制,则异常消息标头将被截断为 97 字节加上 …​,并且堆栈跟踪也被截断。

  • 如果堆栈跟踪很小,则消息将被截断(加上 …​)以适合可用字节(但堆栈跟踪本身内的消息将被截断为 97 字节加上 …​)。

每当发生任何类型的截断时,都会记录原始异常以保留完整信息。在增强标头后执行评估,以便在表达式中使用异常类型等信息。

从 2.4.8 版开始,错误交换机和路由密钥可以作为 SpEL 表达式提供,其中 Message 是评估的根对象。

从 2.3.3 版开始,提供了一个新的子类 RepublishMessageRecovererWithConfirms;它支持两种发布确认样式,并在返回之前等待确认(或者如果未确认或消息被返回则抛出异常)。

如果确认类型为 CORRELATED,则子类还将检测消息是否被返回并抛出 AmqpMessageReturnedException;如果发布被否定确认,它将抛出 AmqpNackReceivedException

如果确认类型为 SIMPLE,则子类将调用通道上的 waitForConfirmsOrDie 方法。

有关确认和返回的更多信息,请参阅 发布确认和返回

从 2.1 版开始,添加了 ImmediateRequeueMessageRecoverer 以抛出 ImmediateRequeueAmqpException,通知侦听器容器重新入队当前失败的消息。

Spring Retry 的异常分类

Spring Retry 在确定哪些异常可以调用重试方面具有很大的灵活性。默认配置会重试所有异常。鉴于用户异常包装在 ListenerExecutionFailedException 中,我们需要确保分类检查异常原因。默认分类器仅查看顶层异常。

从 Spring Retry 1.0.3 开始,BinaryExceptionClassifier 具有一个名为 traverseCauses 的属性(默认值:false)。当为 true 时,它会遍历异常原因,直到找到匹配项或没有原因。

要将此分类器用于重试,您可以使用使用带有最大尝试次数、Exception 实例的 Map 和布尔值(traverseCauses)的构造函数创建的 SimpleRetryPolicy,并将此策略注入 RetryTemplate