回复管理
MessageListenerAdapter
中现有的支持已允许您的方法具有非空返回类型。在这种情况下,调用结果封装在发送到原始消息的 ReplyToAddress
标头中指定的目标地址或侦听器上配置的默认地址的消息中。您可以使用消息抽象的 @SendTo
注释设置该默认地址。
假设我们的 processOrder
方法现在应返回 OrderStatus
,我们可以按如下方式编写它以自动发送答复
@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果您需要以与传输无关的方式设置其他标头,则可以返回 Message
,如下所示
@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
或者,您可以在 beforeSendReplyMessagePostProcessors
容器工厂属性中使用 MessagePostProcessor
来添加更多标头。从版本 2.2.3 开始,已在答复消息中提供了被调用 bean/方法,它可用于消息后处理器中以将信息传达给调用者
factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return m;
});
从版本 2.2.5 开始,您可以配置 ReplyPostProcessor
以在发送答复消息之前对其进行修改;在设置 correlationId
标头以匹配请求后调用它。
@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}
@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
从版本 3.0 开始,您可以在容器工厂上配置后处理器,而不是在注释上。
factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
id
参数是侦听器 ID。
注释上的设置将取代工厂设置。
@SendTo
值被假定为遵循 exchange/routingKey
模式的回复 exchange
和 routingKey
对,其中可以省略其中一部分。有效值如下
-
thing1/thing2
:replyTo
exchange 和routingKey
。thing1/
:replyTo
exchange 和默认(空)routingKey
。thing2
或/thing2
:replyTo
routingKey
和默认(空)exchange。/
或空:replyTo
默认 exchange 和默认routingKey
。
此外,您可以在没有 value
属性的情况下使用 @SendTo
。此情况等于空 sendTo
模式。仅当入站消息没有 replyToAddress
属性时才使用 @SendTo
。
从 1.5 版开始,@SendTo
值可以是 bean 初始化 SpEL 表达式,如下例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
该表达式必须计算为 String
,它可以是简单的队列名称(发送到默认 exchange)或具有 exchange/routingKey
形式,如在前面的示例中所述。
#{…} 表达式在初始化期间计算一次。
|
对于动态回复路由,消息发送方应包含 reply_to
消息属性或使用备用运行时 SpEL 表达式(在下一个示例之后描述)。
从 1.6 版开始,@SendTo
可以是针对请求和回复在运行时计算的 SpEL 表达式,如下例所示
@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
SpEL 表达式的运行时特性用 !{…}
定界符表示。表达式的评估上下文 #root
对象具有三个属性
-
request
:o.s.amqp.core.Message
请求对象。 -
source
:转换后的o.s.messaging.Message<?>
。 -
result
:方法结果。
上下文具有映射属性访问器、标准类型转换器和 bean 解析器,它允许引用上下文中的其他 bean(例如,@someBeanName.determineReplyQ(request, result)
)。
总之,#{…}
在初始化期间评估一次,其中#root
对象是应用程序上下文。Bean 通过其名称引用。!{…}
在运行时为每条消息评估,其中 root 对象具有前面列出的属性。Bean 通过其名称引用,前缀为@
。
从 2.1 版开始,还支持简单的属性占位符(例如,${some.reply.to}
)。对于早期版本,可以使用以下内容作为解决方法,如下例所示
@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}