类级别上的@KafkaListener

在类级别使用@KafkaListener 时,必须在方法级别指定@KafkaHandler。当消息被传递时,转换后的消息有效负载类型将用于确定要调用的方法。以下示例演示了如何操作。

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

从 2.1.3 版本开始,您可以指定一个@KafkaHandler 方法作为默认方法,如果与其他方法不匹配,则调用该方法。最多只能指定一个方法。使用@KafkaHandler 方法时,有效负载必须已转换为域对象(以便可以执行匹配)。使用自定义反序列化器、JsonDeserializer 或 JsonMessageConverter 并将其TypePrecedence设置为TYPE_ID。有关更多信息,请参阅序列化、反序列化和消息转换

由于 Spring 解析方法参数的方式存在一些限制,因此默认的@KafkaHandler 无法接收离散的标头;它必须使用消费者记录元数据中讨论的ConsumerRecordMetadata。

例如

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

如果对象是String,则此方法无效;topic参数也将获得对object的引用。

如果在默认方法中需要有关记录的元数据,请使用此方法

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}