将 Spring Bean 连接到生产者/消费者拦截器
Apache Kafka 提供了一种机制,可以在生产者和消费者中添加拦截器。这些对象由 Kafka 管理,而不是 Spring,因此正常的 Spring 依赖注入不适用于连接依赖的 Spring Bean。但是,您可以使用拦截器 config()
方法手动连接这些依赖项。以下 Spring Boot 应用程序展示了如何通过覆盖 Spring Boot 的默认工厂来将一些依赖的 bean 添加到配置属性中。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
Map<String, Object> producerProperties = new HashMap<>();
// producerProperties.put(..., ...)
// ...
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("some.bean", someBean);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
return factory;
}
@Bean
public SomeBean someBean() {
return new SomeBean();
}
@KafkaListener(id = "kgk897", topics = "kgh897")
public void listen(String in) {
System.out.println("Received " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("kgh897", "test");
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kgh897")
.partitions(1)
.replicas(1)
.build();
}
}
public class SomeBean {
public void someMethod(String what) {
System.out.println(what + " in my foo bean");
}
}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
结果
producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test