Kafka Streams 绑定器中的绑定可视化和控制

从版本 3.1.2 开始,Kafka Streams 绑定器支持绑定可视化和控制。仅支持两个生命周期阶段:STOPPEDSTARTEDPAUSEDRESUMED 生命周期阶段在 Kafka Streams 绑定器中不可用。

为了激活绑定可视化和控制,应用程序需要包含以下两个依赖项。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

如果你喜欢使用 webflux,则可以包含 spring-boot-starter-webflux 而不是标准 web 依赖项。

此外,你还需要设置以下属性:

management.endpoints.web.exposure.include=bindings

为了进一步说明此功能,让我们使用以下应用程序作为指导:

@SpringBootApplication
public class KafkaStreamsApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaStreamsApplication.class, args);
	}

	@Bean
	public Consumer<KStream<String, String>> consumer() {
		return s -> s.foreach((key, value) -> System.out.println(value));
	}

	@Bean
	public Function<KStream<String, String>, KStream<String, String>> function() {
		return ks -> ks;
	}

}

正如我们所看到的,该应用程序有两个 Kafka Streams 函数——一个消费者和一个函数。消费者绑定默认命名为 consumer-in-0。类似地,对于该函数,输入绑定是 function-in-0,输出绑定是 function-out-0

应用程序启动后,我们可以使用以下绑定端点查找有关绑定的详细信息。

 curl https://:8080/actuator/bindings | jq .
[
  {
    "bindingName": "consumer-in-0",
    "name": "consumer-in-0",
    "group": "consumer-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-in-0",
    "name": "function-in-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-out-0",
    "name": "function-out-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": false,
    "extendedInfo": {}
  }
]

以上可以找到所有三个绑定的详细信息。

现在让我们停止 consumer-in-0 绑定。

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/consumer-in-0

此时,此绑定将不再接收任何记录。

再次启动绑定。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/consumer-in-0

当单个函数上存在多个绑定时,在其中任何一个绑定上调用这些操作都将起作用。这是因为单个函数上的所有绑定都由同一个 StreamsBuilderFactoryBean 支持。因此,对于上面的函数,function-in-0function-out-0 都可以工作。

© . This site is unofficial and not affiliated with VMware.