Spring 管理的生产者拦截器

从 3.0.0 版本开始,对于生产者拦截器,您可以直接将其作为 Bean 由 Spring 管理,而不是向 Apache Kafka 生产者配置提供拦截器的类名。如果您采用这种方法,则需要在 KafkaTemplate 上设置此生产者拦截器。以下是一个使用上面相同的 MyProducerInterceptor 的示例,但已更改为不使用内部配置属性。

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

    private final SomeBean bean;

    public MyProducerInterceptor(SomeBean bean) {
        this.bean = bean;
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        this.bean.someMethod("producer interceptor");
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
  return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
   KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
   kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}

在记录发送之前,会调用生产者拦截器的 onSend 方法。服务器在发布数据后发送确认后,就会调用 onAcknowledgement 方法。onAcknowledgement 方法在生产者调用任何用户回调之前调用。

如果您有多个需要应用于 KafkaTemplate 的通过 Spring 管理的此类生产者拦截器,则需要使用 CompositeProducerInterceptorCompositeProducerInterceptor 允许按顺序添加各个生产者拦截器。底层 ProducerInterceptor 实现中的方法将按添加到 CompositeProducerInterceptor 的顺序调用。