弹性:从错误和 Broker 故障中恢复
Spring AMQP 提供的一些关键(也是最受欢迎的)高级功能与在协议错误或 broker 故障时进行恢复和自动重新连接有关。我们已经在指南中看到了所有相关组件,但在这里将它们整合起来并单独列出功能和恢复场景会有所帮助。
主要的重连功能由 CachingConnectionFactory 本身启用。使用 RabbitAdmin 自动声明功能也通常很有益。此外,如果你关心保证交付,你可能还需要在 RabbitTemplate 和 SimpleMessageListenerContainer 中使用 channelTransacted 标志,以及在 SimpleMessageListenerContainer 中使用 AcknowledgeMode.AUTO(如果你自己进行确认,则使用手动确认)。
Exchange、队列和绑定的自动声明
RabbitAdmin 组件可以在启动时声明 exchange、队列和绑定。它通过 ConnectionListener 懒惰地完成此操作。因此,如果 broker 在启动时不存在,这并不重要。第一次使用 Connection 时(例如,通过发送消息),监听器会触发,并应用管理功能。在监听器中进行自动声明的另一个好处是,如果连接因任何原因(例如,broker 宕机、网络故障等)断开,它们会在连接重新建立时再次应用。
以这种方式声明的队列必须具有固定名称——要么是显式声明的,要么是框架为 AnonymousQueue 实例生成的。匿名队列是不可持久的、独占的且自动删除的。 |
只有当 CachingConnectionFactory 缓存模式为 CHANNEL(默认)时,才会执行自动声明。存在此限制是因为独占和自动删除队列绑定到连接。 |
从版本 2.2.2 开始,RabbitAdmin 将检测类型为 DeclarableCustomizer 的 bean,并在实际处理声明之前应用该函数。例如,这对于在框架中提供一流支持之前设置新参数(属性)非常有用。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
这在不直接提供对 Declarable bean 定义访问的项目中也很有用。
另请参阅 RabbitMQ 自动连接/拓扑恢复。
同步操作中的故障和重试选项
如果在同步序列中使用 RabbitTemplate 时(例如)失去与 broker 的连接,Spring AMQP 会抛出 AmqpException(通常但不总是 AmqpIOException)。我们不会试图隐藏问题,因此你必须能够捕获并响应异常。如果你怀疑连接已断开(并且不是你的错),最简单的做法是再次尝试操作。你可以手动执行此操作,或者可以考虑使用 Spring Retry 来处理重试(命令式或声明式)。
Spring Retry 提供了一些 AOP 拦截器和极大的灵活性来指定重试的参数(尝试次数、异常类型、退避算法等)。Spring AMQP 还提供了一些便利的工厂 bean,用于以 AMQP 用例的便捷形式创建 Spring Retry 拦截器,并带有强类型回调接口,你可以使用它们来实现自定义恢复逻辑。有关详细信息,请参阅 StatefulRetryOperationsInterceptorFactoryBean 和 StatelessRetryOperationsInterceptorFactoryBean 的 Javadoc 和属性。如果不存在事务或事务在重试回调中启动,则无状态重试是合适的。请注意,无状态重试比有状态重试更容易配置和分析,但如果存在必须回滚或肯定会回滚的正在进行的事务,则通常不适用。事务中间的连接断开应与回滚具有相同的效果。因此,对于事务在堆栈中更高层启动的重新连接,有状态重试通常是最佳选择。有状态重试需要一种机制来唯一标识消息。最简单的方法是让发送方在 MessageId 消息属性中放置一个唯一值。提供的消息转换器提供了一个选项来执行此操作:你可以将 createMessageIds 设置为 true。否则,你可以将 MessageKeyGenerator 实现注入到拦截器中。密钥生成器必须为每条消息返回一个唯一的密钥。在 2.0 版本之前,提供了 MissingMessageIdAdvice。它允许没有 messageId 属性的消息恰好重试一次(忽略重试设置)。此通知不再提供,因为,与 spring-retry 1.2 版本一起,其功能已内置到拦截器和消息监听器容器中。
为了向后兼容,默认情况下(一次重试后),具有空消息 ID 的消息被消费者视为致命(消费者停止)。为了复制 MissingMessageIdAdvice 提供的功能,你可以将监听器容器上的 statefulRetryFatalWithNullMessageId 属性设置为 false。在此设置下,消费者继续运行,并且消息被拒绝(一次重试后)。它被丢弃或路由到死信队列(如果已配置)。 |
从 1.3 版本开始,提供了一个 builder API,以帮助使用 Java(在 @Configuration 类中)组装这些拦截器。以下示例展示了如何操作:
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxRetries(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
只能以这种方式配置重试功能的一个子集。更高级的功能需要配置 RetryPolicy。有关可用策略及其配置的更多信息,请参阅 RetryPolicy Javadoc。
带批处理监听器的重试
不建议为批处理监听器配置重试,除非该批处理是由生产者在单个记录中创建的。有关消费者和生产者创建的批处理的信息,请参阅 批处理消息。对于消费者创建的批处理,框架无法知道批处理中的哪条消息导致了故障,因此在重试耗尽后无法恢复。对于生产者创建的批处理,由于实际上只有一条消息失败,因此可以恢复整个消息。应用程序可能希望通知自定义恢复器批处理中发生故障的位置,或许可以通过设置抛出异常的索引属性。
批处理监听器的重试恢复器必须实现 MessageBatchRecoverer。
消息监听器和异步情况
如果 MessageListener 因业务异常而失败,异常将由消息监听器容器处理,然后容器返回继续监听其他消息。如果故障是由连接断开(而不是业务异常)引起的,则为监听器收集消息的消费者必须被取消并重新启动。SimpleMessageListenerContainer 无缝地处理此问题,并留下日志说明监听器正在重新启动。事实上,它会无限循环,尝试重新启动消费者。只有当消费者行为非常糟糕时,它才会放弃。一个副作用是,如果 broker 在容器启动时宕机,它会一直尝试,直到可以建立连接。
业务异常处理,与协议错误和连接断开不同,可能需要更多的思考和一些自定义配置,特别是当使用事务或容器确认时。在 2.8.x 之前,RabbitMQ 没有死信行为的定义。因此,默认情况下,由于业务异常而被拒绝或回滚的消息可能会无限次地重新交付。为了限制客户端重新交付的次数,一个选择是在监听器的建议链中使用 StatefulRetryOperationsInterceptor。拦截器可以有一个恢复回调,该回调实现自定义死信操作——根据你的特定环境选择适当的操作。
另一种选择是将容器的 defaultRequeueRejected 属性设置为 false。这将导致所有失败的消息被丢弃。当使用 RabbitMQ 2.8.x 或更高版本时,这也有助于将消息传递到死信 exchange。
或者,你可以抛出 AmqpRejectAndDontRequeueException。这样做可以防止消息重新排队,无论 defaultRequeueRejected 属性的设置如何。
从 2.1 版本开始,引入了 ImmediateRequeueAmqpException,以执行完全相反的逻辑:消息将重新排队,无论 defaultRequeueRejected 属性的设置如何。
通常,两种技术结合使用。你可以在建议链中使用 StatefulRetryOperationsInterceptor,并使用一个抛出 AmqpRejectAndDontRequeueException 的 MessageRecoverer。当所有重试都耗尽时,会调用 MessageRecoverer。RejectAndDontRequeueRecoverer 正是这样做的。默认的 MessageRecoverer 消耗错误消息并发出 WARN 消息。
从 1.3 版本开始,提供了一个新的 RepublishMessageRecoverer,允许在重试耗尽后重新发布失败的消息。
当恢复器处理最终异常时,消息被 ack,并且如果配置了 broker,则不会将其发送到死信 exchange。
当在消费者端使用 RepublishMessageRecoverer 时,接收到的消息在 receivedDeliveryMode 消息属性中包含 deliveryMode。在这种情况下,deliveryMode 为 null。这意味着 broker 上的 NON_PERSISTENT 交付模式。从 2.0 版本开始,你可以为 RepublishMessageRecoverer 配置在重新发布时要设置到消息中的 deliveryMode(如果它为 null)。默认情况下,它使用 MessageProperties 默认值 - MessageDeliveryMode.PERSISTENT。 |
以下示例展示了如何将 RepublishMessageRecoverer 设置为恢复器:
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxRetries(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
RepublishMessageRecoverer 将消息与消息头中的附加信息一起发布,例如异常消息、堆栈跟踪、原始 exchange 和路由键。可以通过创建子类并重写 additionalHeaders() 来添加附加头。deliveryMode(或任何其他属性)也可以在 additionalHeaders() 中更改,如下例所示:
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
从版本 2.0.5 开始,如果堆栈跟踪过大,可能会被截断;这是因为所有头都必须适合单个帧。默认情况下,如果堆栈跟踪会导致其他头可用空间不足 20,000 字节(“余量”),它将被截断。如果你需要更多或更少的空间用于其他头,可以通过设置恢复器的 frameMaxHeadroom 属性进行调整。从版本 2.1.13、2.2.3 开始,异常消息被包含在此计算中,并且堆栈跟踪的数量将使用以下算法最大化:
-
如果仅堆栈跟踪就超出限制,则异常消息头将被截断为 97 字节加上
…,并且堆栈跟踪也会被截断。 -
如果堆栈跟踪很小,消息将被截断(加上
…)以适应可用字节(但堆栈跟踪本身中的消息被截断为 97 字节加上…)。
每当发生任何类型的截断时,原始异常将被记录以保留完整信息。评估在头被增强后执行,因此异常类型等信息可以在表达式中使用。
从 2.4.8 版本开始,错误 exchange 和路由键可以作为 SpEL 表达式提供,其中 Message 是评估的根对象。
从 2.3.3 版本开始,提供了一个新的子类 RepublishMessageRecovererWithConfirms;它支持两种发布者确认样式,并将在返回之前等待确认(如果未确认或消息被退回,则抛出异常)。
如果确认类型为 CORRELATED,则子类还会检测消息是否被退回并抛出 AmqpMessageReturnedException;如果发布被否定确认,它将抛出 AmqpNackReceivedException。
如果确认类型为 SIMPLE,则子类将在通道上调用 waitForConfirmsOrDie 方法。
有关确认和退回的更多信息,请参阅 发布者确认和退回。
从 2.1 版本开始,添加了 ImmediateRequeueMessageRecoverer 以抛出 ImmediateRequeueAmqpException,它会通知监听器容器重新排队当前失败的消息。
Spring Retry 的异常分类
Spring Retry 在确定哪些异常可以触发重试方面具有极大的灵活性。默认配置会重试所有异常。鉴于用户异常被封装在 ListenerExecutionFailedException 中,我们需要确保分类检查异常原因。默认分类器只查看顶层异常。
自 Spring Retry 1.0.3 起,BinaryExceptionClassifier 有一个名为 traverseCauses 的属性(默认值:false)。当设置为 true 时,它会遍历异常原因,直到找到匹配项或没有原因。
要将此分类器用于重试,你可以使用通过构造函数创建的 SimpleRetryPolicy,该构造函数接受最大尝试次数、Exception 实例的 Map 和布尔值(traverseCauses),并将此策略注入到 RetryTemplate 中。
通过 Broker 重试
从队列中死信的消息可以在从 DLX 重新路由后重新发布回此队列。这种重试行为在 broker 端通过 x-death 头进行控制。有关此方法的更多信息,请参阅官方 RabbitMQ 文档。
另一种方法是从应用程序手动将失败的消息重新发布回原始 exchange。从版本 4.0 开始,RabbitMQ broker 不考虑客户端发送的 x-death 头。本质上,客户端发送的任何 x-* 头都会被忽略。
为了缓解 RabbitMQ broker 的这种新行为,Spring AMQP 从版本 3.2 开始引入了 retry_count 头。当此头不存在且服务器端 DLX 正在运行时,x-death.count 属性将映射到此头。当失败消息手动重新发布以进行重试时,retry_count 头值必须手动递增。有关更多信息,请参阅 Javadoc。
以下示例总结了通过 broker 进行手动重试的算法:
@RabbitListener(queues = "some_queue")
public void rePublish(Message message) {
try {
// Process message
}
catch (Exception ex) {
Long retryCount = message.getMessageProperties().getRetryCount();
if (retryCount < 3) {
message.getMessageProperties().incrementRetryCount();
this.rabbitTemplate.send("", "some_queue", message);
}
else {
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
}
}