轮询消费者

AmqpTemplate 本身可用于轮询接收消息。默认情况下,如果没有可用的消息,则会立即返回 null。不会阻塞。从 1.5 版本开始,您可以设置一个以毫秒为单位的 receiveTimeout,接收方法最多阻塞这么长时间,等待消息。小于零的值表示无限期阻塞(或至少阻塞到与代理的连接丢失为止)。1.6 版本引入了 receive 方法的变体,允许在每次调用时传入超时时间。

由于接收操作为每条消息创建一个新的 QueueingConsumer,因此此技术并不真正适用于高容量环境。对于这些用例,请考虑使用异步消费者或 receiveTimeout 为零。

从 2.4.8 版本开始,当使用非零超时时,您可以指定传递到用于将消费者与通道关联的 basicConsume 方法中的参数。例如:template.addConsumerArg("x-priority", 10)

有四种简单的 receive 方法可用。与发送端的 Exchange 一样,有一种方法要求直接在模板本身上设置默认队列属性,还有一种方法在运行时接受队列参数。1.6 版本引入了变体以接受 timeoutMillis 以按每次请求覆盖 receiveTimeout。以下清单显示了这四种方法的定义

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

与发送消息的情况一样,AmqpTemplate 有一些方便的方法用于接收 POJO 而不是 Message 实例,并且实现提供了一种自定义用于创建返回 ObjectMessageConverter 的方法:以下清单显示了这些方法

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

从 2.0 版本开始,这些方法的变体接受额外的 ParameterizedTypeReference 参数以转换复杂类型。模板必须配置 SmartMessageConverter。有关更多信息,请参见 使用 RabbitTemplate 从 Message 转换

类似于 sendAndReceive 方法,从 1.3 版本开始,AmqpTemplate 有一些方便的 receiveAndReply 方法用于同步接收、处理和回复消息。以下清单显示了这些方法定义

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
       throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
     throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
     ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
            ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

AmqpTemplate 实现负责 receivereply 阶段。在大多数情况下,您只需要提供 ReceiveAndReplyCallback 的实现即可对接收到的消息执行一些业务逻辑,并在需要时构建回复对象或消息。请注意,ReceiveAndReplyCallback 可以返回 null。在这种情况下,不会发送任何回复,并且 receiveAndReply 的工作方式与 receive 方法相同。这允许将同一个队列用于混合消息,其中一些消息可能不需要回复。

只有在提供的回调不是ReceiveAndReplyMessageCallback的实例(该实例提供原始消息交换契约)时,才会应用自动消息(请求和回复)转换。

ReplyToAddressCallback适用于需要自定义逻辑来根据接收到的消息和来自ReceiveAndReplyCallback的回复在运行时确定replyTo地址的情况。默认情况下,使用请求消息中的replyTo信息来路由回复。

以下列表显示了一个基于POJO的接收和回复示例。

boolean received =
        this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {

                public Invoice handle(Order order) {
                        return processOrder(order);
                }
        });
if (received) {
        log.info("We received an order!");
}