启用监听器端点注解
要启用 @RabbitListener 注解支持,您可以将 @EnableRabbit 添加到您的一个 @Configuration 类中。以下示例展示了如何实现:
@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setContainerCustomizer(container -> /* customize the container */);
return factory;
}
}
自 2.0 版本起,还提供了 DirectMessageListenerContainerFactory。它创建 DirectMessageListenerContainer 实例。
有关如何选择 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory 的信息,请参阅 选择容器。 |
从 2.2.2 版本开始,您可以提供一个 ContainerCustomizer 实现(如上所示)。这可用于在容器创建和配置后进一步配置容器;例如,您可以使用它来设置容器工厂未公开的属性。
2.4.8 版本提供了 CompositeContainerCustomizer,用于您希望应用多个定制器的情况。
默认情况下,基础设施会查找名为 rabbitListenerContainerFactory 的 bean 作为工厂的来源,该工厂用于创建消息监听器容器。在这种情况下,忽略 RabbitMQ 基础设施设置,可以使用三个核心线程池大小和十个最大线程池大小的线程调用 processOrder 方法。
您可以定制监听器容器工厂,以便将其用于每个注解,或者通过实现 RabbitListenerConfigurer 接口来配置一个显式默认值。仅当至少一个端点未注册特定容器工厂时才需要默认值。有关完整详细信息和示例,请参阅 Javadoc。
容器工厂提供添加 MessagePostProcessor 实例的方法,这些实例在接收消息后(调用监听器之前)和发送回复之前应用。
有关回复的信息,请参阅 回复管理。
从 2.0.6 版本开始,您可以将 RetryTemplate 和 RecoveryCallback 添加到监听器容器工厂。它在发送回复时使用。当重试耗尽时,会调用 RecoveryCallback。
如果您更喜欢 XML 配置,可以使用 <rabbit:annotation-driven> 元素。任何用 @RabbitListener 注解的 bean 都会被检测到。
对于 SimpleRabbitListenerContainer 实例,您可以使用类似于以下内容的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
</bean>
对于 DirectMessageListenerContainer 实例,您可以使用类似于以下内容的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="consumersPerQueue" value="3"/>
</bean>
从 2.0 版本开始,@RabbitListener 注解具有 concurrency 属性。它支持 SpEL 表达式(#{…})和属性占位符(${…})。其含义和允许的值取决于容器类型,如下所示:
-
对于
DirectMessageListenerContainer,该值必须是单个整数值,它设置容器上的consumersPerQueue属性。 -
对于
SimpleRabbitListenerContainer,该值可以是单个整数值,它设置容器上的concurrentConsumers属性,或者它可以具有m-n的形式,其中m是concurrentConsumers属性,n是maxConcurrentConsumers属性。
在任何一种情况下,此设置都会覆盖工厂上的设置。以前,如果您的监听器需要不同的并发性,则必须定义不同的容器工厂。
该注解还允许通过 autoStartup 和 executor(自 2.2 起)注解属性覆盖工厂的 autoStartup 和 taskExecutor 属性。为每个监听器使用不同的执行器可能有助于在日志和线程转储中识别与每个监听器关联的线程。
2.2 版本还添加了 ackMode 属性,它允许您覆盖容器工厂的 acknowledgeMode 属性。
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
...
channel.basicAck(tag, false);
}