快速入门
先决条件:您必须安装并运行 Apache Kafka。然后,您必须将 Spring for Apache Kafka(spring-kafka
)JAR 及其所有依赖项放在类路径上。最简单的方法是在构建工具中声明一个依赖项。
如果您未使用 Spring Boot,请在您的项目中将 spring-kafka
jar 声明为依赖项。
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.2.4</version>
</dependency>
compile 'org.springframework.kafka:spring-kafka:3.2.4'
使用 Spring Boot 时(并且您尚未使用 start.spring.io 创建项目),省略版本,Boot 将自动引入与您的 Boot 版本兼容的正确版本 |
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
implementation 'org.springframework.kafka:spring-kafka'
但是,最快的入门方法是使用 start.spring.io(或 Spring Tool Suits 和 Intellij IDEA 中的向导)并创建一个项目,选择“Spring for Apache Kafka”作为依赖项。
入门
最简单的入门方法是使用 start.spring.io(或 Spring Tool Suits 和 Intellij IDEA 中的向导)并创建一个项目,选择“Spring for Apache Kafka”作为依赖项。有关其基础设施 Bean 的自配置的更多信息,请参阅 Spring Boot 文档。
这是一个最小的消费者应用程序。
Spring Boot 消费者应用程序
应用程序
-
Java
-
Kotlin
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}
}
@SpringBootApplication
class Application {
@Bean
fun topic() = NewTopic("topic1", 10, 1)
@KafkaListener(id = "myId", topics = ["topic1"])
fun listen(value: String?) {
println(value)
}
}
fun main(args: Array<String>) = runApplication<Application>(*args)
application.properties
spring.kafka.consumer.auto-offset-reset=earliest
NewTopic
Bean 会导致在代理上创建主题;如果主题已存在,则不需要它。
Spring Boot 生产者应用程序
应用程序
-
Java
-
Kotlin
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}
}
@SpringBootApplication
class Application {
@Bean
fun topic() = NewTopic("topic1", 10, 1)
@Bean
fun runner(template: KafkaTemplate<String?, String?>) =
ApplicationRunner { template.send("topic1", "test") }
companion object {
@JvmStatic
fun main(args: Array<String>) = runApplication<Application>(*args)
}
}
使用 Java 配置(无 Spring Boot)
Spring for Apache Kafka 旨在用于 Spring 应用程序上下文。例如,如果您在 Spring 上下文之外自己创建监听器容器,则除非满足容器实现的所有 ...Aware 接口,否则并非所有功能都能正常工作。 |
这是一个不使用 Spring Boot 的应用程序示例;它同时具有 Consumer
和 Producer
。
无 Spring Boot
-
Java
-
Kotlin
public class Sender {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
context.getBean(Sender.class).send("test", 42);
}
private final KafkaTemplate<Integer, String> template;
public Sender(KafkaTemplate<Integer, String> template) {
this.template = template;
}
public void send(String toSend, int key) {
this.template.send("topic1", key, toSend);
}
}
public class Listener {
@KafkaListener(id = "listen1", topics = "topic1")
public void listen1(String in) {
System.out.println(in);
}
}
@Configuration
@EnableKafka
public class Config {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}
@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}
private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
class Sender(private val template: KafkaTemplate<Int, String>) {
fun send(toSend: String, key: Int) {
template.send("topic1", key, toSend)
}
}
class Listener {
@KafkaListener(id = "listen1", topics = ["topic1"])
fun listen1(`in`: String) {
println(`in`)
}
}
@Configuration
@EnableKafka
class Config {
@Bean
fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, String>) =
ConcurrentKafkaListenerContainerFactory<Int, String>().also { it.consumerFactory = consumerFactory }
@Bean
fun consumerFactory() = DefaultKafkaConsumerFactory<Int, String>(consumerProps)
val consumerProps = mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG to "group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
)
@Bean
fun sender(template: KafkaTemplate<Int, String>) = Sender(template)
@Bean
fun listener() = Listener()
@Bean
fun producerFactory() = DefaultKafkaProducerFactory<Int, String>(senderProps)
val senderProps = mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ProducerConfig.LINGER_MS_CONFIG to 10,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to IntegerSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
)
@Bean
fun kafkaTemplate(producerFactory: ProducerFactory<Int, String>) = KafkaTemplate(producerFactory)
}
如您所见,在不使用 Spring Boot 的情况下,您必须定义几个基础设施 Bean。