入站通道适配器

下面的清单显示了 AMQP 入站通道适配器的可能配置选项

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
容器

请注意,当使用 XML 配置外部容器时,不能使用 Spring AMQP 命名空间来定义容器。这是因为该命名空间至少需要一个<listener/>元素。在此环境中,侦听器位于适配器内部。因此,必须使用普通的 Spring <bean/> 定义来定义容器,如下例所示。

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
尽管 Spring Integration JMS 和 AMQP 的支持相似,但存在重要的区别。JMS 入站通道适配器在内部使用JmsDestinationPollingSource,并期望配置轮询器。AMQP 入站通道适配器使用AbstractMessageListenerContainer,并且是消息驱动的。在这方面,它更类似于 JMS 消息驱动的通道适配器。

从 5.5 版本开始,AmqpInboundChannelAdapter 可以配置一个org.springframework.amqp.rabbit.retry.MessageRecoverer策略,该策略在内部调用重试操作时的RecoveryCallback中使用。有关更多信息,请参见setMessageRecoverer() 的 JavaDoc。

@Publisher 注解也可以与@RabbitListener结合使用。

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

    @Bean
    QueueChannel fromRabbitViaPublisher() {
        return new QueueChannel();
    }

    @RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
    @Publisher("fromRabbitViaPublisher")
    @Payload("#args.payload.toUpperCase()")
    public void consumeForPublisher(String payload) {

    }

}

默认情况下,@Publisher AOP拦截器处理方法调用返回的值。但是,@RabbitListener 方法的返回值被视为 AMQP 回复消息。因此,这种方法不能与@Publisher一起使用,因此建议使用带有相应 SpEL 表达式的@Payload注解来处理方法参数。有关@Publisher的更多信息,请参见基于注解的配置部分。

当在侦听器容器中使用独占或单活动使用者时,建议将容器属性forceStop设置为true。这将防止在停止容器后,另一个使用者可能在此实例完全停止之前开始使用消息的竞争条件。

批量消息

有关批量消息的更多信息,请参见Spring AMQP 文档

要使用 Spring Integration 生成批量消息,只需使用BatchingRabbitTemplate配置出站端点。

在接收批量消息时,默认情况下,侦听器容器会提取每个片段消息,并且适配器将为每个片段生成一个Message<?>。从 5.2 版本开始,如果容器的deBatchingEnabled属性设置为false,则适配器将执行去批量化处理,并生成单个Message<List<?>>,其有效负载是片段有效负载的列表(如果合适则在转换后)。

默认的BatchingStrategySimpleBatchingStrategy,但这可以在适配器上被覆盖。

当需要对重试操作进行恢复时,必须将org.springframework.amqp.rabbit.retry.MessageBatchRecoverer与批处理一起使用。