Apache Pulsar 的 Spring Cloud Stream 绑定器

Spring for Apache Pulsar 为 Spring Cloud Stream 提供了一个绑定器,我们可以使用它来构建使用发布-订阅范式的事件驱动微服务。在本节中,我们将介绍此绑定器的基本细节。

用法

要使用 Apache Pulsar 绑定器进行 Spring Cloud Stream,需要在应用程序中包含以下依赖项。

  • Maven

  • Gradle

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

概述

Apache Pulsar 的 Spring Cloud Stream 绑定器允许应用程序专注于业务逻辑,而不是处理管理和维护 Pulsar 的底层细节。绑定器为应用程序开发人员处理所有这些细节。Spring Cloud Stream 基于 Spring Cloud Function 提供了一个强大的编程模型,允许应用程序开发人员使用函数式风格编写复杂的事件驱动应用程序。应用程序可以从中间件中立的方式开始,然后通过 Spring Boot 配置属性将 Pulsar 主题映射为 Spring Cloud Stream 中的目标。Spring Cloud Stream 构建在 Spring Boot 之上,当使用 Spring Cloud Stream 编写事件驱动微服务时,实际上是在编写一个 Boot 应用程序。这是一个简单的 Spring Cloud Stream 应用程序。

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

上述示例应用程序是一个完整的 Spring Boot 应用程序,值得解释一下。但是,在第一次浏览时,您可以看到这只是纯 Java 代码和一些 Spring 和 Spring Boot 注解。这里有三个 Bean 方法 - 一个 java.util.function.Supplier、一个 java.util.function.Function,最后是一个 java.util.function.Consumer。供应商以毫秒为单位生成当前时间,函数获取此时间,然后通过添加一些随机数据对其进行增强,然后消费者记录增强后的时间。

为了简洁起见,我们省略了所有导入,但在整个应用程序中没有任何 Spring Cloud Stream 特定的内容。它如何成为与 Apache Pulsar 交互的 Spring Cloud Stream 应用程序?您必须在应用程序中包含上述绑定器的依赖项。添加此依赖项后,必须提供以下配置属性。

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

通过以上操作,我们构建了一个基于 Spring Cloud Stream 的端到端事件驱动应用程序。由于类路径中包含 Pulsar 绑定器,应用程序会与 Apache Pulsar 进行交互。如果应用程序中只有一个函数,则无需显式告诉 Spring Cloud Stream 激活该函数以执行,因为它会默认执行。如果应用程序中有多个这样的函数,就像我们的示例一样,我们需要指示 Spring Cloud Stream 我们希望激活哪些函数。在本例中,我们需要激活所有函数,可以通过 spring.cloud.function.definition 属性来实现。默认情况下,Bean 名称会成为 Spring Cloud Stream 绑定名称的一部分。绑定是 Spring Cloud Stream 中一个基本抽象的概念,框架通过它与中间件目标进行通信。Spring Cloud Stream 执行的大多数操作都依赖于具体的绑定。Supplier 只有一个输出绑定;函数具有输入和输出绑定;Consumer 只有一个输入绑定。以我们的 Supplier Bean - timeSupplier 为例。此 Supplier 的默认绑定名称将为 timeSupplier-out-0。类似地,timeProcessor 函数的默认绑定名称在输入端为 timeProcessor-in-0,在输出端为 timeProcessor-out-0。有关如何更改默认绑定名称的详细信息,请参阅 Spring Cloud Stream 参考文档。在大多数情况下,使用默认绑定名称就足够了。如上所示,我们在绑定名称上设置了目标。如果未提供目标,则绑定名称将成为目标的值,例如 timeSupplier-out-0 的情况。

运行上述应用程序时,您应该会看到 Supplier 每秒执行一次,然后由函数消费并由 Logger Consumer 增强所消耗的时间。

基于绑定器的应用程序中的消息转换

在以上示例应用程序中,我们没有提供任何消息转换的模式信息。这是因为,默认情况下,Spring Cloud Stream 使用其消息转换机制,该机制通过 Spring Messaging 项目在 Spring 框架中建立的消息支持来实现。除非另行指定,否则 Spring Cloud Stream 使用 application/json 作为输入和输出绑定上消息转换的 content-type。在输出端,数据被序列化为 byte[],然后 Pulsar 绑定器使用 Schema.BYTES 将其通过网络发送到 Pulsar 主题。类似地,在输入端,数据从 Pulsar 主题中作为 byte[] 消费,然后使用正确的消息转换器转换为目标类型。

使用 Pulsar Schema 在 Pulsar 中进行本地转换

尽管默认情况下使用框架提供的消息转换,但 Spring Cloud Stream 允许每个绑定器确定消息的转换方式。假设应用程序选择走这条路线。在这种情况下,Spring Cloud Stream 会避免使用任何 Spring 提供的消息转换功能,并传递它接收或生成的数据。Spring Cloud Stream 中的此功能称为生产端上的本地编码和消费端上的本地解码。这意味着编码和解码会在目标中间件上本地发生,在本例中,是在 Apache Pulsar 上。对于上述应用程序,我们可以使用以下配置来绕过框架转换并使用本地编码和解码。

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

在生产端启用本地编码的属性是来自核心 Spring Cloud Stream 的绑定级别属性。您可以在生产者绑定上设置它 - spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding 并将其设置为 true。类似地,使用 - spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding 用于 Consumer 绑定并将其设置为 true。如果我们决定使用本地编码和解码,在 Pulsar 的情况下,我们需要设置相应的模式和底层消息类型信息。此信息作为扩展绑定属性提供。如您在上面的配置中所见,属性为 - spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type 用于模式信息,spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type 用于实际的目标类型。如果消息中同时存在键和值,则可以使用 message-key-typemessage-value-type 来指定其目标类型。

省略 schema-type 属性时,将参考任何已配置的自定义模式映射。

消息头转换

每条消息通常都包含头信息,这些信息需要在消息通过 Spring Cloud Stream 输入和输出绑定在 Pulsar 和 Spring Messaging 之间传递时携带。为了支持此传递,框架处理必要的消息头转换。

自定义头映射器

Pulsar 绑定器配置了默认的头映射器,可以通过提供您自己的 PulsarHeaderMapper Bean 来覆盖它。

在以下示例中,配置了一个 JSON 头映射器,它

  • 映射所有传入的头信息(除了键为“top”或“secret”的头信息)

  • 映射传出头信息(除了键为“id”、“timestamp”或“userId”的头信息)

  • 仅信任“com.acme”包中的对象进行传出反序列化

  • 使用简单的 toString() 编码对任何“com.acme.Money”头值进行序列化/反序列化

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}

在绑定器中使用 Pulsar 属性

绑定器使用来自 Spring for Apache Pulsar 框架的基本组件来构建其生产者和消费者绑定。由于基于绑定器的应用程序是 Spring Boot 应用程序,因此绑定器默认情况下使用 Spring for Apache Pulsar 的 Spring Boot 自动配置。因此,核心框架级别可用的所有 Pulsar Spring Boot 属性也可通过绑定器使用。例如,您可以使用以 spring.pulsar.producer…​spring.pulsar.consumer…​ 等为前缀的属性。此外,您还可以在绑定器级别设置这些 Pulsar 属性。例如,这也将起作用 - spring.cloud.stream.pulsar.binder.producer…​spring.cloud.stream.pulsar.binder.consumer…​

以上两种方法都可以,但是当使用这些属性时,它们会应用于整个应用程序。如果应用程序中有多个函数,则它们都会获得相同的属性。您也可以在扩展绑定属性级别设置这些 Pulsar 属性来解决此问题。扩展绑定属性应用于绑定本身。例如,如果您有一个输入和输出绑定,并且两者都需要一组单独的 Pulsar 属性,则必须在扩展绑定上设置它们。生产者绑定的模式为 spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…​。类似地,对于消费者绑定,模式为 spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…​。这样,您就可以为同一应用程序中不同的绑定应用一组单独的 Pulsar 属性。

扩展绑定属性具有最高优先级。在绑定器中应用属性的优先级顺序为 扩展绑定属性 → 绑定器属性 → Spring Boot 属性。(从最高到最低)。

以下是一些资源,您可以依靠它们来查找有关通过 Pulsar 绑定器提供的属性的更多信息。

Pulsar 生产者绑定配置。这些属性需要 spring.cloud.stream.bindings.<binding-name>.producer 前缀。所有 Spring Boot 提供的 Pulsar 生产者属性 也可通过此配置类使用。

Pulsar 消费者绑定配置。这些属性需要 spring.cloud.stream.bindings.<binding-name>.consumer 前缀。所有 Spring Boot 提供的 Pulsar 消费者属性 也可通过此配置类使用。

有关常见的 Pulsar 绑定器特定配置属性,请参阅 此链接。这些属性需要 spring.cloud.stream.pulsar.binder 前缀。上述指定的生产者和消费者属性(包括 Spring Boot 属性)可以使用 spring.cloud.stream.pulsar.binder.producerspring.cloud.stream.pulsar.binder.consumer 前缀在绑定器中使用。

Pulsar 主题供应器

用于 Apache Pulsar 的 Spring Cloud Stream 绑定器附带了一个开箱即用的 Pulsar 主题供应器。运行应用程序时,如果缺少必要的主题,Pulsar 将为您创建主题。但是,这是一个基本的非分区主题,如果您希望使用高级功能(如创建分区主题),则可以依靠绑定器中的主题供应器。Pulsar 主题供应器使用框架中的 PulsarAdministration,该框架使用 PulsarAdminBuilder。因此,您需要设置 spring.pulsar.administration.service-url 属性,除非您在默认服务器和端口上运行 Pulsar。

创建主题时指定分区数

创建主题时,您可以通过两种方式设置分区数。首先,您可以使用属性 spring.cloud.stream.pulsar.binder.partition-count 在绑定器级别设置它。如上所述,这样做将使应用程序创建的所有主题继承此属性。假设您希望在绑定级别对设置分区进行细粒度控制。在这种情况下,您可以使用格式 spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count 按绑定设置 partition-count 属性。这样,同一应用程序中不同函数创建的各种主题将根据应用程序需求具有不同的分区。