入站通道适配器

以下列表显示了 AMQP 入站通道适配器可能的配置选项

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  batch-size=""                   (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)
1 此适配器的唯一 ID。可选。
2 转换后的消息应发送到的消息通道。必需。
3 应从中消费消息的 AMQP 队列名称(逗号分隔列表)。必需。
4 MessageListenerContainer 的确认模式。当设置为 MANUAL 时,交付标签和通道分别在消息头 amqp_deliveryTagamqp_channel 中提供。用户应用程序负责确认。NONE 表示不确认(autoAck)。AUTO 表示适配器的容器在下游流程完成时进行确认。可选(默认为 AUTO)。参见 入站端点确认模式
5 处理与此入站通道适配器相关的横切行为的额外 AOP 通知。可选。
6 标志,指示此组件创建的通道是否是事务性的。如果为 true,它会告诉框架使用事务性通道,并根据结果以提交或回滚结束所有操作(发送或接收),如果出现异常则表示回滚。可选(默认为 false)。
7 指定要创建的并发消费者数量。默认值为 1。我们建议提高并发消费者数量以扩展从队列接收消息的消费能力。但是请注意,一旦注册了多个消费者,任何排序保证都将丢失。通常,对于低流量队列,请使用一个消费者。当设置了 'consumers-per-queue' 时不允许。可选。
8 RabbitMQ ConnectionFactory 的 Bean 引用。可选(默认为 connectionFactory)。
9 错误消息应发送到的消息通道。可选。
10 侦听器通道(com.rabbitmq.client.Channel)是否暴露给已注册的 ChannelAwareMessageListener。可选(默认为 true)。
11 接收 AMQP 消息时使用的 AmqpHeaderMapper 引用。可选。默认情况下,只有标准 AMQP 属性(如 contentType)被复制到 Spring Integration MessageHeaders。默认的 DefaultAmqpHeaderMapper 不会将 AMQP MessageProperties 中任何用户定义的头复制到消息中。如果提供了 request-header-names 则不允许。
12 要从 AMQP 请求映射到 MessageHeaders 的 AMQP 头名称的逗号分隔列表。仅当未提供 'header-mapper' 引用时才能提供。此列表中的值也可以是与头名称匹配的简单模式(例如"*"或"thing1*, thing2"或"*something")。
13 接收 AMQP 消息时使用的 AbstractMessageListenerContainer 引用。如果提供了此属性,则不应提供与侦听器容器配置相关的任何其他属性。换句话说,通过设置此引用,您必须完全负责侦听器容器配置。唯一的例外是 MessageListener 本身。由于这实际上是此通道适配器实现的核心职责,因此引用的侦听器容器不得拥有自己的 MessageListener。可选。
14 接收 AMQP 消息时使用的 MessageConverter。可选。
15 接收 AMQP 消息时使用的 MessagePropertiesConverter。可选。
16 指定底层 AbstractMessageListenerContainer 启动和停止的阶段。启动顺序从最低到最高,关闭顺序与此相反。默认情况下,此值为 Integer.MAX_VALUE,这意味着此容器尽可能晚启动并尽可能早停止。可选。
17 告诉 AMQP 代理一次请求中向每个消费者发送多少条消息。通常,您可以将此值设置得很高以提高吞吐量。它应该大于或等于事务大小(参见此列表后面的 batch-size 属性)。可选(默认为 1)。
18 接收超时,单位为毫秒。可选(默认为 1000)。
19 指定底层 AbstractMessageListenerContainer 恢复尝试之间的间隔(单位为毫秒)。可选(默认为 5000)。
20 如果为 'true' 且代理上没有任何队列可用,则容器在启动期间会抛出致命异常并停止(如果在容器运行时队列被删除,则在三次尝试被动声明队列后)。如果为 false,则容器不会抛出异常并进入恢复模式,根据 recovery-interval 尝试重新启动。可选(默认为 true)。
21 在底层 AbstractMessageListenerContainer 停止后,以及在 AMQP 连接强制关闭之前,等待工作线程的时间(以毫秒为单位)。如果在关闭信号到来时有任何工作线程处于活动状态,只要它们能在此超时内完成处理,就允许它们完成。否则,连接将关闭,消息将保持未确认状态(如果通道是事务性的)。可选(默认为 5000)。
22 默认情况下,底层 AbstractMessageListenerContainer 使用 SimpleAsyncTaskExecutor 实现,它为每个任务启动一个新线程,异步运行。默认情况下,并发线程的数量是无限的。请注意,此实现不重用线程。考虑使用线程池 TaskExecutor 实现作为替代。可选(默认为 SimpleAsyncTaskExecutor)。
23 默认情况下,底层 AbstractMessageListenerContainer 创建 DefaultTransactionAttribute 的新实例(它采用 EJB 方法,对运行时异常回滚但不对已检查异常回滚)。可选(默认为 DefaultTransactionAttribute)。
24 在底层 AbstractMessageListenerContainer 上设置对外部 PlatformTransactionManager 的 bean 引用。事务管理器与 channel-transacted 属性协同工作。如果在框架发送或接收消息时已经存在事务并且 channelTransacted 标志为 true,则消息事务的提交或回滚将推迟到当前事务结束。如果 channelTransacted 标志为 false,则消息操作不适用事务语义(它会自动确认)。有关更多信息,请参阅 使用 Spring AMQP 进行事务。可选。
25 告诉 SimpleMessageListenerContainer 在单个请求中处理多少条消息。为了获得最佳结果,它应该小于或等于 prefetch-count 中设置的值。当设置了 'consumers-per-queue' 时不允许。可选(默认为 1)。
26 指示底层侦听器容器应该是 DirectMessageListenerContainer 而不是默认的 SimpleMessageListenerContainer。有关更多信息,请参阅 Spring AMQP 参考手册
27 当容器的 consumerBatchEnabledtrue 时,确定适配器如何在消息有效负载中呈现消息批次。当设置为 MESSAGES(默认)时,有效负载是一个 List<Message<?>>,其中每条消息都有从传入 AMQP Message 映射的头,并且有效负载是转换后的 body。当设置为 EXTRACT_PAYLOADS 时,有效负载是一个 List<?>,其中元素是从 AMQP Message 正文转换而来的。EXTRACT_PAYLOADS_WITH_HEADERS 类似于 EXTRACT_PAYLOADS,但此外,每条消息的头都从 MessageProperties 映射到相应索引处的 List<Map<String, Object>;头名称是 AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS
容器

请注意,当使用 XML 配置外部容器时,您不能使用 Spring AMQP 命名空间来定义容器。这是因为命名空间至少需要一个 <listener/> 元素。在此环境中,侦听器是适配器内部的。因此,您必须使用正常的 Spring <bean/> 定义来定义容器,如下例所示

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
尽管 Spring Integration JMS 和 AMQP 的支持类似,但存在重要差异。JMS 入站通道适配器在底层使用 JmsDestinationPollingSource,并期望配置轮询器。AMQP 入站通道适配器使用 AbstractMessageListenerContainer 并且是消息驱动的。在这方面,它更类似于 JMS 消息驱动通道适配器。

从版本 5.5 开始,AmqpInboundChannelAdapter 可以配置 org.springframework.amqp.rabbit.retry.MessageRecoverer 策略,该策略在内部调用重试操作时用于 RecoveryCallback。有关更多信息,请参阅 setMessageRecoverer() JavaDoc。

@Publisher 注解也可以与 @RabbitListener 结合使用

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

    @Bean
    QueueChannel fromRabbitViaPublisher() {
        return new QueueChannel();
    }

    @RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
    @Publisher("fromRabbitViaPublisher")
    @Payload("#args.payload.toUpperCase()")
    public void consumeForPublisher(String payload) {

    }

}

默认情况下,@Publisher AOP 拦截器处理方法调用的返回值。但是,@RabbitListener 方法的返回值被视为 AMQP 回复消息。因此,这种方法不能与 @Publisher 一起使用,所以建议使用 @Payload 注解和针对方法参数的相应 SpEL 表达式。有关 @Publisher 的更多信息,请参见 注解驱动配置 部分。

当在侦听器容器中使用排他或单活动消费者时,建议将容器属性 forceStop 设置为 true。这将防止在停止容器后,另一个消费者在此实例完全停止之前开始消费消息的竞态条件。

批量消息

有关批量消息的更多信息,请参阅 Spring AMQP 文档

要使用 Spring Integration 生成批量消息,只需使用 BatchingRabbitTemplate 配置出站端点。

在接收批量消息时,默认情况下,侦听器容器会提取每个分段消息,适配器会为每个分段生成一个 Message<?>。从版本 5.2 开始,如果容器的 deBatchingEnabled 属性设置为 false,则由适配器而不是容器执行解批处理,并生成一个包含分段有效负载列表(如果适用则进行转换)的单个 Message<List<?>>。该消息的有效负载是分段有效负载的列表(如果适用,则进行转换)。

默认的 BatchingStrategySimpleBatchingStrategy,但这可以在适配器上被覆盖。

当需要重试操作的恢复时,org.springframework.amqp.rabbit.retry.MessageBatchRecoverer 必须与批处理一起使用。
© . This site is unofficial and not affiliated with VMware.