基于AMQP的消息通道

提供了两种消息通道实现。一种是点对点 (point-to-point),另一种是发布订阅 (publish-subscribe)。这两种通道都为底层的AmqpTemplateSimpleMessageListenerContainer提供了广泛的配置属性(如本章前面关于通道适配器和网关部分所示)。但是,我们在此展示的示例配置最小化。请浏览XML模式以查看可用属性。

点对点通道可能如下所示:

<int-amqp:channel id="p2pChannel"/>

在底层,上面的示例会导致声明一个名为si.p2pChannel的队列,并且此通道向该队列发送消息(从技术上讲,它是通过向无名直接交换机发送消息,并且路由密钥与该队列的名称匹配来实现的)。此通道还在该队列上注册了一个消费者。如果您希望通道是“轮询”(可轮询)的而不是消息驱动的,请将message-driven标志的值提供为false,如下例所示:

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布订阅通道可能如下所示:

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在底层,上面的示例会导致声明一个名为si.fanout.pubSubChannel的扇出交换机,并且此通道向该扇出交换机发送消息。此通道还会声明一个服务器命名的独占的、自动删除的、非持久的队列,并将该队列绑定到扇出交换机,同时在这个队列上注册一个消费者来接收消息。发布订阅通道没有“轮询”选项。它必须是消息驱动的。

从4.1版本开始,支持使用template-channel-transacted(与channel-transacted结合使用)的基于AMQP的消息通道,以便为AbstractMessageListenerContainerRabbitTemplate分别进行transactional配置。请注意,以前,channel-transacted默认情况下为true。现在,对于AbstractMessageListenerContainer,它默认情况下为false

在4.3版本之前,基于AMQP的通道仅支持具有Serializable有效负载和标头的消息。整个消息会被转换(序列化)并发送到RabbitMQ。现在,您可以将extract-payload属性(或使用Java配置时使用setExtractPayload())设置为true。当此标志为true时,消息有效负载将被转换,并且标头将被映射,方式类似于使用通道适配器时。此安排允许基于AMQP的通道与非可序列化有效负载一起使用(可能使用另一个消息转换器,例如Jackson2JsonMessageConverter)。有关默认映射标头的更多信息,请参见AMQP消息标头。您可以通过提供使用outbound-header-mapperinbound-header-mapper属性的自定义映射器来修改映射。您现在还可以指定一个default-delivery-mode,它用于在没有amqp_deliveryMode标头时设置传递模式。默认情况下,Spring AMQP MessageProperties使用PERSISTENT传递模式。

与其他持久性支持的通道一样,基于AMQP的通道旨在提供消息持久性以避免消息丢失。它们并非旨在将工作分发到其他对等应用程序。为此,请改用通道适配器。
从5.0版本开始,可轮询通道现在会阻塞轮询线程,持续时间为指定的receiveTimeout(默认为1秒)。以前,与其他PollableChannel实现不同,如果不可用消息,则线程会立即返回到调度程序,而不管接收超时时间如何。阻塞比使用basicGet()检索消息(无超时)开销略大一些,因为必须创建一个消费者来接收每条消息。要恢复以前的行为,请将轮询程序的receiveTimeout设置为0。

使用Java配置进行配置

以下示例演示如何使用Java配置配置通道:

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用Java DSL进行配置

以下示例演示如何使用Java DSL配置通道:

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}