异步 @KafkaListener 返回类型

从 3.2 版本开始,@KafkaListener(和 @KafkaHandler)方法可以使用异步返回类型来指定,从而允许异步发送回复。返回类型包括 CompletableFuture<?>Mono<?> 和 Kotlin 的 suspend 函数。

@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
    ...
    CompletableFuture<String> future = new CompletableFuture<>();
    future.complete("done");
    return future;
}
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
    ...
    return Mono.empty();
}
当检测到异步返回类型时,AckMode 将自动设置为 MANUAL 并启用无序提交;相反,异步完成将在异步操作完成后进行确认。当异步结果以错误完成时,消息是否恢复取决于容器错误处理程序。如果监听器方法中发生某些异常阻止创建异步结果对象,则**必须**捕获该异常并返回适当的返回对象,这将导致消息被确认或恢复。

如果在具有异步返回类型的监听器(包括 Kotlin 暂停函数)上配置了 KafkaListenerErrorHandler,则在发生故障后会调用错误处理程序。有关此错误处理程序及其用途的更多信息,请参阅处理异常