使用 @SendTo 转发监听器结果
从版本 2.0 开始,如果您还将一个 @KafkaListener 与 @SendTo 注解一起使用,并且方法调用返回一个结果,则该结果将转发到 @SendTo 指定的主题。
@SendTo 的值可以有几种形式
-
@SendTo("someTopic")路由到字面主题。 -
@SendTo("#{someExpression}")路由到通过在应用程序上下文初始化期间评估表达式一次确定的主题。 -
@SendTo("!{someExpression}")路由到通过在运行时评估表达式确定的主题。评估的#root对象有三个属性-
request: 入站ConsumerRecord(或批处理侦听器的ConsumerRecords对象)。 -
source: 从request转换而来的org.springframework.messaging.Message<?>。 -
result: 方法返回结果。
-
-
@SendTo(无属性):这被视为!{source.headers['kafka_replyTopic']}(从版本 2.1.3 开始)。
从版本 2.1.11 和 2.2.1 开始,属性占位符在 @SendTo 值中解析。
表达式评估的结果必须是一个表示主题名称的 String。以下示例展示了使用 @SendTo 的各种方式
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
为了支持 @SendTo,必须向侦听器容器工厂提供一个 KafkaTemplate(在其 replyTemplate 属性中),用于发送回复。这应该是一个 KafkaTemplate,而不是用于客户端请求/回复处理的 ReplyingKafkaTemplate。当使用 Spring Boot 时,它会自动将模板配置到工厂中;当配置您自己的工厂时,必须按以下示例所示进行设置。 |
从版本 2.2 开始,您可以向侦听器容器工厂添加一个 ReplyHeadersConfigurer。它用于确定您要在回复消息中设置哪些标头。以下示例展示了如何添加 ReplyHeadersConfigurer
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果您愿意,还可以添加更多标头。以下示例展示了如何实现
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
当您使用 @SendTo 时,您必须配置 ConcurrentKafkaListenerContainerFactory 并在其 replyTemplate 属性中配置一个 KafkaTemplate 来执行发送。Spring Boot 将自动连接其自动配置的模板(或任何单个实例存在的模板)。
除非您使用 请求/回复语义,否则只使用简单的 send(topic, value) 方法,因此您可能希望创建一个子类来生成分区或键。以下示例展示了如何实现 |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<String, String>(producerFactory()) {
@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
|
如果侦听器方法返回
|
当使用请求/回复语义时,目标分区可以由发送方请求。
|
您可以为
有关更多信息,请参阅 处理异常。 |
如果侦听器方法返回一个 Iterable,默认情况下,每个元素作为值发送一个记录。从版本 2.3.5 开始,将 @KafkaListener 上的 splitIterables 属性设置为 false,整个结果将作为单个 ProducerRecord 的值发送。这需要在回复模板的生产者配置中有一个合适的序列化器。但是,如果回复是 Iterable<Message<?>>,则该属性将被忽略,并且每个消息都将单独发送。 |