Apache Pulsar 支持

Apache Pulsar 通过提供对 Spring for Apache Pulsar 项目的自动配置来支持。

org.springframework.pulsar:spring-pulsar 位于类路径上时,Spring Boot 将自动配置并注册经典(命令式)Spring for Apache Pulsar 组件。当 org.springframework.pulsar:spring-pulsar-reactive 位于类路径上时,它将对响应式组件执行相同的操作。

分别有 spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive “启动器” 方便地收集命令式和响应式使用的依赖项。

连接到 Pulsar

当您使用 Pulsar 启动器时,Spring Boot 将自动配置并注册一个 PulsarClient bean。

默认情况下,应用程序尝试连接到 pulsar://127.0.0.1:6650 上的本地 Pulsar 实例。这可以通过将 spring.pulsar.client.service-url 属性设置为不同的值来调整。

该值必须是有效的 Pulsar 协议 URL

您可以通过指定任何以 spring.pulsar.client.* 为前缀的应用程序属性来配置客户端。

如果您需要对配置进行更多控制,请考虑注册一个或多个 PulsarClientBuilderCustomizer bean。

身份验证

要连接到需要身份验证的 Pulsar 集群,您需要通过设置 `pluginClassName` 和插件所需的任何参数来指定要使用的身份验证插件。您可以将参数设置为参数名称到参数值的映射。以下示例展示了如何配置 `AuthenticationOAuth2` 插件。

  • 属性

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

您需要确保 `spring.pulsar.client.authentication.param.*` 下定义的名称与您的身份验证插件(通常是驼峰式命名)所期望的名称完全匹配。Spring Boot 不会尝试对这些条目进行任何类型的松散绑定。

例如,如果您想为 `AuthenticationOAuth2` 身份验证插件配置颁发者 URL,您必须使用 `spring.pulsar.client.authentication.param.issuerUrl`。如果您使用其他形式,例如 `issuerurl` 或 `issuer-url`,则设置将不会应用于插件。

这种缺乏松散绑定也使得使用环境变量进行身份验证参数变得很麻烦,因为在转换过程中会丢失大小写敏感性。如果您使用环境变量作为参数,则需要按照 这些步骤(在 Spring for Apache Pulsar 参考文档中)进行操作才能正常工作。

SSL

默认情况下,Pulsar 客户端以纯文本形式与 Pulsar 服务通信。您可以按照 这些步骤(在 Spring for Apache Pulsar 参考文档中)进行操作以启用 TLS 加密。

有关客户端和身份验证的完整详细信息,请参阅 Spring for Apache Pulsar 参考文档

以响应式方式连接到 Pulsar

当响应式自动配置被激活时,Spring Boot 将自动配置并注册一个 `ReactivePulsarClient` bean。

`ReactivePulsarClient` 适应了之前描述的 `PulsarClient` 的实例。因此,请按照上一节中的步骤配置 `ReactivePulsarClient` 使用的 `PulsarClient`。

连接到 Pulsar 管理

Spring for Apache Pulsar 的 `PulsarAdministration` 客户端也被自动配置。

默认情况下,应用程序尝试连接到 `https://127.0.0.1:8080` 上的本地 Pulsar 实例。这可以通过将 `spring.pulsar.admin.service-url` 属性设置为 `(http|https)://<host>:<port>` 形式的另一个值来调整。

如果您需要更多控制配置,请考虑注册一个或多个 `PulsarAdminBuilderCustomizer` bean。

身份验证

当访问需要身份验证的 Pulsar 集群时,管理客户端需要与常规 Pulsar 客户端相同的安全配置。您可以使用前面提到的 身份验证配置,将 spring.pulsar.client.authentication 替换为 spring.pulsar.admin.authentication

要在启动时创建主题,请添加一个类型为 PulsarTopic 的 bean。如果主题已存在,则忽略该 bean。

发送消息

Spring 的 PulsarTemplate 是自动配置的,您可以使用它来发送消息,如下面的示例所示

  • Java

  • Kotlin

import org.apache.pulsar.client.api.PulsarClientException;

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() throws PulsarClientException {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate 依赖于 PulsarProducerFactory 来创建底层的 Pulsar 生产者。Spring Boot 自动配置也提供此生产者工厂,默认情况下,它会缓存它创建的生产者。您可以通过指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 为前缀的应用程序属性来配置生产者工厂和缓存设置。

如果您需要对生产者工厂配置进行更多控制,请考虑注册一个或多个 ProducerBuilderCustomizer bean。这些自定义程序将应用于所有创建的生产者。您也可以在发送消息时传入一个 ProducerBuilderCustomizer,以仅影响当前生产者。

如果您需要对要发送的消息进行更多控制,您可以在发送消息时传入一个 TypedMessageBuilderCustomizer

以响应式方式发送消息

当 Reactive 自动配置被激活时,Spring 的 ReactivePulsarTemplate 是自动配置的,您可以使用它来发送消息,如下面的示例所示

  • Java

  • Kotlin

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

ReactivePulsarTemplate 依赖于 ReactivePulsarSenderFactory 来实际创建底层的发送器。Spring Boot 自动配置也提供此发送器工厂,默认情况下,它会缓存它创建的生产者。您可以通过指定任何以 spring.pulsar.producer.*spring.pulsar.producer.cache.* 为前缀的应用程序属性来配置发送器工厂和缓存设置。

如果您需要对发送器工厂配置进行更多控制,请考虑注册一个或多个 ReactiveMessageSenderBuilderCustomizer bean。这些自定义程序将应用于所有创建的发送器。您也可以在发送消息时传入一个 ReactiveMessageSenderBuilderCustomizer,以仅影响当前发送器。

如果您需要对发送的消息进行更多控制,您可以在发送消息时传入一个 MessageSpecBuilderCustomizer

接收消息

当 Apache Pulsar 基础设施存在时,任何 Bean 都可以使用 @PulsarListener 进行注解以创建一个监听器端点。以下组件在 someTopic 主题上创建一个监听器端点

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自动配置提供了 PulsarListener 所需的所有组件,例如 PulsarListenerContainerFactory 和它用来构建底层 Pulsar 消费者的消费者工厂。您可以通过指定任何以 spring.pulsar.listener.*spring.pulsar.consumer.* 为前缀的应用程序属性来配置这些组件。

如果您需要对消费者工厂配置进行更多控制,请考虑注册一个或多个 ConsumerBuilderCustomizer Bean。这些自定义器将应用于工厂创建的所有消费者,因此也应用于所有 @PulsarListener 实例。您还可以通过设置 @PulsarListener 注解的 consumerCustomizer 属性来自定义单个监听器。

接收响应式消息

当 Apache Pulsar 基础设施存在且响应式自动配置被激活时,任何 Bean 都可以使用 @ReactivePulsarListener 进行注解以创建一个响应式监听器端点。以下组件在 someTopic 主题上创建一个响应式监听器端点

  • Java

  • Kotlin

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自动配置提供了 ReactivePulsarListener 所需的所有组件,例如 ReactivePulsarListenerContainerFactory 和它用来构建底层响应式 Pulsar 消费者的消费者工厂。您可以通过指定任何以 spring.pulsar.listener.spring.pulsar.consumer. 为前缀的应用程序属性来配置这些组件。

如果您需要对消费者工厂配置进行更多控制,请考虑注册一个或多个 ReactiveMessageConsumerBuilderCustomizer Bean。这些自定义器将应用于工厂创建的所有消费者,因此也应用于所有 @ReactivePulsarListener 实例。您还可以通过设置 @ReactivePulsarListener 注解的 consumerCustomizer 属性来自定义单个监听器。

读取消息

Pulsar 读取器接口使应用程序能够手动管理游标。当您使用读取器连接到主题时,您需要指定读取器连接到主题时从哪个消息开始读取。

当 Apache Pulsar 基础设施存在时,任何 Bean 都可以通过 @PulsarReader 注解来使用 reader 消费消息。以下组件创建了一个 reader 端点,从 someTopic 主题的开头开始读取消息。

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader 依赖于 PulsarReaderFactory 来创建底层的 Pulsar reader。Spring Boot 自动配置提供了这个 reader 工厂,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用程序属性来定制它。

如果您需要对 reader 工厂配置进行更多控制,请考虑注册一个或多个 ReaderBuilderCustomizer Bean。这些定制器将应用于工厂创建的所有 reader,因此也应用于所有 @PulsarReader 实例。您还可以通过设置 @PulsarReader 注解的 readerCustomizer 属性来定制单个监听器。

以响应式方式读取消息

当 Apache Pulsar 基础设施存在且 Reactive 自动配置被激活时,Spring 的 ReactivePulsarReaderFactory 会被提供,您可以使用它来创建 reader 以响应式方式读取消息。以下组件使用提供的工厂创建了一个 reader,并从 someTopic 主题中读取 5 分钟前的单条消息。

  • Java

  • Kotlin

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自动配置提供了这个 reader 工厂,可以通过设置任何以 spring.pulsar.reader.* 为前缀的应用程序属性来定制它。

如果您需要对 reader 工厂配置进行更多控制,请考虑在使用工厂创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer 实例。

如果您需要对 reader 工厂配置进行更多控制,请考虑注册一个或多个 ReactiveMessageReaderBuilderCustomizer Bean。这些定制器将应用于所有创建的 reader。您也可以在创建 reader 时传入一个或多个 ReactiveMessageReaderBuilderCustomizer,以仅将定制应用于创建的 reader。

有关上述任何组件的更多详细信息,以及发现其他可用功能,请参阅 Spring for Apache Pulsar 的 参考文档

事务支持

Spring for Apache Pulsar 在使用 PulsarTemplate@PulsarListener 时支持事务。

目前不支持使用响应式变体进行事务处理。

spring.pulsar.transaction.enabled属性设置为true

  • 配置一个PulsarTransactionManager bean

  • PulsarTemplate启用事务支持

  • @PulsarListener方法启用事务支持

@PulsarListenertransactional属性可用于微调何时应将事务与侦听器一起使用。

为了更好地控制 Apache Pulsar 事务的 Spring 功能,您应该定义自己的PulsarTemplate和/或ConcurrentPulsarListenerContainerFactory bean。如果默认自动配置的PulsarTransactionManager不合适,您还可以定义一个PulsarAwareTransactionManager bean。

其他 Pulsar 属性

自动配置支持的属性在附录的“集成属性”部分中显示。请注意,在大多数情况下,这些属性(带连字符或驼峰式大小写)直接映射到 Apache Pulsar 配置属性。有关详细信息,请参阅 Apache Pulsar 文档。

Pulsar 支持的属性子集可通过PulsarProperties类直接使用。如果您希望使用未直接支持的其他属性来调整自动配置的组件,可以使用每个上述组件支持的自定义程序。