Apache Pulsar 支持

通过提供对Spring for Apache Pulsar项目的自动配置来支持Apache Pulsar

当类路径中存在org.springframework.pulsar:spring-pulsar时,Spring Boot 将自动配置并注册经典(命令式)Spring for Apache Pulsar 组件。当类路径中存在org.springframework.pulsar:spring-pulsar-reactive时,它将对响应式组件执行相同的操作。

分别有spring-boot-starter-pulsarspring-boot-starter-pulsar-reactive启动器方便地收集命令式和响应式使用的依赖项。

连接到 Pulsar

当您使用 Pulsar 启动器时,Spring Boot 将自动配置并注册一个PulsarClient bean。

默认情况下,应用程序尝试连接到pulsar://127.0.0.1:6650上的本地 Pulsar 实例。可以通过将spring.pulsar.client.service-url属性设置为不同的值来调整此设置。

该值必须是一个有效的Pulsar 协议URL。

您可以通过指定任何以spring.pulsar.client.*为前缀的应用程序属性来配置客户端。

如果您需要更多地控制配置,请考虑注册一个或多个PulsarClientBuilderCustomizer bean。

身份验证

要连接到需要身份验证的 Pulsar 集群,您需要通过设置pluginClassName以及插件所需的任何参数来指定要使用的身份验证插件。您可以将参数设置为参数名到参数值的映射。以下示例显示了如何配置AuthenticationOAuth2插件。

  • 属性

  • YAML

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

您需要确保spring.pulsar.client.authentication.param.*下定义的名称与您的身份验证插件(通常是小驼峰命名法)所期望的名称完全匹配。Spring Boot 不会尝试对这些条目进行任何类型的宽松绑定。

例如,如果您想为AuthenticationOAuth2身份验证插件配置颁发者 URL,则必须使用spring.pulsar.client.authentication.param.issuerUrl。如果您使用其他形式,例如issuerurlissuer-url,则该设置将不会应用于插件。

这种缺乏宽松绑定也使得使用环境变量进行身份验证参数配置变得很棘手,因为在转换过程中大小写敏感性会丢失。如果您使用环境变量作为参数,则需要按照Spring for Apache Pulsar 参考文档中的这些步骤操作才能正常工作。

SSL

默认情况下,Pulsar 客户端以纯文本方式与 Pulsar 服务通信。您可以按照Spring for Apache Pulsar 参考文档中的这些步骤启用 TLS 加密。

有关客户端和身份验证的完整详细信息,请参阅 Spring for Apache Pulsar 参考文档

响应式连接到 Pulsar

当激活响应式自动配置时,Spring Boot 将自动配置并注册一个ReactivePulsarClient bean。

ReactivePulsarClient 适配前面描述的PulsarClient的实例。因此,请按照上一节中的说明配置ReactivePulsarClient使用的PulsarClient

连接到 Pulsar 管理

Spring for Apache Pulsar 的PulsarAdministration客户端也已自动配置。

默认情况下,应用程序尝试连接到https://127.0.0.1:8080处的本地 Pulsar 实例。这可以通过将spring.pulsar.admin.service-url属性设置为(http|https)://<host>:<port>形式的不同值来调整。

如果您需要更多控制权来进行配置,请考虑注册一个或多个PulsarAdminBuilderCustomizer bean。

身份验证

当访问需要身份验证的 Pulsar 集群时,管理客户端需要与常规 Pulsar 客户端相同的安全配置。您可以使用前面提到的身份验证配置,将spring.pulsar.client.authentication替换为spring.pulsar.admin.authentication

要在启动时创建主题,请添加类型为PulsarTopic的 bean。如果主题已存在,则忽略该 bean。

发送消息

Spring 的PulsarTemplate已自动配置,您可以使用它来发送消息,如下例所示

  • Java

  • Kotlin

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

PulsarTemplate依赖于PulsarProducerFactory来创建底层的 Pulsar 生产者。Spring Boot 自动配置也提供此生产者工厂,默认情况下,它会缓存它创建的生产者。您可以通过指定任何以spring.pulsar.producer.*spring.pulsar.producer.cache.*为前缀的应用程序属性来配置生产者工厂和缓存设置。

如果您需要更多控制生产者工厂配置,请考虑注册一个或多个ProducerBuilderCustomizer bean。这些自定义程序将应用于所有创建的生产者。您也可以在发送消息时传入ProducerBuilderCustomizer,以便仅影响当前生产者。

如果您需要更多控制要发送的消息,则可以在发送消息时传入TypedMessageBuilderCustomizer

响应式发送消息

当激活响应式自动配置时,Spring 的ReactivePulsarTemplate已自动配置,您可以使用它来发送消息,如下例所示

  • Java

  • Kotlin

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

ReactivePulsarTemplate依赖于ReactivePulsarSenderFactory来实际创建底层发送者。Spring Boot 自动配置也提供此发送者工厂,默认情况下,它会缓存它创建的生产者。您可以通过指定任何以spring.pulsar.producer.*spring.pulsar.producer.cache.*为前缀的应用程序属性来配置发送者工厂和缓存设置。

如果您需要更多控制发送者工厂配置,请考虑注册一个或多个ReactiveMessageSenderBuilderCustomizer bean。这些自定义程序将应用于所有创建的发送者。您也可以在发送消息时传入ReactiveMessageSenderBuilderCustomizer,以便仅影响当前发送者。

如果您需要更多控制要发送的消息,则可以在发送消息时传入MessageSpecBuilderCustomizer

接收消息

当 Apache Pulsar 基础设施存在时,任何 bean 都可以使用@PulsarListener进行注释以创建侦听器端点。以下组件在someTopic主题上创建一个侦听器端点

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot 自动配置提供了PulsarListener所需的所有组件,例如PulsarListenerContainerFactory及其用于构造底层 Pulsar 使用者的使用者工厂。您可以通过指定任何以spring.pulsar.listener.*spring.pulsar.consumer.*为前缀的应用程序属性来配置这些组件。

如果您需要更多控制使用者工厂配置,请考虑注册一个或多个ConsumerBuilderCustomizer bean。这些自定义程序将应用于工厂创建的所有使用者,因此也应用于所有@PulsarListener实例。您还可以通过设置@PulsarListener注释的consumerCustomizer属性来自定义单个侦听器。

响应式接收消息

当 Apache Pulsar 基础设施存在且激活响应式自动配置时,任何 bean 都可以使用@ReactivePulsarListener进行注释以创建响应式侦听器端点。以下组件在someTopic主题上创建一个响应式侦听器端点

  • Java

  • Kotlin

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot 自动配置提供了ReactivePulsarListener所需的所有组件,例如ReactivePulsarListenerContainerFactory及其用于构造底层响应式 Pulsar 使用者的使用者工厂。您可以通过指定任何以spring.pulsar.listener.*spring.pulsar.consumer.*为前缀的应用程序属性来配置这些组件。

如果您需要更多控制使用者工厂配置,请考虑注册一个或多个ReactiveMessageConsumerBuilderCustomizer bean。这些自定义程序将应用于工厂创建的所有使用者,因此也应用于所有@ReactivePulsarListener实例。您还可以通过设置@ReactivePulsarListener注释的consumerCustomizer属性来自定义单个侦听器。

读取消息

Pulsar 读取器接口使应用程序能够手动管理游标。当您使用读取器连接到主题时,您需要指定读取器连接到主题时开始读取的消息。

当 Apache Pulsar 基础设施存在时,任何 bean 都可以使用@PulsarReader进行注释以使用读取器使用消息。以下组件创建一个读取器端点,该端点从someTopic主题的开头开始读取消息

  • Java

  • Kotlin

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

@PulsarReader依赖于PulsarReaderFactory来创建底层的 Pulsar 读取器。Spring Boot 自动配置提供此读取器工厂,可以通过设置任何以spring.pulsar.reader.*为前缀的应用程序属性来自定义它。

如果您需要更多控制读取器工厂配置,请考虑注册一个或多个ReaderBuilderCustomizer bean。这些自定义程序将应用于工厂创建的所有读取器,因此也应用于所有@PulsarReader实例。您还可以通过设置@PulsarReader注释的readerCustomizer属性来自定义单个侦听器。

响应式读取消息

当 Apache Pulsar 基础设施存在且激活响应式自动配置时,将提供 Spring 的ReactivePulsarReaderFactory,您可以使用它来创建读取器以便以响应式方式读取消息。以下组件使用提供的工厂创建读取器,并从someTopic主题读取 5 分钟前的一条消息

  • Java

  • Kotlin

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot 自动配置提供此读取器工厂,可以通过设置任何以spring.pulsar.reader.*为前缀的应用程序属性来自定义它。

如果您需要更多控制读取器工厂配置,请考虑在使用工厂创建读取器时传入一个或多个ReactiveMessageReaderBuilderCustomizer实例。

如果您需要更多控制读取器工厂配置,请考虑注册一个或多个ReactiveMessageReaderBuilderCustomizer bean。这些自定义程序将应用于所有创建的读取器。您也可以在创建读取器时传入一个或多个ReactiveMessageReaderBuilderCustomizer,以便仅将自定义应用于创建的读取器。

有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档

事务支持

Spring for Apache Pulsar 在使用PulsarTemplate@PulsarListener时支持事务。

目前,使用响应式变体时不支持事务。

spring.pulsar.transaction.enabled属性设置为true

  • 配置一个PulsarTransactionManager bean

  • 启用PulsarTemplate的事务支持

  • 启用@PulsarListener方法的事务支持

@PulsarListenertransactional属性可用于微调何时应将事务与侦听器一起使用。

为了更好地控制 Spring for Apache Pulsar 事务功能,您应该定义您自己的PulsarTemplate和/或ConcurrentPulsarListenerContainerFactory bean。如果默认自动配置的PulsarTransactionManager不适用,您还可以定义一个PulsarAwareTransactionManager bean。

其他 Pulsar 属性

自动配置支持的属性在附录的集成属性部分中显示。请注意,在大多数情况下,这些属性(带连字符或驼峰式命名法)直接映射到 Apache Pulsar 配置属性。有关详细信息,请参阅 Apache Pulsar 文档。

只有 Pulsar 支持的属性子集可直接通过PulsarProperties类使用。如果您希望使用未直接支持的其他属性来调整自动配置的组件,则可以使用每个上述组件支持的自定义程序。