请求/回复消息
AmqpTemplate
还提供各种 sendAndReceive
方法,这些方法接受与之前描述的单向发送操作(exchange
、routingKey
和 Message
)相同的参数选项。这些方法对于请求/回复场景非常有用,因为它们在发送之前处理必要的 reply-to
属性的配置,并且可以在为此目的在内部创建的独占队列上监听回复消息。
类似的请求-回复方法也可用,其中MessageConverter
应用于请求和回复。这些方法名为convertSendAndReceive
。有关更多详细信息,请参见AmqpTemplate
的 Javadoc。
从 1.5.0 版本开始,每个sendAndReceive
方法变体都有一个重载版本,它接受CorrelationData
。与正确配置的连接工厂一起,这使得能够接收发送端操作的发布者确认。有关更多信息,请参见相关发布者确认和返回以及RabbitOperations
的 Javadoc。
从 2.0 版本开始,这些方法(convertSendAndReceiveAsType
)有一些变体,它们接受一个额外的ParameterizedTypeReference
参数来转换复杂的返回类型。模板必须使用SmartMessageConverter
进行配置。有关更多信息,请参见使用 RabbitTemplate
从 Message
中转换。
从 2.1 版本开始,您可以使用noLocalReplyConsumer
选项配置RabbitTemplate
来控制回复消费者的noLocal
标志。默认情况下,此选项为false
。
回复超时
默认情况下,发送和接收方法在五秒后超时并返回 null。您可以通过设置replyTimeout
属性来修改此行为。从 1.5 版本开始,如果您将mandatory
属性设置为true
(或mandatory-expression
对特定消息评估为true
),如果消息无法传递到队列,则会抛出AmqpMessageReturnedException
。此异常具有returnedMessage
、replyCode
和replyText
属性,以及用于发送的exchange
和routingKey
。
此功能使用发布者返回。您可以通过在CachingConnectionFactory 上将publisherReturns 设置为true 来启用它(请参见发布者确认和返回)。此外,您不得在RabbitTemplate 中注册您自己的ReturnCallback 。
|
从 2.1.2 版本开始,添加了replyTimedOut
方法,让子类可以了解超时,以便它们可以清理任何保留的状态。
从 2.0.11 和 2.1.3 版本开始,当您使用默认的 DirectReplyToMessageListenerContainer
时,可以通过设置模板的 replyErrorHandler
属性添加错误处理程序。此错误处理程序会针对任何失败的传递调用,例如延迟回复和未包含关联头的消息。传入的异常是 ListenerExecutionFailedException
,它具有 failedMessage
属性。
RabbitMQ 直接回复
从 3.4.0 版本开始,RabbitMQ 服务器支持 直接回复。这消除了固定回复队列的主要原因(避免为每个请求创建临时队列)。从 Spring AMQP 1.4.1 版本开始,默认情况下使用直接回复(如果服务器支持),而不是创建临时回复队列。当未提供 replyQueue (或将其设置为 amq.rabbitmq.reply-to 的名称)时,RabbitTemplate 会自动检测是否支持直接回复,并使用它或回退到使用临时回复队列。使用直接回复时,不需要 reply-listener ,也不应配置它。
|
命名队列(除了 amq.rabbitmq.reply-to
)仍然支持回复监听器,允许控制回复并发等。
从 1.6 版本开始,如果您希望为每个回复使用临时、排他、自动删除的队列,请将 useTemporaryReplyQueues
属性设置为 true
。如果您设置了 replyAddress
,则会忽略此属性。
您可以通过子类化 RabbitTemplate
并覆盖 useDirectReplyTo()
来更改决定是否使用直接回复的标准,以检查不同的标准。该方法只调用一次,即在发送第一个请求时。
在 2.0 版本之前,RabbitTemplate
为每个请求创建一个新的消费者,并在收到回复(或超时)时取消消费者。现在,模板使用 DirectReplyToMessageListenerContainer
,让消费者可以重复使用。模板仍然负责关联回复,因此不会出现延迟回复发送到不同发送者的风险。如果您想恢复到之前的行为,请将 useDirectReplyToContainer
(在使用 XML 配置时为 direct-reply-to-container
)属性设置为 false。
AsyncRabbitTemplate
没有这样的选项。当使用直接回复时,它始终使用 DirectReplyToContainer
来回复。
从 2.3.7 版本开始,模板新增了一个属性 useChannelForCorrelation
。当该属性设置为 true
时,服务器无需将请求消息头中的关联 ID 复制到回复消息中。相反,服务器会使用发送请求的通道来关联回复和请求。
使用回复队列进行消息关联
当使用固定回复队列(非 amq.rabbitmq.reply-to
)时,您必须提供关联数据,以便将回复与请求关联起来。请参阅 RabbitMQ 远程过程调用 (RPC)。默认情况下,标准 correlationId
属性用于保存关联数据。但是,如果您希望使用自定义属性来保存关联数据,可以在 <rabbit-template/> 上设置 correlation-key
属性。显式将属性设置为 correlationId
与省略该属性相同。客户端和服务器必须使用相同的头来保存关联数据。
Spring AMQP 1.1 版本使用了一个名为 spring_reply_correlation 的自定义属性来保存此数据。如果您希望在当前版本中恢复此行为(可能是为了与使用 1.1 版本的另一个应用程序保持兼容),则必须将属性设置为 spring_reply_correlation 。
|
默认情况下,模板会生成自己的关联 ID(忽略任何用户提供的关联 ID)。如果您希望使用自己的关联 ID,请将 RabbitTemplate
实例的 userCorrelationId
属性设置为 true
。
关联 ID 必须是唯一的,以避免出现错误的回复被返回给请求的情况。 |
回复监听器容器
当使用 3.4.0 之前的 RabbitMQ 版本时,每个回复都会使用一个新的临时队列。但是,可以在模板上配置一个单一的回复队列,这可以提高效率,并且还可以让您在该队列上设置参数。但是,在这种情况下,您还必须提供一个 <reply-listener/> 子元素。此元素为回复队列提供了一个监听器容器,其中模板充当监听器。所有在 <listener-container/> 上允许的 消息监听器容器配置 属性都可以在该元素上使用,除了 connection-factory
和 message-converter
,这两个属性是从模板的配置中继承的。
如果您运行应用程序的多个实例或使用多个 RabbitTemplate 实例,则 **必须** 为每个实例使用一个唯一的回复队列。RabbitMQ 无法从队列中选择消息,因此,如果所有实例都使用同一个队列,则每个实例都会争夺回复,并且不一定能收到自己的回复。
|
以下示例定义了一个带有连接工厂的 Rabbit 模板
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享一个连接工厂,但它们不共享一个通道。因此,请求和回复不会在同一个事务中执行(如果使用事务)。
在 1.5.0 版本之前,reply-address 属性不可用。回复始终通过使用默认交换机和 reply-queue 名称作为路由键来路由。这仍然是默认设置,但您现在可以指定新的 reply-address 属性。reply-address 可以包含形式为 <exchange>/<routingKey> 的地址,回复将路由到指定的交换机并路由到与路由键绑定的队列。reply-address 优先于 reply-queue 。当仅使用 reply-address 时,<reply-listener> 必须配置为单独的 <listener-container> 组件。reply-address 和 reply-queue (或 <listener-container> 上的 queues 属性)必须在逻辑上引用同一个队列。
|
使用此配置,SimpleListenerContainer
用于接收回复,RabbitTemplate
作为 MessageListener
。当使用 <rabbit:template/>
命名空间元素定义模板时,如前面的示例所示,解析器定义容器并将模板作为监听器连接起来。
当模板不使用固定的 replyQueue (或使用直接回复 - 请参阅 RabbitMQ 直接回复)时,不需要监听器容器。使用 RabbitMQ 3.4.0 或更高版本时,直接 reply-to 是首选机制。
|
如果您将 RabbitTemplate
定义为 <bean/>
或使用 @Configuration
类将其定义为 @Bean
,或者当您以编程方式创建模板时,您需要自己定义和连接回复监听器容器。如果您没有这样做,模板将永远不会收到回复,最终会超时并返回 null 作为对 sendAndReceive
方法调用的回复。
从 1.5 版本开始,RabbitTemplate
检测它是否已配置为 MessageListener
来接收回复。如果没有,尝试使用回复地址发送和接收消息将失败,并出现 IllegalStateException
(因为永远不会收到回复)。
此外,如果使用简单的 replyAddress
(队列名称),回复监听器容器将验证它是否正在监听具有相同名称的队列。如果回复地址是交换机和路由键,则无法执行此检查,并且会写入调试日志消息。
当您自己连接回复监听器和模板时,务必确保模板的 replyAddress 和容器的 queues (或 queueNames )属性引用同一个队列。模板将回复地址插入到出站消息的 replyTo 属性中。
|
以下列表显示了如何手动连接 bean 的示例
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
RabbitTemplate
与固定回复队列一起连接的完整示例,以及处理请求并返回回复的“远程”监听器容器,如 此测试用例 所示。
当回复超时(replyTimeout )时,sendAndReceive() 方法将返回 null。
|
在 1.3.6 版本之前,超时消息的延迟回复只会记录。现在,如果收到延迟回复,它将被拒绝(模板会抛出 AmqpRejectAndDontRequeueException
)。如果回复队列配置为将拒绝的消息发送到死信交换机,则可以检索回复以供以后分析。为此,请将队列绑定到配置的死信交换机,并使用与回复队列名称相同的路由键。
有关配置死信的更多信息,请参阅 RabbitMQ 死信文档。您还可以查看 FixedReplyQueueDeadLetterTests
测试用例以获取示例。
异步 Rabbit 模板
1.6 版本引入了 AsyncRabbitTemplate
。它具有与 AmqpTemplate
上的类似 sendAndReceive
(和 convertSendAndReceive
)方法。但是,它不会阻塞,而是返回一个 CompletableFuture
。
sendAndReceive
方法返回一个 RabbitMessageFuture
。convertSendAndReceive
方法返回一个 RabbitConverterFuture
。
您可以通过在将来调用 get()
来同步检索结果,或者您可以注册一个异步调用结果的回调。以下清单显示了两种方法
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果设置了 mandatory
并且消息无法传递,则将来会抛出 ExecutionException
,其原因是 AmqpMessageReturnedException
,它封装了返回的消息和有关返回的信息。
如果设置了 enableConfirms
,则将来有一个名为 confirm
的属性,它本身是一个 CompletableFuture<Boolean>
,其中 true
表示发布成功。如果确认将来为 false
,则 RabbitFuture
还有一个名为 nackCause
的属性,其中包含失败的原因(如果可用)。
如果在回复之后收到发布者确认,则发布者确认将被丢弃,因为回复意味着发布成功。 |
您可以在模板上设置 receiveTimeout
属性以使回复超时(默认值为 30000
- 30 秒)。如果发生超时,将来将使用 AmqpReplyTimeoutException
完成。
模板实现了 SmartLifecycle
。在有待处理的回复时停止模板会导致待处理的 Future
实例被取消。
从 2.0 版本开始,异步模板现在支持 直接回复,而不是配置的回复队列。要启用此功能,请使用以下构造函数之一
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参阅 RabbitMQ 直接回复,了解如何使用同步 RabbitTemplate
使用直接回复。
2.0 版本引入了这些方法的变体(convertSendAndReceiveAsType
),这些变体采用额外的 ParameterizedTypeReference
参数来转换复杂的返回类型。您必须使用 SmartMessageConverter
配置底层的 RabbitTemplate
。有关更多信息,请参阅 使用 RabbitTemplate
从 Message
转换。
从 3.0 版本开始,AsyncRabbitTemplate 方法现在返回 CompletableFuture 而不是 ListenableFuture 。
|