异步消费者

Spring AMQP 还通过使用 @RabbitListener 注解支持注释监听器端点,并提供一个开放的基础设施以编程方式注册端点。这是迄今为止设置异步消费者的最便捷方式。有关更多详细信息,请参阅 基于注解的监听器端点

预取默认值以前是 1,这会导致有效消费者的利用率不足。从 2.0 版本开始,默认预取值现在是 250,这应该能够在大多数常见情况下让消费者保持忙碌,从而提高吞吐量。

然而,在某些情况下,预取值应该很低

  • 对于大型消息,尤其是当处理速度很慢时(消息可能会在客户端进程中累积到大量的内存)

  • 当需要严格的消息排序时(在这种情况下,预取值应该设置为 1)

  • 其他特殊情况

此外,对于低容量消息和多个消费者(包括单个监听器容器实例内的并发),您可能希望降低预取值,以便在消费者之间更均匀地分配消息。

有关预取的更多背景信息,请参阅这篇关于 RabbitMQ 中消费者利用率 的文章,以及这篇关于 排队理论 的文章。

消息监听器

对于异步 Message 接收,会涉及一个专用组件(不是 AmqpTemplate)。该组件是 Message 消费回调的容器。我们将在本节后面讨论容器及其属性。不过,首先我们应该看一下回调,因为这是您的应用程序代码与消息系统集成的地方。回调有几种选择,首先是 MessageListener 接口的实现,如下面的清单所示

public interface MessageListener {
    void onMessage(Message message);
}

如果您的回调逻辑出于任何原因依赖于 AMQP Channel 实例,您可以使用 ChannelAwareMessageListener。它看起来很相似,但有一个额外的参数。下面的清单显示了 ChannelAwareMessageListener 接口定义

public interface ChannelAwareMessageListener {
    void onMessage(Message message, Channel channel) throws Exception;
}
在 2.1 版本中,此接口从包 o.s.amqp.rabbit.core 移动到 o.s.amqp.rabbit.listener.api

MessageListenerAdapter

如果您希望在应用程序逻辑和消息 API 之间保持更严格的分隔,您可以依赖框架提供的适配器实现。这通常被称为“消息驱动 POJO”支持。

1.5 版本引入了更灵活的 POJO 消息机制,即 @RabbitListener 注解。有关更多信息,请参见 注解驱动的监听器端点

使用适配器时,您只需要提供对适配器本身应该调用的实例的引用。以下示例演示了如何执行此操作

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");

您可以对适配器进行子类化,并提供 getListenerMethodName() 的实现,以根据消息动态选择不同的方法。此方法有两个参数,originalMessageextractedMessage,后者是任何转换的结果。默认情况下,会配置 SimpleMessageConverter。有关更多信息以及其他可用转换器的信息,请参见 SimpleMessageConverter

从 1.4.2 版本开始,原始消息具有 consumerQueueconsumerTag 属性,可用于确定接收消息的队列。

从 1.5 版本开始,您可以配置一个消费者队列或标签到方法名的映射,以动态选择要调用的方法。如果映射中没有条目,我们将回退到默认的监听器方法。默认的监听器方法(如果未设置)是 handleMessage

从 2.0 版本开始,提供了一个方便的 FunctionalInterface。以下清单显示了 FunctionalInterface 的定义

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

    R handleMessage(T t);

}

此接口使用 Java 8 lambda 表达式简化了适配器的配置,如下面的示例所示

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
    ...
    return result;
}));

从 2.2 版本开始,buildListenerArguments(Object) 已被弃用,取而代之的是新的 buildListenerArguments(Object, Channel, Message)。新方法帮助监听器获取 ChannelMessage 参数以执行更多操作,例如在手动确认模式下调用 channel.basicReject(long, boolean)。以下清单显示了最基本的示例

public class ExtendedListenerAdapter extends MessageListenerAdapter {

    @Override
    protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
        return new Object[]{extractedMessage, channel, message};
    }

}

现在,如果您需要接收“channel”和“message”,您可以像 MessageListenerAdapter 一样配置 ExtendedListenerAdapter。监听器的参数应设置为 buildListenerArguments(Object, Channel, Message) 返回的值,如下面的监听器示例所示

public void handleMessage(Object object, Channel channel, Message message) throws IOException {
    ...
}

容器

现在您已经了解了 Message 监听回调的各种选项,我们可以将注意力转向容器。基本上,容器处理“主动”职责,以便监听器回调可以保持被动。容器是“生命周期”组件的一个示例。它提供用于启动和停止的方法。在配置容器时,您实际上是在 AMQP 队列和 MessageListener 实例之间架起桥梁。您必须提供对 ConnectionFactory 和队列名称或队列实例的引用,监听器应该从这些队列或实例中消费消息。

在 2.0 版本之前,只有一个监听器容器,即 SimpleMessageListenerContainer。现在有第二个容器,即 DirectMessageListenerContainer。容器之间的区别以及在选择使用哪个容器时可能适用的标准在 选择容器 中进行了描述。

以下清单显示了最基本的示例,它通过使用 SimpleMessageListenerContainer 来工作

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));

作为“活动”组件,最常见的是使用 Bean 定义创建监听器容器,以便它可以在后台运行。以下示例展示了使用 XML 的一种方法。

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

以下清单展示了使用 XML 的另一种方法。

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>

以上两个示例都创建了一个 DirectMessageListenerContainer(注意 type 属性——它默认为 simple)。

或者,您可能更喜欢使用 Java 配置,它看起来类似于前面的代码片段。

@Configuration
public class ExampleAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

    @Bean
    public CachingConnectionFactory rabbitConnectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public MessageListener exampleListener() {
        return new MessageListener() {
            public void onMessage(Message message) {
                System.out.println("received: " + message);
            }
        };
    }
}

消费者优先级

从 RabbitMQ 3.2 版本开始,代理现在支持消费者优先级(参见 使用 RabbitMQ 的消费者优先级)。这可以通过在消费者上设置 x-priority 参数来实现。SimpleMessageListenerContainer 现在支持设置消费者参数,如下例所示。

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));

为了方便起见,命名空间在 listener 元素上提供了 priority 属性,如下例所示。

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
    <rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>

从 1.3 版本开始,您可以在运行时修改容器监听的队列。参见 监听器容器队列

auto-delete 队列

当容器配置为监听 auto-delete 队列时,队列具有 x-expires 选项,或者代理上配置了 生存时间 策略,当容器停止时(即最后一个消费者取消时),队列将被代理删除。在 1.3 版本之前,容器无法重新启动,因为队列丢失了。RabbitAdmin 仅在连接关闭或打开时自动重新声明队列等,这在容器停止和启动时不会发生。

从 1.3 版本开始,容器使用 RabbitAdmin 在启动期间重新声明任何丢失的队列。

您还可以使用条件声明(参见 条件声明)以及 auto-startup="false" 管理员来推迟队列声明,直到容器启动。以下示例展示了如何实现。

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="otherAnon" key="otherAnon" />
    </rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
    <rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
    auto-startup="false" />

在这种情况下,队列和交换机由 containerAdmin 声明,它具有 auto-startup="false",因此这些元素不会在上下文初始化期间声明。同样,容器也不会启动,原因相同。当容器稍后启动时,它将使用对 containerAdmin 的引用来声明这些元素。