连接和资源管理

虽然我们在上一节中描述的 AMQP 模型是通用的,适用于所有实现,但当涉及到资源管理时,细节则特定于代理实现。因此,在本节中,我们将重点关注仅存在于我们的“spring-rabbit”模块中的代码,因为目前 RabbitMQ 是唯一支持的实现。

管理与 RabbitMQ 代理连接的中心组件是 ConnectionFactory 接口。ConnectionFactory 实现的职责是提供 org.springframework.amqp.rabbit.connection.Connection 的实例,它是 com.rabbitmq.client.Connection 的包装器。

选择连接工厂

有三种连接工厂可供选择

  • PooledChannelConnectionFactory

  • ThreadChannelConnectionFactory

  • CachingConnectionFactory

前两个是在 2.3 版本中添加的。

对于大多数用例,应该使用 CachingConnectionFactory。如果您希望确保严格的消息排序,而无需使用 范围操作,则可以使用 ThreadChannelConnectionFactoryPooledChannelConnectionFactoryCachingConnectionFactory 类似,它使用单个连接和一个通道池。它的实现更简单,但它不支持关联的发布者确认。

所有三种工厂都支持简单的发布者确认。

当配置 RabbitTemplate 以使用 单独的连接 时,您现在可以从 2.3.2 版本开始,将发布连接工厂配置为不同的类型。默认情况下,发布工厂与主工厂类型相同,并且在主工厂上设置的任何属性也会传播到发布工厂。

从 3.1 版本开始,AbstractConnectionFactory 包含 connectionCreatingBackOff 属性,它支持连接模块中的回退策略。目前,在 createChannel() 的行为中,支持处理在达到 channelMax 限制时发生的异常,实现基于尝试和间隔的回退策略。

PooledChannelConnectionFactory

此工厂管理单个连接和两个通道池,基于 Apache Pool2。一个池用于事务性通道,另一个用于非事务性通道。池是 GenericObjectPool,具有默认配置;提供回调来配置池;有关更多信息,请参阅 Apache 文档。

要使用此工厂,必须将 Apache commons-pool2 jar 包放在类路径中。

@Bean
PooledChannelConnectionFactory pcf() throws Exception {
    ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
    rabbitConnectionFactory.setHost("localhost");
    PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
    pcf.setPoolConfigurer((pool, tx) -> {
        if (tx) {
            // configure the transactional pool
        }
        else {
            // configure the non-transactional pool
        }
    });
    return pcf;
}

ThreadChannelConnectionFactory

此工厂管理一个连接和两个 ThreadLocal,一个用于事务性通道,另一个用于非事务性通道。此工厂确保同一线程上的所有操作都使用相同的通道(只要它保持打开状态)。这有助于严格的消息排序,而无需使用 范围操作。为了避免内存泄漏,如果您的应用程序使用许多短暂的线程,您必须调用工厂的 closeThreadChannel() 来释放通道资源。从 2.3.7 版本开始,线程可以将其通道转移到另一个线程。有关更多信息,请参阅 多线程环境中的严格消息排序

CachingConnectionFactory

提供的第三个实现是 CachingConnectionFactory,它默认情况下建立一个可以由应用程序共享的单个连接代理。连接的共享是可能的,因为使用 AMQP 进行消息传递的“工作单元”实际上是“通道”(在某些方面,这类似于 JMS 中连接和会话之间的关系)。连接实例提供了一个 createChannel 方法。CachingConnectionFactory 实现支持对这些通道进行缓存,并且它根据通道是否为事务性维护单独的缓存。在创建 CachingConnectionFactory 的实例时,您可以通过构造函数提供“主机名”。您还应该提供“用户名”和“密码”属性。要配置通道缓存的大小(默认值为 25),您可以调用 setChannelCacheSize() 方法。

从 1.3 版本开始,您可以配置 CachingConnectionFactory 来缓存连接以及仅缓存通道。在这种情况下,每次调用 createConnection() 都会创建一个新连接(或从缓存中检索一个空闲连接)。关闭连接会将其返回到缓存(如果缓存大小尚未达到)。在这些连接上创建的通道也会被缓存。在某些环境中使用单独的连接可能很有用,例如从 HA 集群中消费,结合负载均衡器,连接到不同的集群成员,等等。要缓存连接,请将 cacheMode 设置为 CacheMode.CONNECTION

这不会限制连接数量。相反,它指定允许多少个空闲打开的连接。

从 1.5.5 版本开始,提供了一个名为 connectionLimit 的新属性。设置此属性时,它会限制允许的连接总数。如果达到限制,则使用 channelCheckoutTimeLimit 等待连接变为空闲。如果时间超过,则会抛出 AmqpTimeoutException

当缓存模式为CONNECTION时,不支持自动声明队列和其他内容(参见自动声明交换机、队列和绑定)。

此外,在撰写本文时,amqp-client 库默认情况下为每个连接创建一个固定线程池(默认大小:Runtime.getRuntime().availableProcessors() * 2 个线程)。当使用大量连接时,您应该考虑在CachingConnectionFactory上设置自定义executor。然后,所有连接都可以使用同一个执行器,并且它的线程可以共享。执行器的线程池应该是不受限制的,或者根据预期使用情况进行适当设置(通常,每个连接至少一个线程)。如果在每个连接上创建多个通道,池大小会影响并发性,因此可变(或简单缓存)线程池执行器最适合。

重要的是要理解,缓存大小(默认情况下)不是限制,而仅仅是可缓存的通道数量。例如,如果缓存大小为 10,实际上可以使用任意数量的通道。如果使用超过 10 个通道,并且所有通道都返回到缓存,则 10 个通道进入缓存。其余的通道将被物理关闭。

从 1.6 版本开始,默认通道缓存大小已从 1 增加到 25。在高流量、多线程环境中,较小的缓存意味着通道以高频率创建和关闭。增加默认缓存大小可以避免这种开销。您应该通过 RabbitMQ 管理 UI 监控正在使用的通道,如果发现创建和关闭了许多通道,请考虑进一步增加缓存大小。缓存仅按需增长(以适应应用程序的并发需求),因此此更改不会影响现有的低流量应用程序。

从 1.4.2 版本开始,CachingConnectionFactory 具有一个名为channelCheckoutTimeout的属性。当此属性大于零时,channelCacheSize 成为可以在连接上创建的通道数量的限制。如果达到限制,调用线程将阻塞,直到有通道可用或达到此超时,在这种情况下将抛出AmqpTimeoutException

框架内使用的通道(例如,RabbitTemplate)会可靠地返回到缓存中。如果您在框架之外创建通道(例如,通过直接访问连接并调用createChannel()),则必须可靠地返回它们(通过关闭),可能在finally块中,以避免耗尽通道。

以下示例展示了如何创建一个新的connection

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

使用 XML 时,配置可能如下所示

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
</bean>
还有一个SingleConnectionFactory实现,它只在框架的单元测试代码中可用。它比CachingConnectionFactory更简单,因为它不缓存通道,但由于缺乏性能和弹性,它不适合在简单的测试之外的实际使用。如果您需要出于某种原因实现自己的ConnectionFactoryAbstractConnectionFactory基类可以提供一个不错的起点。

可以使用 rabbit 命名空间快速便捷地创建ConnectionFactory,如下所示

<rabbit:connection-factory id="connectionFactory"/>

在大多数情况下,这种方法更可取,因为框架可以为您选择最佳的默认值。创建的实例是CachingConnectionFactory。请记住,通道的默认缓存大小为 25。如果您希望缓存更多通道,请通过设置 'channelCacheSize' 属性来设置更大的值。在 XML 中,它将如下所示

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg value="somehost"/>
    <property name="username" value="guest"/>
    <property name="password" value="guest"/>
    <property name="channelCacheSize" value="50"/>
</bean>

此外,使用命名空间,您可以添加 'channel-cache-size' 属性,如下所示

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

默认缓存模式为CHANNEL,但您可以将其配置为改为缓存连接。在以下示例中,我们使用connection-cache-size

<rabbit:connection-factory
    id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

您可以使用命名空间提供主机和端口属性,如下所示

<rabbit:connection-factory
    id="connectionFactory" host="somehost" port="5672"/>

或者,如果在集群环境中运行,您可以使用 addresses 属性,如下所示

<rabbit:connection-factory
    id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

有关address-shuffle-mode的信息,请参见连接到集群

以下示例使用自定义线程工厂,该工厂将线程名称前缀为rabbitmq-

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
    thread-factory="tf"
    channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
    <constructor-arg value="rabbitmq-" />
</bean>

地址解析器

从 2.1.15 版本开始,您现在可以使用AddressResolver来解析连接地址。这将覆盖addresseshost/port属性的任何设置。

命名连接

从 1.7 版本开始,提供了一个ConnectionNameStrategy,用于注入AbstractionConnectionFactory。生成的名称用于目标 RabbitMQ 连接的应用程序特定标识。如果 RabbitMQ 服务器支持,连接名称将显示在管理 UI 中。此值不必唯一,也不能用作连接标识符,例如,在 HTTP API 请求中。此值应该是人类可读的,并且是ClientPropertiesconnection_name键的一部分。您可以使用简单的 Lambda,如下所示

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

ConnectionFactory 参数可用于通过某些逻辑区分目标连接名称。默认情况下,AbstractConnectionFactorybeanName、表示对象的十六进制字符串和内部计数器用于生成 connection_name<rabbit:connection-factory> 命名空间组件还提供 connection-name-strategy 属性。

SimplePropertyValueConnectionNameStrategy 的实现将连接名称设置为应用程序属性。您可以将其声明为 @Bean 并将其注入到连接工厂中,如下例所示

@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
    return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}

@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    ...
    connectionFactory.setConnectionNameStrategy(cns);
    return connectionFactory;
}

该属性必须存在于应用程序上下文的 Environment 中。

当使用 Spring Boot 及其自动配置的连接工厂时,您只需声明 ConnectionNameStrategy @Bean。Boot 会自动检测该 bean 并将其连接到工厂。

阻塞连接和资源限制

连接可能会因与 内存警报 相对应的代理的交互而被阻塞。从 2.0 版本开始,org.springframework.amqp.rabbit.connection.Connection 可以提供 com.rabbitmq.client.BlockedListener 实例,以便在连接阻塞和解除阻塞事件时收到通知。此外,AbstractConnectionFactory 分别通过其内部 BlockedListener 实现发出 ConnectionBlockedEventConnectionUnblockedEvent。这些允许您提供应用程序逻辑以对代理上的问题做出适当的反应,并(例如)采取一些纠正措施。

当应用程序配置为使用单个 CachingConnectionFactory 时(默认情况下,Spring Boot 自动配置会使用单个 CachingConnectionFactory),当连接被代理阻塞时,应用程序将停止工作。当它被代理阻塞时,它的任何客户端都会停止工作。如果我们在同一个应用程序中拥有生产者和消费者,当生产者阻塞连接(因为代理上没有更多资源)而消费者无法释放它们(因为连接被阻塞)时,我们可能会遇到死锁。为了缓解这个问题,我们建议再创建一个单独的 CachingConnectionFactory 实例,并使用相同的选项——一个用于生产者,一个用于消费者。对于在消费者线程上执行的事务性生产者,单独的 CachingConnectionFactory 是不可能的,因为它们应该重用与消费者事务关联的 Channel

从 2.0.2 版本开始,RabbitTemplate 具有一个配置选项,可以自动使用第二个连接工厂,除非正在使用事务。有关更多信息,请参阅 使用单独连接。发布者连接的 ConnectionNameStrategy 与主策略相同,只是在调用该方法的结果后面附加了 .publisher

从 1.7.7 版本开始,提供了一个 AmqpResourceNotAvailableException 异常,当 SimpleConnection.createChannel() 无法创建 Channel 时(例如,由于 channelMax 限制已达到,并且缓存中没有可用的通道),就会抛出此异常。您可以在 RetryPolicy 中使用此异常,以便在经过一段时间的回退后恢复操作。

配置底层客户端连接工厂

CachingConnectionFactory 使用 Rabbit 客户端 ConnectionFactory 的实例。在设置 CachingConnectionFactory 上的等效属性时,会传递许多配置属性(例如,hostportuserNamepasswordrequestedHeartBeatconnectionTimeout)。要设置其他属性(例如,clientProperties),您可以定义 Rabbit 工厂的实例,并使用 CachingConnectionFactory 的适当构造函数提供对它的引用。当使用命名空间(如前所述)时,您需要在 connection-factory 属性中提供对已配置工厂的引用。为了方便起见,提供了一个工厂 Bean 来帮助在 Spring 应用程序上下文中配置连接工厂,如 下一节 中所述。

<rabbit:connection-factory
      id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客户端默认情况下启用自动恢复。虽然与此功能兼容,但 Spring AMQP 具有自己的恢复机制,并且通常不需要客户端恢复功能。我们建议禁用 amqp-client 自动恢复,以避免在代理可用但连接尚未恢复时出现 AutoRecoverConnectionNotCurrentlyOpenException 实例。例如,当在 RabbitTemplate 中配置 RetryTemplate 时,即使故障转移到集群中的另一个代理,您也可能会注意到此异常。由于自动恢复连接在计时器上恢复,因此使用 Spring AMQP 的恢复机制可以更快地恢复连接。从 1.7.1 版本开始,Spring AMQP 禁用 amqp-client 自动恢复,除非您显式创建自己的 RabbitMQ 连接工厂并将其提供给 CachingConnectionFactory。由 RabbitConnectionFactoryBean 创建的 RabbitMQ ConnectionFactory 实例默认情况下也禁用了此选项。

RabbitConnectionFactoryBean 和配置 SSL

从 1.4 版本开始,提供了一个方便的 RabbitConnectionFactoryBean,可以使用依赖注入在底层客户端连接工厂上方便地配置 SSL 属性。其他 setter 会委托给底层工厂。以前,您必须以编程方式配置 SSL 选项。以下示例展示了如何配置 RabbitConnectionFactoryBean

Java
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
    RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
    factoryBean.setUseSSL(true);
    factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
    return factoryBean;
}

@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
    CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
    ccf.setHost("...");
    // ...
    return ccf;
}
启动 application.properties
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
XML
<rabbit:connection-factory id="rabbitConnectionFactory"
    connection-factory="clientConnectionFactory"
    host="${host}"
    port="${port}"
    virtual-host="${vhost}"
    username="${username}" password="${password}" />

<bean id="clientConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
    <property name="useSSL" value="true" />
    <property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>

有关配置 SSL 的信息,请参阅 RabbitMQ 文档。省略 keyStoretrustStore 配置以通过 SSL 连接,而无需证书验证。以下示例显示了如何提供密钥和信任存储配置。

sslPropertiesLocation 属性是一个 Spring Resource,指向包含以下键的属性文件

keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret

keyStoretruststore 是指向存储的 Spring Resources。通常,此属性文件由操作系统保护,应用程序具有读取权限。

从 Spring AMQP 版本 1.5 开始,您可以在工厂 Bean 上直接设置这些属性。如果同时提供了离散属性和 sslPropertiesLocation,则后者中的属性会覆盖离散值。

从版本 2.0 开始,服务器证书默认情况下会被验证,因为这样做更安全。如果您出于某种原因希望跳过此验证,请将工厂 Bean 的 skipServerCertificateValidation 属性设置为 true。从版本 2.1 开始,RabbitConnectionFactoryBean 现在默认情况下会调用 enableHostnameVerification()。要恢复到之前的行为,请将 enableHostnameVerification 属性设置为 false
从版本 2.2.5 开始,工厂 Bean 默认情况下将始终使用 TLS v1.2;以前,它在某些情况下使用 v1.1,而在其他情况下使用 v1.2(取决于其他属性)。如果您出于某种原因需要使用 v1.1,请设置 sslAlgorithm 属性:setSslAlgorithm("TLSv1.1")

连接到集群

要连接到集群,请在 CachingConnectionFactory 上配置 addresses 属性

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    return ccf;
}

从版本 3.0 开始,底层连接工厂将在建立新连接时尝试连接到主机,方法是选择一个随机地址。要恢复到从第一个到最后一个尝试连接的先前行为,请将 addressShuffleMode 属性设置为 AddressShuffleMode.NONE

从版本 2.3 开始,添加了 INORDER 随机模式,这意味着第一个地址在创建连接后将移至末尾。如果您希望使用 RabbitMQ 分片插件 以及 CacheMode.CONNECTION 和适当的并发性,则可能希望使用此模式从所有节点上的所有分片进行消费。

@Bean
public CachingConnectionFactory ccf() {
    CachingConnectionFactory ccf = new CachingConnectionFactory();
    ccf.setAddresses("host1:5672,host2:5672,host3:5672");
    ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
    return ccf;
}

路由连接工厂

从 1.3 版本开始,引入了 AbstractRoutingConnectionFactory。此工厂提供了一种机制,可以为多个 ConnectionFactories 配置映射,并在运行时通过某个 lookupKey 确定目标 ConnectionFactory。通常,实现会检查线程绑定的上下文。为了方便起见,Spring AMQP 提供了 SimpleRoutingConnectionFactory,它从 SimpleResourceHolder 获取当前线程绑定的 lookupKey。以下示例展示了如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory

<bean id="connectionFactory"
      class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
    <property name="targetConnectionFactories">
        <map>
            <entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
            <entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
        </map>
    </property>
</bean>

<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void service(String vHost, String payload) {
        SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
        rabbitTemplate.convertAndSend(payload);
        SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
    }

}

使用完资源后,务必取消绑定。有关更多信息,请参阅 AbstractRoutingConnectionFactoryJavaDoc

从 1.4 版本开始,RabbitTemplate 支持 SpEL sendConnectionFactorySelectorExpressionreceiveConnectionFactorySelectorExpression 属性,这些属性在每次 AMQP 协议交互操作(sendsendAndReceivereceivereceiveAndReply)时进行评估,解析为提供的 AbstractRoutingConnectionFactorylookupKey 值。您可以在表达式中使用 Bean 引用,例如 @vHostResolver.getVHost(#root)。对于 send 操作,要发送的消息是根评估对象。对于 receive 操作,queueName 是根评估对象。

路由算法如下:如果选择器表达式为 null 或评估结果为 null,或者提供的 ConnectionFactory 不是 AbstractRoutingConnectionFactory 的实例,则一切按原样工作,依赖于提供的 ConnectionFactory 实现。如果评估结果不为 null,但该 lookupKey 没有目标 ConnectionFactory,并且 AbstractRoutingConnectionFactory 配置为 lenientFallback = true,则也会发生这种情况。在 AbstractRoutingConnectionFactory 的情况下,它会根据 determineCurrentLookupKey() 回退到其 routing 实现。但是,如果 lenientFallback = false,则会抛出 IllegalStateException

命名空间支持还为<rabbit:template>组件提供了send-connection-factory-selector-expressionreceive-connection-factory-selector-expression属性。

此外,从 1.4 版本开始,您可以在监听器容器中配置路由连接工厂。在这种情况下,队列名称列表用作查找键。例如,如果您使用setQueueNames("thing1", "thing2")配置容器,则查找键为[thing1,thing]"(注意键中没有空格)。

从 1.6.9 版本开始,您可以使用监听器容器上的setLookupKeyQualifier向查找键添加限定符。这样做可以实现例如监听具有相同名称但在不同虚拟主机中的队列(您需要为每个虚拟主机设置一个连接工厂)。

例如,使用查找键限定符thing1和监听队列thing2的容器,您可以使用thing1[thing2]注册目标连接工厂。

目标(以及如果提供则为默认)连接工厂必须具有相同的发布确认和返回设置。请参阅发布确认和返回

从 2.4.4 版本开始,可以禁用此验证。如果您遇到确认和返回之间需要不相等的情况,可以使用AbstractRoutingConnectionFactory#setConsistentConfirmsReturns关闭验证。请注意,添加到AbstractRoutingConnectionFactory的第一个连接工厂将确定confirmsreturns的通用值。

如果您遇到某些消息需要检查确认/返回而其他消息不需要的情况,这可能很有用。例如

@Bean
public RabbitTemplate rabbitTemplate() {
    final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
    cf.setHost("localhost");
    cf.setPort(5672);

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

    PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);

    final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
    connectionFactoryMap.put("true", cachingConnectionFactory);
    connectionFactoryMap.put("false", pooledChannelConnectionFactory);

    final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
    routingConnectionFactory.setConsistentConfirmsReturns(false);
    routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
    routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);

    final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);

    final Expression sendExpression = new SpelExpressionParser().parseExpression(
            "messageProperties.headers['x-use-publisher-confirms'] ?: false");
    rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}

这样,带有x-use-publisher-confirms: true头的消息将通过缓存连接发送,您可以确保消息传递。有关确保消息传递的更多信息,请参阅发布确认和返回

队列亲和力和LocalizedQueueConnectionFactory

在集群中使用 HA 队列时,为了获得最佳性能,您可能希望连接到主队列所在的物理代理。CachingConnectionFactory 可以配置多个代理地址。这是为了进行故障转移,客户端尝试根据配置的 AddressShuffleMode 顺序进行连接。LocalizedQueueConnectionFactory 使用管理插件提供的 REST API 来确定哪个节点是队列的主节点。然后,它创建一个连接到该节点的 CachingConnectionFactory(或从缓存中检索)。如果连接失败,则确定新的主节点,并且消费者连接到该节点。LocalizedQueueConnectionFactory 配置了默认连接工厂,以防无法确定队列的物理位置,在这种情况下,它将像往常一样连接到集群。

LocalizedQueueConnectionFactory 是一个 RoutingConnectionFactorySimpleMessageListenerContainer 使用队列名称作为查找键,如上文 路由连接工厂 中所述。

因此(使用队列名称进行查找),LocalizedQueueConnectionFactory 只能在容器配置为监听单个队列时使用。
每个节点必须启用 RabbitMQ 管理插件。
此连接工厂适用于长时间运行的连接,例如 SimpleMessageListenerContainer 使用的连接。它不适用于短连接使用,例如使用 RabbitTemplate,因为在建立连接之前调用 REST API 会产生开销。此外,对于发布操作,队列是未知的,并且消息会发布到所有集群成员,因此查找节点的逻辑价值不大。

以下示例配置显示了如何配置工厂

@Autowired
private ConfigurationProperties props;

@Bean
public CachingConnectionFactory defaultConnectionFactory() {
    CachingConnectionFactory cf = new CachingConnectionFactory();
    cf.setAddresses(this.props.getAddresses());
    cf.setUsername(this.props.getUsername());
    cf.setPassword(this.props.getPassword());
    cf.setVirtualHost(this.props.getVirtualHost());
    return cf;
}

@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
        @Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
    return new LocalizedQueueConnectionFactory(defaultCF,
            StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
            StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
            StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
            this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
            false, null);
}

请注意,前三个参数是 addressesadminUrisnodes 的数组。这些参数的位置是,当容器尝试连接到队列时,它使用管理 API 来确定哪个节点是队列的主节点,并连接到与该节点在同一数组位置的地址。

从 3.0 版本开始,RabbitMQ http-client 不再用于访问 Rest API。相反,默认情况下,如果类路径上有 spring-webflux,则使用来自 Spring Webflux 的 WebClient;否则使用 RestTemplate

WebFlux 添加到类路径

Maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit</artifactId>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit'

您也可以通过实现LocalizedQueueConnectionFactory.NodeLocator 并覆盖其createClientrestCall 和可选的close 方法来使用其他 REST 技术。

lqcf.setNodeLocator(new NodeLocator<MyClient>() {

    @Override
    public MyClient createClient(String userName, String password) {
        ...
    }

    @Override
    public HashMap<String, Object> restCall(MyClient client, URI uri) {
        ...
    });

});

框架提供了WebFluxNodeLocatorRestTemplateNodeLocator,默认情况下如上所述。

发布者确认和返回

通过将CachingConnectionFactory 属性publisherConfirmType 设置为ConfirmType.CORRELATED 以及将publisherReturns 属性设置为“true”,支持确认(带关联)和返回消息。

当设置这些选项时,工厂创建的Channel 实例将被包装在PublisherCallbackChannel 中,用于促进回调。当获得此类通道时,客户端可以在Channel 上注册PublisherCallbackChannel.ListenerPublisherCallbackChannel 实现包含将确认或返回路由到相应侦听器的逻辑。这些功能将在以下部分中进一步解释。

另请参见 关联发布者确认和返回 以及 作用域操作 中的simplePublisherConfirms

有关更多背景信息,请参阅 RabbitMQ 团队发布的博客文章,标题为 Introducing Publisher Confirms

连接和通道侦听器

连接工厂支持注册ConnectionListenerChannelListener 实现。这使您可以接收与连接和通道相关的事件的通知。(ConnectionListenerRabbitAdmin 用于在建立连接时执行声明 - 有关更多信息,请参见 交换、队列和绑定的自动声明)。以下清单显示了ConnectionListener 接口定义

@FunctionalInterface
public interface ConnectionListener {

    void onCreate(Connection connection);

    default void onClose(Connection connection) {
    }

    default void onShutDown(ShutdownSignalException signal) {
    }

}

从版本 2.0 开始,可以为org.springframework.amqp.rabbit.connection.Connection 对象提供com.rabbitmq.client.BlockedListener 实例,以便在连接阻塞和解除阻塞事件时收到通知。以下示例显示了 ChannelListener 接口定义

@FunctionalInterface
public interface ChannelListener {

    void onCreate(Channel channel, boolean transactional);

    default void onShutDown(ShutdownSignalException signal) {
    }

}

有关可能需要注册 ChannelListener 的一种情况,请参见 发布是异步的 - 如何检测成功和失败

记录通道关闭事件

版本 1.5 引入了一种机制,允许用户控制日志级别。

AbstractConnectionFactory 使用默认策略记录通道关闭,如下所示

  • 正常通道关闭 (200 OK) 不会记录。

  • 如果通道由于被动队列声明失败而关闭,则在 DEBUG 级别记录。

  • 如果通道关闭是因为 basic.consume 由于独占消费者条件而被拒绝,则在 DEBUG 级别记录(从 3.1 开始,以前是 INFO)。

  • 所有其他情况都在 ERROR 级别记录。

要修改此行为,您可以将自定义 ConditionalExceptionLogger 注入 CachingConnectionFactorycloseExceptionLogger 属性中。

此外,AbstractConnectionFactory.DefaultChannelCloseLogger 现在是公开的,允许对其进行子类化。

另请参见 消费者事件

运行时缓存属性

从版本 1.6 开始,CachingConnectionFactory 现在通过 getCacheProperties() 方法提供缓存统计信息。这些统计信息可用于调整缓存以在生产环境中对其进行优化。例如,高水位标记可用于确定是否应增加缓存大小。如果它等于缓存大小,您可能需要考虑进一步增加。下表描述了 CacheMode.CHANNEL 属性

表 1. CacheMode.CHANNEL 的缓存属性
属性 含义
connectionName

ConnectionNameStrategy 生成的连接的名称。

channelCacheSize

当前配置的允许空闲的最大通道数。

localPort

连接的本地端口(如果可用)。这可用于与 RabbitMQ 管理 UI 上的连接和通道相关联。

idleChannelsTx

当前空闲(缓存)的事务通道数。

idleChannelsNotTx

当前空闲(缓存)的非事务通道数。

idleChannelsTxHighWater

并发空闲(缓存)的事务通道的最大数量。

idleChannelsNotTxHighWater

并发空闲(缓存)的非事务通道的最大数量。

下表描述了 CacheMode.CONNECTION 属性

表 2. CacheMode.CONNECTION 的缓存属性
属性 含义
connectionName:<localPort>

ConnectionNameStrategy 生成的连接的名称。

openConnections

表示与代理的连接的连接对象的数量。

channelCacheSize

当前配置的允许空闲的最大通道数。

connectionCacheSize

当前配置的允许空闲的最大连接数。

idleConnections

当前空闲的连接数。

idleConnectionsHighWater

并发空闲的连接的最大数量。

idleChannelsTx:<localPort>

当前空闲(缓存)的此连接的事务通道数量。可以使用属性名称的localPort部分与 RabbitMQ 管理 UI 上的连接和通道进行关联。

idleChannelsNotTx:<localPort>

当前空闲(缓存)的此连接的非事务通道数量。可以使用属性名称的localPort部分与 RabbitMQ 管理 UI 上的连接和通道进行关联。

idleChannelsTxHighWater:<localPort>

同时空闲(缓存)的事务通道的最大数量。可以使用属性名称的 localPort 部分与 RabbitMQ 管理 UI 上的连接和通道进行关联。

idleChannelsNotTxHighWater:<localPort>

同时空闲(缓存)的非事务通道的最大数量。可以使用属性名称的localPort部分与 RabbitMQ 管理 UI 上的连接和通道进行关联。

还包括cacheMode属性(CHANNELCONNECTION)。

cacheStats
图 1. JVisualVM 示例

RabbitMQ 自动连接/拓扑恢复

从 Spring AMQP 的第一个版本开始,该框架就在代理失败的情况下提供了自己的连接和通道恢复。此外,如配置代理中所述,RabbitAdmin在连接重新建立时重新声明任何基础设施 bean(队列等)。因此,它不依赖于amqp-client库现在提供的自动恢复amqp-client默认情况下启用了自动恢复。两种恢复机制之间存在一些不兼容性,因此,默认情况下,Spring 将基础RabbitMQ connectionFactory上的automaticRecoveryEnabled属性设置为false。即使该属性为true,Spring 也会通过立即关闭任何恢复的连接来有效地禁用它。

默认情况下,只有定义为 bean 的元素(队列、交换机、绑定)将在连接失败后重新声明。有关如何更改此行为,请参阅恢复自动删除声明