使用 @SendTo 转发侦听器结果

从 2.0 版本开始,如果你还用 @SendTo 注释了一个 @KafkaListener,并且方法调用返回了一个结果,那么该结果将被转发到由 @SendTo 指定的主题。

@SendTo 值可以有几种形式

  • @SendTo("someTopic") 路由到文字主题。

  • @SendTo("#{someExpression}") 路由到在应用程序上下文初始化期间评估表达式一次确定的主题。

  • @SendTo("!{someExpression}") 路由到在运行时评估表达式确定的主题。评估的 #root 对象有三个属性

    • request:入站 ConsumerRecord(或批量侦听器的 ConsumerRecords 对象)。

    • source:从 request 转换的 org.springframework.messaging.Message<?>

    • result:方法返回结果。

  • @SendTo(无属性):这被视为 !{source.headers['kafka_replyTopic']}(自版本 2.1.3 起)。

从版本 2.1.11 和 2.2.1 开始,属性占位符在 @SendTo 值中得到解析。

表达式评估的结果必须是表示主题名称的 String。以下示例显示了使用 @SendTo 的各种方式

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}
为了支持 @SendTo,必须向侦听器容器工厂提供 KafkaTemplate(在其 replyTemplate 属性中),该模板用于发送答复。这应该是 KafkaTemplate,而不是客户端用于请求/答复处理的 ReplyingKafkaTemplate。使用 Spring Boot 时,它会自动将模板配置到工厂中;配置自己的工厂时,必须按以下示例所示进行设置。

从版本 2.2 开始,可以向侦听器容器工厂添加 ReplyHeadersConfigurer。咨询此配置以确定要设置在答复消息中的标头。以下示例显示了如何添加 ReplyHeadersConfigurer

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

如果需要,还可以添加更多标头。以下示例显示了如何执行此操作

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

使用 @SendTo 时,必须使用其 replyTemplate 属性中的 KafkaTemplate 配置 ConcurrentKafkaListenerContainerFactory 以执行发送。Spring Boot 会自动连接其自动配置的模板(或在存在单个实例时连接任何模板)。

除非使用 请求/答复语义,否则仅使用简单的 send(topic, value) 方法,因此您可能希望创建一个子类来生成分区或键。以下示例显示了如何执行此操作
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory()) {

        @Override
        public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}

如果侦听器方法返回 Message<?>Collection<Message<?>>,则侦听器方法负责设置答复的消息标头。例如,在处理来自 ReplyingKafkaTemplate 的请求时,可以执行以下操作

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

使用请求/答复语义时,发件人可以请求目标分区。

即使没有返回结果,您也可以使用 @SendTo 注解 @KafkaListener 方法。这是为了允许配置 errorHandler,该配置可以将有关失败消息传递的信息转发到某个主题。以下示例演示如何执行此操作

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}

有关详细信息,请参见 处理异常

如果侦听器方法返回 Iterable,则默认情况下,将发送一条记录,其中每个元素作为值。从版本 2.3.5 开始,将 @KafkaListener 上的 splitIterables 属性设置为 false,整个结果将作为单个 ProducerRecord 的值发送。这需要在回复模板的生产者配置中使用合适的序列化程序。但是,如果回复是 Iterable<Message<?>>,则将忽略该属性,并且每条消息将单独发送。