可观察性

Spring 通过 Micrometer 提供对可观察性的支持,Micrometer 定义了 观察概念,该概念支持应用程序中的指标和跟踪

Spring Cloud Stream 通过提供多个抽象(包括 ObservationFunctionAroundWrapper)在 Spring Cloud Function 的级别上集成了这种支持,该抽象封装了函数以开箱即用地处理观察。

所需依赖项

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
	<groupId>io.projectreactor</groupId>
	<artifactId>reactor-core-micrometer</artifactId>
</dependency>

以及可用的跟踪器桥接器之一。例如 Zipkin Brave

<dependency>
	<groupId>io.micrometer</groupId>
	<artifactId>micrometer-tracing-bridge-brave</artifactId>
</dependency>

命令式函数

命令式函数使用观察包装器 ObservationFunctionAroundWrapper 进行包装,该包装器提供必要的基础设施来处理与观察注册表的交互。此类交互在每次调用函数时都会发生,这意味着观察附加到函数的每次调用(即,每个消息一次观察)。换句话说,对于命令式函数,如果存在前面提到的所需依赖项,可观察性将正常工作。

响应式函数

响应式函数本质上不同于命令式函数,因此不会被ObservationFunctionAroundWrapper包装。

命令式函数是一个消息处理函数,由框架在每次有消息时调用,类似于典型的事件处理程序,对于 N 个消息,将调用该函数 N 次。这使我们能够包装此类函数,以使用附加功能(如错误处理重试以及当然还有可观察性)对其进行装饰。

响应式函数是初始化函数。它的作用是将用户提供的流处理代码(Flux)连接到绑定程序提供的源流和目标流。它仅在应用程序启动期间调用一次。一旦流代码与源/目标流连接,我们就无法看到或控制实际的流处理。它掌握在响应式 API 的手中。响应式函数还带来了一个额外的变量。鉴于该函数使您能够看到整个流链(而不仅仅是单个事件),那么观察的默认单位应该是什么?流链中的单个项目?一系列项目?如果一段时间后没有消息,该怎么办?等等。我们想要强调的是,对于响应式函数,我们不能假设任何事情。(有关响应式函数和命令式函数之间差异的更多信息,请参阅 响应式函数)。

因此,就像重试错误处理一样,您需要手动处理观察。

幸运的是,您可以通过使用响应式 API 的tap操作来访问流的某个部分,同时提供ObservationRegistry的实例来轻松实现。此段定义了观察的单位,可以是 Flux 中的单个项目,也可以是一系列项目,或者您可能想要在流中观察的任何其他内容。

@SpringBootApplication
public class DemoStreamApplication {

	Logger logger = LoggerFactory.getLogger(DemoStreamApplication.class);

	public static void main(String[] args) {
		Hooks.enableAutomaticContextPropagation();
		SpringApplication.run(DemoStreamApplication.class, args);
	}

	@Bean
	public Function<Flux<String>, Flux<String>> uppercase(ObservationRegistry registry) {
		return flux -> flux.flatMap(item -> {
			return Mono.just(item)
                             .map(value -> value.toUpperCase())
                             .doOnNext(v -> logger.info(v))
                             .tap(Micrometer.observation(registry));
		});
	}
}

上面的示例模拟将 Observation 附加到单个消息处理(即命令式函数),因为在这种情况下,观察的单位从 Mono.just(..) 开始,最后一个操作将 ObservationRegistry 附加到订阅者。

如果订阅者已经附加了观察,它将用于为 tap 上游的链/段创建子观察,但是正如我们已经说过的那样,默认情况下,框架不会将任何观察附加到您返回的流链。