类上的 @KafkaListener
在类级别使用 @KafkaListener
时,必须在方法级别指定 @KafkaHandler
。传递消息时,转换后的消息有效负载类型用于确定要调用的方法。以下示例显示了如何执行此操作
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从 2.1.3 版本开始,你可以将 @KafkaHandler
方法指定为默认方法,如果其他方法没有匹配项,则调用该方法。最多只能指定一个方法。使用 @KafkaHandler
方法时,有效负载必须已经转换为域对象(以便可以执行匹配)。使用自定义反序列化程序、JsonDeserializer
或将 TypePrecedence
设置为 TYPE_ID
的 JsonMessageConverter
。有关更多信息,请参阅 序列化、反序列化和消息转换。
由于 Spring 解析方法参数的方式存在一些限制,默认 @KafkaHandler 无法接收离散标头;它必须使用 使用者记录元数据中讨论的 ConsumerRecordMetadata 。
|
例如
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是 String
,则此方法不起作用;topic
参数也将获得对 object
的引用。
如果需要有关默认方法中记录的元数据,请使用此方法
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}