请求/回复消息
AmqpTemplate
还提供各种 sendAndReceive
方法,这些方法接受前面针对单向发送操作描述的相同参数选项(exchange
、routingKey
和 Message
)。这些方法对于请求/回复场景非常有用,因为它们在发送之前处理必要的 reply-to
属性的配置,并且可以监听为此目的内部创建的独占队列上的回复消息。
类似的请求/回复方法也可用,其中 MessageConverter
应用于请求和回复。这些方法名为 convertSendAndReceive
。有关更多详细信息,请参见 AmqpTemplate
的Javadoc。
从 1.5.0 版本开始,每个 sendAndReceive
方法变体都有一个采用 CorrelationData
的重载版本。结合正确配置的连接工厂,这使得能够接收操作发送端的发布者确认。请参见 相关的发布者确认和返回 以及 RabbitOperations
的Javadoc 以了解更多信息。
从 2.0 版本开始,这些方法(convertSendAndReceiveAsType
)有一些变体,它们采用额外的 ParameterizedTypeReference
参数来转换复杂的返回类型。模板必须使用 SmartMessageConverter
进行配置。有关更多信息,请参见 使用 RabbitTemplate
从 Message
转换。
从 2.1 版本开始,您可以使用 noLocalReplyConsumer
选项配置 RabbitTemplate
来控制回复消费者的 noLocal
标志。默认情况下为 false
。
回复超时
默认情况下,发送和接收方法会在五秒钟后超时并返回 null。您可以通过设置 replyTimeout
属性来修改此行为。从 1.5 版本开始,如果您将 mandatory
属性设置为 true
(或者特定消息的 mandatory-expression
计算结果为 true
),如果消息无法传递到队列,则会抛出 AmqpMessageReturnedException
异常。此异常具有 returnedMessage
、replyCode
和 replyText
属性,以及用于发送的 exchange
和 routingKey
。
此功能使用发布者返回。您可以通过在 CachingConnectionFactory 上将 publisherReturns 设置为 true 来启用它(请参见 发布者确认和返回)。此外,您不得在 RabbitTemplate 中注册您自己的 ReturnCallback 。 |
从 2.1.2 版本开始,添加了一个 replyTimedOut
方法,允许子类知道超时,以便它们可以清理任何保留的状态。
从 2.0.11 和 2.1.3 版本开始,当您使用默认的 DirectReplyToMessageListenerContainer
时,您可以通过设置模板的 replyErrorHandler
属性来添加错误处理程序。此错误处理程序会针对任何失败的交付调用,例如延迟回复和在没有相关标头的情况下接收的消息。传入的异常是 ListenerExecutionFailedException
,它具有 failedMessage
属性。
RabbitMQ 直接回复
从 3.4.0 版本开始,RabbitMQ 服务器支持直接回复。这消除了固定回复队列的主要原因(避免为每个请求创建临时队列)。从 Spring AMQP 1.4.1 版本开始,默认情况下使用直接回复(如果服务器支持)而不是创建临时回复队列。如果没有提供replyQueue (或其名称设置为amq.rabbitmq.reply-to ),RabbitTemplate 会自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。使用直接回复时,不需要也不应该配置reply-listener 。 |
使用命名队列(除amq.rabbitmq.reply-to
外)仍然支持回复监听器,允许控制回复并发性等等。
从 1.6 版本开始,如果您希望为每个回复使用临时、独占、自动删除的队列,请将useTemporaryReplyQueues
属性设置为true
。如果您设置了replyAddress
,则忽略此属性。
您可以通过子类化RabbitTemplate
并覆盖useDirectReplyTo()
来检查不同的条件,从而更改决定是否使用直接回复的标准。此方法只在发送第一个请求时调用一次。
在 2.0 版本之前,RabbitTemplate
为每个请求创建一个新的消费者,并在收到回复(或超时)时取消消费者。现在,模板改为使用DirectReplyToMessageListenerContainer
,允许重用消费者。模板仍然负责关联回复,因此不会出现延迟回复发送到不同发送者的危险。如果您想恢复到之前的行为,请将useDirectReplyToContainer
(使用 XML 配置时为direct-reply-to-container
)属性设置为false。
AsyncRabbitTemplate
没有此选项。当使用直接回复时,它始终使用DirectReplyToContainer
来处理回复。
从 2.3.7 版本开始,模板新增了一个属性useChannelForCorrelation
。当此属性为true
时,服务器不必将相关 ID 从请求消息头复制到回复消息。而是使用用于发送请求的通道将回复与请求相关联。
使用回复队列的消息关联
使用固定回复队列(除amq.rabbitmq.reply-to
外)时,必须提供相关数据以便将回复与请求关联。请参见RabbitMQ 远程过程调用 (RPC)。默认情况下,标准correlationId
属性用于保存相关数据。但是,如果您希望使用自定义属性来保存相关数据,可以在<rabbit-template/>
上设置correlation-key
属性。显式地将属性设置为correlationId
与省略属性相同。客户端和服务器必须使用相同的标头作为相关数据。
Spring AMQP 1.1 版本使用名为spring_reply_correlation 的自定义属性来保存此数据。如果您希望使用当前版本恢复此行为(也许是为了与使用 1.1 版本的另一个应用程序保持兼容性),则必须将属性设置为spring_reply_correlation 。 |
默认情况下,模板会生成自己的相关 ID(忽略任何用户提供的值)。如果您希望使用自己的相关 ID,请将RabbitTemplate
实例的userCorrelationId
属性设置为true
。
相关 ID 必须唯一,以避免可能将错误的回复返回给请求。 |
回复监听器容器
使用 3.4.0 之前的 RabbitMQ 版本时,每个回复都会使用一个新的临时队列。但是,可以在模板上配置单个回复队列,这可能更高效,还可以让您在该队列上设置参数。但是,在这种情况下,您还必须提供一个<reply-listener/>
子元素。此元素为回复队列提供一个监听器容器,模板作为监听器。<listener-container/>
上允许的所有消息监听器容器配置属性都允许在此元素上使用,但connection-factory
和message-converter
除外,它们继承自模板的配置。
如果您运行多个应用程序实例或使用多个RabbitTemplate 实例,则**必须**为每个实例使用唯一的回复队列。RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,则每个实例都会争夺回复,并且不一定能收到自己的回复。 |
以下示例定义了一个带有连接工厂的 rabbit 模板
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享一个连接工厂,但它们不共享通道。因此,请求和回复不会在同一事务中执行(如果为事务性)。
在 1.5.0 版本之前,reply-address 属性不可用。回复始终通过使用默认交换机和reply-queue 名称作为路由键来路由。这仍然是默认设置,但您现在可以指定新的reply-address 属性。reply-address 可以包含具有<exchange>/<routingKey> 形式的地址,并且回复将路由到指定的交换机并路由到与路由键绑定的队列。reply-address 优先于reply-queue 。仅使用reply-address 时,必须将<reply-listener> 配置为单独的<listener-container> 组件。reply-address 和reply-queue (或<listener-container> 上的queues 属性)必须在逻辑上引用相同的队列。 |
通过此配置,使用SimpleListenerContainer
接收回复,RabbitTemplate
作为MessageListener
。如前例所示,当使用<rabbit:template/>
命名空间元素定义模板时,解析器会定义容器并将模板作为监听器连接。
当模板不使用固定的replyQueue (或使用直接回复——参见RabbitMQ 直接回复)时,不需要监听器容器。当使用 RabbitMQ 3.4.0 或更高版本时,直接回复是首选机制。 |
如果您将RabbitTemplate
定义为<bean/>
,或使用@Configuration
类将其定义为@Bean
,或者以编程方式创建模板,则需要自己定义和连接回复监听器容器。如果您未能执行此操作,则模板永远不会收到回复,最终会超时并返回null作为对sendAndReceive
方法调用的回复。
从 1.5 版本开始,RabbitTemplate
会检测它是否已配置为MessageListener
以接收回复。如果不是,则尝试使用回复地址发送和接收消息将失败,并出现IllegalStateException
(因为永远不会收到回复)。
此外,如果使用简单的replyAddress
(队列名称),则回复监听器容器会验证它是否正在监听具有相同名称的队列。如果回复地址是交换机和路由键,则无法执行此检查,并且会写入调试日志消息。
当您自己连接回复监听器和模板时,务必确保模板的replyAddress 和容器的queues (或queueNames )属性引用相同的队列。模板将回复地址插入到出站消息replyTo 属性中。 |
以下列表显示了如何手动连接 bean 的示例
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
在此测试用例中显示了使用固定回复队列连接RabbitTemplate
以及处理请求并返回回复的“远程”监听器容器的完整示例。
当回复超时(replyTimeout )时,sendAndReceive() 方法返回null。 |
在 1.3.6 版本之前,仅记录超时消息的延迟回复。现在,如果收到延迟回复,则会将其拒绝(模板会抛出AmqpRejectAndDontRequeueException
)。如果回复队列配置为将拒绝的消息发送到死信交换机,则可以检索回复以供以后分析。为此,请将队列绑定到配置的死信交换机,路由键等于回复队列的名称。
有关配置死信的更多信息,请参见RabbitMQ 死信文档。您还可以查看FixedReplyQueueDeadLetterTests
测试用例以了解示例。
异步 Rabbit 模板
1.6 版本引入了AsyncRabbitTemplate
。它具有与AmqpTemplate
上的sendAndReceive
(和convertSendAndReceive
)方法类似的方法。但是,它们不会阻塞,而是返回CompletableFuture
。
sendAndReceive
方法返回RabbitMessageFuture
。convertSendAndReceive
方法返回RabbitConverterFuture
。
您可以稍后同步检索结果,方法是调用future上的get()
,或者您可以注册一个回调,该回调将使用结果异步调用。以下列表显示了这两种方法
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果设置了mandatory
并且无法传递消息,则future将抛出ExecutionException
,其原因是AmqpMessageReturnedException
,它封装了返回的消息和有关返回的信息。
如果设置了enableConfirms
,则future将具有一个名为confirm
的属性,它本身是一个CompletableFuture<Boolean>
,其中true
表示发布成功。如果confirm future为false
,则RabbitFuture
将具有另一个名为nackCause
的属性,其中包含失败的原因(如果可用)。
如果在收到回复后收到发布确认,则会丢弃发布确认,因为回复意味着发布成功。 |
您可以设置模板上的receiveTimeout
属性以使回复超时(默认为30000
- 30 秒)。如果发生超时,则future将完成AmqpReplyTimeoutException
。
模板实现SmartLifecycle
。在有未完成的回复时停止模板会导致取消未完成的Future
实例。
从 2.0 版本开始,异步模板现在支持直接回复,而不是配置的回复队列。要启用此功能,请使用以下构造函数之一
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参见RabbitMQ 直接回复,了解如何在同步RabbitTemplate
中使用直接回复。
2.0 版本引入了这些方法的变体 (convertSendAndReceiveAsType
),它们接受一个额外的 ParameterizedTypeReference
参数来转换复杂的返回类型。您必须使用 SmartMessageConverter
配置底层的 RabbitTemplate
。更多信息,请参见 使用 RabbitTemplate
从 Message
中转换。
从 3.0 版本开始,AsyncRabbitTemplate 方法现在返回 CompletableFuture 而不是 ListenableFuture 。 |