异步消费者
Spring AMQP 还通过使用 `@RabbitListener` 注解支持注解监听器端点,并提供了一个开放的基础架构来以编程方式注册端点。这是迄今为止设置异步消费者的最便捷方式。有关更多详细信息,请参阅 注解驱动的监听器端点。 |
以前使用的预取默认值为 1,这可能导致高效消费者的利用率不足。从 2.0 版本开始,默认预取值现在为 250,这应该能够在大多数常见场景中使消费者保持忙碌状态,从而提高吞吐量。 然而,在某些情况下,预取值应该较低
此外,对于低容量消息和多个消费者(包括单个监听器容器实例内的并发),您可能希望减少预取以获得更均匀的消息跨消费者分布。 请参阅 消息监听器容器配置。 有关预取的更多背景信息,请参阅这篇关于 RabbitMQ 中消费者利用率 的文章以及这篇关于 排队理论 的文章。 |
消息监听器
对于异步 `Message` 接收,会涉及一个专用组件(不是 `AmqpTemplate`)。该组件是 `Message` 消费回调的容器。我们稍后在本节中讨论容器及其属性。但是,首先,我们应该查看回调,因为这是您的应用程序代码与消息系统集成的部分。回调有几种选择,首先是 `MessageListener` 接口的实现,如下所示
public interface MessageListener {
void onMessage(Message message);
}
如果您的回调逻辑出于任何原因依赖于 AMQP Channel 实例,则可以使用 `ChannelAwareMessageListener`。它看起来类似,但有一个额外的参数。以下清单显示了 `ChannelAwareMessageListener` 接口定义
public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
在 2.1 版本中,此接口已从 `o.s.amqp.rabbit.core` 包移动到 `o.s.amqp.rabbit.listener.api` 包。 |
MessageListenerAdapter
如果您更喜欢在应用程序逻辑和消息传递 API 之间保持更严格的分离,则可以依赖框架提供的适配器实现。这通常称为“消息驱动 POJO”支持。
1.5 版本引入了一种更灵活的 POJO 消息机制,即 `@RabbitListener` 注解。有关更多信息,请参阅 注解驱动的监听器端点。 |
使用适配器时,只需提供适配器本身应调用的实例的引用即可。以下示例演示了如何操作。
MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
您可以子类化适配器并提供getListenerMethodName()
的实现,以便根据消息动态选择不同的方法。此方法有两个参数,originalMessage
和extractedMessage
,后者是任何转换的结果。默认情况下,配置了SimpleMessageConverter
。有关更多信息以及其他可用转换器的信息,请参阅SimpleMessageConverter
。
从1.4.2版开始,原始消息具有consumerQueue
和consumerTag
属性,可用于确定从中接收消息的队列。
从1.5版开始,您可以配置一个消费者队列或标签到方法名的映射,以动态选择要调用的方法。如果映射中没有条目,我们将回退到默认侦听器方法。默认侦听器方法(如果未设置)是handleMessage
。
从2.0版开始,提供了一个方便的FunctionalInterface
。以下清单显示了FunctionalInterface
的定义。
@FunctionalInterface
public interface ReplyingMessageListener<T, R> {
R handleMessage(T t);
}
此接口使用Java 8 lambda表达式方便地配置适配器,如下例所示。
new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
从2.2版开始,buildListenerArguments(Object)
已弃用,并引入了新的buildListenerArguments(Object, Channel, Message)
。新方法帮助侦听器获取Channel
和Message
参数以执行更多操作,例如在手动确认模式下调用channel.basicReject(long, boolean)
。以下清单显示了最基本的示例。
public class ExtendedListenerAdapter extends MessageListenerAdapter {
@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}
}
现在,如果您需要接收“channel”和“message”,您可以像MessageListenerAdapter
一样配置ExtendedListenerAdapter
。侦听器的参数应设置为buildListenerArguments(Object, Channel, Message)
返回的值,如下例所示。
public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
容器
既然您已经了解了Message
侦听回调的各种选项,我们可以将注意力转向容器。基本上,容器处理“主动”职责,以便侦听回调可以保持被动。容器是“生命周期”组件的一个示例。它提供启动和停止的方法。配置容器时,您实际上是在AMQP队列和MessageListener
实例之间架起桥梁。您必须提供对ConnectionFactory
的引用以及侦听器应从中使用消息的队列名称或队列实例。
在2.0版之前,只有一个侦听器容器,即SimpleMessageListenerContainer
。现在还有一个第二个容器,即DirectMessageListenerContainer
。容器之间的区别以及选择使用哪个容器时可能应用的标准在选择容器中进行了描述。
以下清单显示了最基本的示例,该示例通过使用SimpleMessageListenerContainer
工作。
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
作为一个“主动”组件,最常见的是使用bean定义创建侦听器容器,以便它可以在后台运行。以下示例显示了一种使用XML进行操作的方法。
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以下清单显示了使用XML的另一种方法。
<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
以上两个示例都创建了一个DirectMessageListenerContainer
(注意type
属性——它默认为simple
)。
或者,您可能更喜欢使用Java配置,它看起来类似于前面的代码片段。
@Configuration
public class ExampleAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
消费者优先级
从RabbitMQ 3.2版开始,代理现在支持消费者优先级(请参阅使用RabbitMQ的消费者优先级)。这可以通过在消费者上设置x-priority
参数来启用。SimpleMessageListenerContainer
现在支持设置消费者参数,如下例所示。
container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
为了方便起见,命名空间在listener
元素上提供了priority
属性,如下例所示。
<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
从1.3版开始,您可以修改容器在运行时侦听的队列。请参阅侦听器容器队列。
auto-delete
队列
当容器配置为侦听auto-delete
队列时,队列具有x-expires
选项,或者在代理上配置了生存时间策略,则在容器停止时(即,当最后一个消费者取消时),队列将被代理删除。在1.3版之前,由于队列丢失,容器无法重新启动。RabbitAdmin
仅在连接关闭或打开时自动重新声明队列等,这在容器停止和启动时不会发生。
从1.3版开始,容器使用RabbitAdmin
在启动期间重新声明任何丢失的队列。
您还可以将条件声明(请参阅条件声明)与auto-startup="false"
admin一起使用,以将队列声明延迟到容器启动为止。以下示例演示了如何操作。
<rabbit:queue id="otherAnon" declared-by="containerAdmin" />
<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>
<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
在这种情况下,队列和交换机由containerAdmin
声明,containerAdmin
具有auto-startup="false"
,因此在上下文初始化期间不会声明这些元素。同样,出于相同原因,容器也不会启动。当容器稍后启动时,它使用它对containerAdmin
的引用来声明这些元素。