4.0.5

前言

Spring 数据集成之旅简史

Spring 的数据集成之旅始于 Spring Integration。凭借其编程模型,它为构建应用程序提供了一致的开发者体验,这些应用程序可以采用 企业集成模式 来连接外部系统,例如数据库、消息代理等。

快进到云时代,微服务在企业环境中变得越来越突出。 Spring Boot 改变了开发者构建应用程序的方式。凭借 Spring 的编程模型和 Spring Boot 处理的运行时职责,开发独立的、生产级的基于 Spring 的微服务变得无缝。

为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被整合到一个新项目中。Spring Cloud Stream 诞生了。

使用 Spring Cloud Stream,开发者可以

  • 独立构建、测试和部署以数据为中心的应用程序。

  • 应用现代微服务架构模式,包括通过消息传递进行组合。

  • 使用以事件为中心的思维方式解耦应用程序职责。事件可以表示一段时间内发生的事情,下游消费者应用程序可以对其做出反应,而无需知道其来源或生产者的身份。

  • 将业务逻辑移植到消息代理(例如 RabbitMQ、Apache Kafka、Amazon Kinesis)。

  • 依赖框架对常见用例的自动内容类型支持。可以扩展到不同的数据转换类型。

  • 等等。

快速入门

您可以在深入了解任何细节之前,只需不到 5 分钟即可尝试使用 Spring Cloud Stream,只需按照以下三步指南操作即可。

我们将向您展示如何创建一个 Spring Cloud Stream 应用程序,该应用程序接收来自您选择的消息传递中间件的消息(稍后将详细介绍),并将接收到的消息记录到控制台。我们将其称为 LoggingConsumer。虽然它并不实用,但它为一些主要的概念和抽象提供了良好的介绍,使您更容易理解本用户指南的其余部分。

这三个步骤如下

使用 Spring Initializr 创建示例应用程序

要开始,请访问 Spring Initializr。在那里,您可以生成我们的 LoggingConsumer 应用程序。为此

  1. 在**依赖项**部分,开始输入 stream。当“Cloud Stream”选项出现时,选择它。

  2. 开始输入“kafka”或“rabbit”。

  3. 选择“Kafka”或“RabbitMQ”。

    基本上,您选择要将应用程序绑定到的消息传递中间件。我们建议使用您已经安装的或觉得更舒适安装和运行的中间件。此外,正如您从 Initilaizer 屏幕中看到的,您可以选择一些其他选项。例如,您可以选择 Gradle 作为您的构建工具而不是 Maven(默认值)。

  4. 在**工件**字段中,键入“logging-consumer”。

    **工件**字段的值将成为应用程序名称。如果您为中间件选择了 RabbitMQ,则您的 Spring Initializr 现在应如下所示

spring initializr
  1. 单击**生成项目**按钮。

    这样做会将生成的项目的压缩版本下载到您的硬盘驱动器。

  2. 将文件解压缩到您要用作项目目录的文件夹中。

我们鼓励您探索 Spring Initializr 中提供的众多可能性。它允许您创建许多不同类型的 Spring 应用程序。

将项目导入您的 IDE

现在您可以将项目导入您的 IDE。请记住,根据 IDE 的不同,您可能需要遵循特定的导入过程。例如,根据项目的生成方式(Maven 或 Gradle),您可能需要遵循特定的导入过程(例如,在 Eclipse 或 STS 中,您需要使用文件→导入→Maven→现有 Maven 项目)。

导入后,项目必须没有任何错误。此外,src/main/java 应包含 com.example.loggingconsumer.LoggingConsumerApplication

从技术上讲,此时,您可以运行应用程序的主类。它已经是一个有效的 Spring Boot 应用程序。但是,它什么也没做,所以我们要添加一些代码。

添加消息处理器、构建和运行

修改 com.example.loggingconsumer.LoggingConsumerApplication 类使其如下所示

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

正如您从前面的列表中看到的

  • 我们使用函数式编程模型(请参阅 Spring Cloud Function 支持)将单个消息处理器定义为 Consumer

  • 我们依赖框架约定将此类处理器绑定到绑定器公开的输入目标绑定。

这样做还可以让您了解框架的核心功能之一:它尝试自动将传入的消息有效负载转换为 Person 类型。

您现在拥有了一个功能齐全的 Spring Cloud Stream 应用程序,它可以侦听消息。从这里开始,为简单起见,我们假设您在 第一步 中选择了 RabbitMQ。假设您已安装并运行了 RabbitMQ,您可以通过在 IDE 中运行其 main 方法来启动应用程序。

您应该看到以下输出

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

转到 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,并将消息发送到 input.anonymous.CbMIwdkJSBO1ZoPDOtHtCganonymous.CbMIwdkJSBO1ZoPDOtHtCg 部分表示组名称并已生成,因此它在您的环境中必然不同。为了获得更可预测的结果,您可以通过设置 spring.cloud.stream.bindings.input.group=hello(或任何您喜欢的名称)来使用显式组名称。

消息的内容应为 Person 类的 JSON 表示形式,如下所示

{"name":"Sam Spade"}

然后,在您的控制台中,您应该看到

Received: Sam Spade

您还可以将应用程序构建并打包到启动 jar 中(使用 ./mvnw clean install)并使用 java -jar 命令运行构建的 JAR。

现在您拥有了一个可工作的(尽管非常基本)Spring Cloud Stream 应用程序。

流数据环境中的 Spring 表达式语言 (SpEL)

在本参考手册中,您将遇到许多可以使用 Spring 表达式语言 (SpEL) 的功能和示例。了解使用它时的一些限制非常重要。

SpEL允许您访问当前的消息以及您正在运行的应用程序上下文。但是,了解SpEL能够看到哪种类型的数据,尤其是在传入消息的上下文中,这一点非常重要。从代理端,消息以byte[]的形式到达。然后,它由绑定器转换为Message<byte[]>,正如您所看到的,消息的有效负载保持其原始形式。消息的头部是<String, Object>,其中值通常是另一个基本类型或基本类型的集合/数组,因此是Object。这是因为绑定器不知道所需的输入类型,因为它无法访问用户代码(函数)。因此,绑定器有效地传递了一个包含有效负载和一些可读元数据的信封,以消息头的形式出现,就像邮件传递的信件一样。这意味着,虽然可以访问消息的有效负载,但您只能将其作为原始数据(即byte[])访问。虽然开发人员经常会要求SpEL能够访问有效负载对象作为具体类型(例如,Foo、Bar等)的字段,但您可以看到实现这一点是多么困难甚至不可能。以下是一个演示问题的示例:假设您有一个路由表达式,根据有效负载类型将消息路由到不同的函数。此要求将意味着将有效负载从byte[]转换为特定类型,然后应用SpEL。但是,为了执行这种转换,我们需要知道要传递给转换器的实际类型,而该类型来自函数的签名,我们不知道哪个函数。解决此要求的更好方法是将类型信息作为消息头传递(例如,application/json;type=foo.bar.Baz)。您将获得一个清晰易读的字符串值,可以在一年内访问和评估,并且易于阅读SpEL表达式。

此外,将有效负载用于路由决策被认为是非常不好的做法,因为有效负载被认为是特权数据——仅供其最终接收者读取的数据。同样,使用邮件传递的类比,您不希望邮递员打开您的信封并阅读信件内容以做出一些投递决策。同样的概念也适用于此处,尤其是在生成消息时包含此类信息相对容易的情况下。它强制执行一定程度的纪律,涉及到要通过网络传输的数据的设计,以及哪些数据可以被视为公共数据,哪些数据是特权数据。

介绍 Spring Cloud Stream

Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。Spring Cloud Stream 基于 Spring Boot 创建独立的、生产级别的 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接。它提供了几家供应商的中间件的约定配置,引入了持久发布-订阅语义、消费者组和分区的概念。

通过将spring-cloud-stream依赖项添加到应用程序的类路径中,您可以立即连接到由提供的spring-cloud-stream绑定器公开的消息代理(稍后详细介绍),并且您可以实现您的功能需求,即(基于传入消息)由java.util.function.Function运行。

以下清单显示了一个快速示例

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class, args);
	}

    @Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

以下清单显示了相应的测试

@SpringBootTest(classes =  SampleApplication.class)
@Import({TestChannelBinderConfiguration.class})
class BootTestStreamApplicationTests {

	@Autowired
	private InputDestination input;

	@Autowired
	private OutputDestination output;

	@Test
	void contextLoads() {
		input.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}
}

主要概念

Spring Cloud Stream 提供了许多抽象和原语,简化了消息驱动的微服务应用程序的编写。本节概述以下内容

应用程序模型

Spring Cloud Stream 应用程序包含一个与中间件无关的核心。应用程序通过在外部代理公开的目标与代码中的输入/输出参数之间建立绑定来与外部世界通信。建立绑定所需的特定于代理的详细信息由特定于中间件的绑定器实现处理。

SCSt with binder
图 1. Spring Cloud Stream 应用程序

Fat JAR

Spring Cloud Stream 应用程序可以在测试环境中从您的 IDE 以独立模式运行。要在生产环境中运行 Spring Cloud Stream 应用程序,您可以使用 Maven 或 Gradle 提供的标准 Spring Boot 工具创建可执行(或“fat”)JAR。有关更多详细信息,请参阅Spring Boot 参考指南

绑定器抽象

Spring Cloud Stream 为KafkaRabbit MQ提供了绑定器实现。该框架还包括一个测试绑定器,用于对您的应用程序进行集成测试,作为 spring-cloud-stream 应用程序。有关更多详细信息,请参阅测试部分。

绑定器抽象也是框架的扩展点之一,这意味着您可以在 Spring Cloud Stream 之上实现自己的绑定器。在如何从头开始创建 Spring Cloud Stream 绑定器文章中,社区成员详细记录了实现自定义绑定器所需的步骤,并提供了一个示例。这些步骤也在实现自定义绑定器部分中突出显示。

Spring Cloud Stream 使用 Spring Boot 进行配置,绑定器抽象使 Spring Cloud Stream 应用程序能够灵活地连接到中间件。例如,部署人员可以在运行时动态选择外部目标(例如 Kafka 主题或 RabbitMQ 交换机)与消息处理程序的输入和输出(例如函数的输入参数及其返回值)之间的映射。此类配置可以通过外部配置属性提供,并以 Spring Boot 支持的任何形式提供(包括应用程序参数、环境变量以及application.ymlapplication.properties文件)。在介绍 Spring Cloud Stream部分的接收器示例中,将spring.cloud.stream.bindings.input.destination应用程序属性设置为raw-sensor-data会导致它从raw-sensor-data Kafka 主题或与raw-sensor-data RabbitMQ 交换机绑定的队列中读取。

Spring Cloud Stream 会自动检测并使用类路径中找到的绑定器。您可以使用相同代码的不同类型的中间件。为此,请在构建时包含不同的绑定器。对于更复杂的用例,您还可以将多个绑定器与您的应用程序打包在一起,并使其在运行时选择绑定器(甚至是否对不同的绑定使用不同的绑定器)。

持久发布-订阅支持

应用程序之间的通信遵循发布-订阅模型,其中数据通过共享主题广播。这可以在下图中看到,该图显示了一组交互式 Spring Cloud Stream 应用程序的典型部署。

SCSt sensors
图 2. Spring Cloud Stream 发布-订阅

传感器报告给 HTTP 端点的数据被发送到名为raw-sensor-data的公共目标。从目标中,它被独立地由一个计算时间窗口平均值的微服务应用程序和另一个将原始数据摄取到 HDFS(Hadoop 分布式文件系统)的微服务应用程序处理。为了处理数据,这两个应用程序在运行时都将主题声明为其输入。

发布-订阅通信模型降低了生产者和消费者的复杂性,并允许将新的应用程序添加到拓扑中,而不会中断现有流程。例如,在平均计算应用程序的下游,您可以添加一个应用程序,用于计算最高温度值以进行显示和监控。然后,您可以添加另一个应用程序,用于解释相同的平均值流以进行故障检测。通过共享主题而不是点对点队列进行所有通信,可以减少微服务之间的耦合。

虽然发布-订阅消息的概念并不新鲜,但 Spring Cloud Stream 额外地将其作为其应用程序模型的约定选择。通过使用本机中间件支持,Spring Cloud Stream 还简化了跨不同平台使用发布-订阅模型。

消费者组

虽然发布-订阅模型使通过共享主题连接应用程序变得容易,但通过创建给定应用程序的多个实例来扩展应用程序的能力也同样重要。这样做时,应用程序的不同实例将处于竞争性消费者关系中,其中仅预期一个实例处理给定的消息。

Spring Cloud Stream 通过消费者组的概念对这种行为进行建模。(Spring Cloud Stream 消费者组类似于并受 Kafka 消费者组启发。)每个消费者绑定可以使用spring.cloud.stream.bindings.<bindingName>.group属性指定组名。对于下图中显示的消费者,此属性将设置为spring.cloud.stream.bindings.<bindingName>.group=hdfsWritespring.cloud.stream.bindings.<bindingName>.group=average

SCSt groups
图 3. Spring Cloud Stream 消费者组

订阅给定目标的所有组都会收到已发布数据的副本,但每个组中只有一个成员会从该目标收到给定的消息。默认情况下,当未指定组时,Spring Cloud Stream 会将应用程序分配给匿名且独立的单成员消费者组,该组与所有其他消费者组处于发布-订阅关系中。

消费者类型

支持两种类型的消费者

  • 消息驱动型(有时称为异步)

  • 轮询(有时也称为同步)

在 2.0 版之前,仅支持异步消费者。消息一旦可用且有线程可处理,就会立即传递。

如果希望控制消息处理的速率,则可能需要使用同步消费者。

持久性

与 Spring Cloud Stream 的意见一致的应用程序模型一致,消费者组订阅是持久的。也就是说,绑定程序实现确保组订阅是持久的,并且一旦为某个组创建了至少一个订阅,该组就会接收消息,即使在组中的所有应用程序都停止时发送的消息也是如此。

匿名订阅本质上是非持久的。对于某些绑定程序实现(例如 RabbitMQ),可以使用非持久性组订阅。

通常,最好在将应用程序绑定到给定目标时始终指定消费者组。在扩展 Spring Cloud Stream 应用程序时,必须为其每个输入绑定指定一个消费者组。这样做可以防止应用程序的实例接收重复的消息(除非需要这种行为,这很不常见)。

分区支持

Spring Cloud Stream 提供了在给定应用程序的多个实例之间对数据进行分区的功能。在分区场景中,物理通信介质(例如代理主题)被视为结构化为多个分区。一个或多个生产者应用程序实例将数据发送到多个消费者应用程序实例,并确保具有共同特征标识的数据由同一个消费者实例处理。

Spring Cloud Stream 为以统一的方式实现分区处理用例提供了一个通用抽象。因此,无论代理本身是否自然分区(例如 Kafka)或不分区(例如 RabbitMQ),都可以使用分区。

SCSt partitioning
图 4. Spring Cloud Stream 分区

分区是在有状态处理中一个关键的概念,在有状态处理中,确保所有相关数据一起处理至关重要(出于性能或一致性原因)。例如,在基于时间窗口的平均值计算示例中,重要的是确保来自任何给定传感器的所有测量值都由同一个应用程序实例处理。

要设置分区处理场景,必须配置数据生产端和数据消费端。

编程模型

要了解编程模型,您应该熟悉以下核心概念

  • 目标绑定程序:负责提供与外部消息传递系统集成的组件。

  • 绑定:外部消息传递系统与应用程序提供的消息生产者消费者之间的桥梁(由目标绑定程序创建)。

  • 消息:生产者和消费者用于与目标绑定程序(以及通过外部消息传递系统与其他应用程序)通信的规范数据结构。

SCSt overview

目标绑定程序

目标绑定程序是 Spring Cloud Stream 的扩展组件,负责提供必要的配置和实现以促进与外部消息传递系统的集成。此集成负责生产者和消费者之间消息的连接、委托和路由、数据类型转换、用户代码的调用等等。

绑定程序处理了许多原本需要您承担的样板责任。但是,为了实现这一点,绑定程序仍然需要用户提供一些最少但必要的指令,这些指令通常以某种类型的绑定配置的形式出现。

虽然在本节中讨论所有可用的绑定程序和绑定配置选项超出了范围(手册的其余部分广泛涵盖了这些选项),但绑定作为一个概念,确实需要特别注意。下一节将详细讨论它。

绑定

如前所述,绑定在外部消息传递系统(例如,队列、主题等)和应用程序提供的生产者消费者之间架起了一座桥梁。

以下示例显示了一个完全配置且功能正常的 Spring Cloud Stream 应用程序,该应用程序接收消息的有效负载作为String类型(请参阅内容类型协商部分),将其记录到控制台,并在将其转换为大写后将其发送到下游。

@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class, args);
	}

	@Bean
	public Function<String, String> uppercase() {
	    return value -> {
	        System.out.println("Received: " + value);
	        return value.toUpperCase();
	    };
	}
}

以上示例与任何普通 spring-boot 应用程序看起来没有区别。它定义了一个类型为Function的单个 bean,仅此而已。那么,它如何成为一个 spring-cloud-stream 应用程序呢?它之所以成为一个 spring-cloud-stream 应用程序,仅仅是因为类路径上存在 spring-cloud-stream 和绑定程序依赖项以及自动配置类,有效地将引导应用程序的上下文设置为 spring-cloud-stream 应用程序。在这种上下文中,类型为SupplierFunctionConsumer的 bean 被视为事实上的消息处理程序,触发绑定到由提供的绑定程序公开的目标,遵循某些命名约定和规则以避免额外的配置。

绑定和绑定名称

绑定是一个抽象概念,表示绑定程序和用户代码公开的源和目标之间的桥梁,此抽象有一个名称,虽然我们尽最大努力限制运行 spring-cloud-stream 应用程序所需的配置,但在需要每个绑定额外配置的情况下,了解此类名称是必要的。

在本手册中,您将看到配置属性的示例,例如spring.cloud.stream.bindings.input.destination=myQueue。此属性名称中的input段是我们所说的绑定名称,它可以通过多种机制派生。以下小节将描述 spring-cloud-stream 用于控制绑定名称的命名约定和配置元素。

如果您的绑定名称包含特殊字符,例如.字符,则需要用括号([])括起绑定键,然后用引号括起来。例如spring.cloud.stream.bindings."[my.output.binding.key]".destination
功能性绑定名称

与以前版本的 spring-cloud-stream 中使用的基于注释的支持(传统)所需的显式命名不同,功能编程模型在绑定名称方面默认为简单的约定,从而大大简化了应用程序配置。让我们看看第一个示例

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
	    return value -> value.toUpperCase();
	}
}

在前面的示例中,我们有一个应用程序,其中包含一个充当消息处理程序的单个函数。作为一个Function,它有一个输入和一个输出。用于命名输入和输出绑定的命名约定如下

  • 输入 - <functionName> + -in- + <index>

  • 输出 - <functionName> + -out- + <index>

inout对应于绑定的类型(例如输入输出)。index是输入或输出绑定的索引。对于典型的单输入/输出函数,它始终为 0,因此它仅与具有多个输入和输出参数的函数相关。

因此,例如,如果要将此函数的输入映射到名为“my-topic”的远程目标(例如,主题、队列等),则可以使用以下属性

--spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic

请注意如何使用uppercase-in-0作为属性名称中的一个段。对于uppercase-out-0也是如此。

描述性绑定名称

有时为了提高可读性,您可能希望为您的绑定指定一个更具描述性的名称(例如“account”、“orders”等)。另一种看待它的方式是,您可以将隐式绑定名称映射到显式绑定名称。您可以使用spring.cloud.stream.function.bindings.<binding-name>属性来实现。此属性还为依赖于需要显式名称的自定义基于接口的绑定的现有应用程序提供了一条迁移路径。

例如,

--spring.cloud.stream.function.bindings.uppercase-in-0=input

在前面的示例中,您映射并有效地将uppercase-in-0绑定名称重命名为input。现在所有配置属性都可以引用input绑定名称(例如,--spring.cloud.stream.bindings.input.destination=my-topic)。

虽然描述性绑定名称可以增强配置的可读性方面,但它们也通过将隐式绑定名称映射到显式绑定名称创建了另一层误导。并且由于所有后续配置属性都将使用显式绑定名称,因此您必须始终参考此“bindings”属性以关联它实际对应的函数。我们认为,对于大多数情况(除了功能组合之外),这可能有点过头,因此,我们建议完全避免使用它,尤其是在不使用它时,绑定程序目标和绑定名称之间存在清晰的路径,例如spring.cloud.stream.bindings.uppercase-in-0.destination=sample-topic,其中您清楚地将uppercase函数的输入与sample-topic目标相关联。

有关属性和其他配置选项的更多信息,请参阅配置选项部分。

显式绑定创建

在上一节中,我们解释了如何通过应用程序提供的FunctionSupplierConsumer bean的名称隐式创建绑定。但是,有时您可能需要显式创建绑定,在这些绑定中,绑定不与任何函数绑定。这通常是为了通过StreamBridge支持与其他框架的集成。

Spring Cloud Stream 允许您通过spring.cloud.stream.input-bindingsspring.cloud.stream.output-bindings属性显式定义输入和输出绑定。请注意属性名称中的复数,允许您通过简单地使用;作为分隔符来定义多个绑定。只需查看以下测试用例即可作为示例

@Test
public void testExplicitBindings() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
		TestChannelBinderConfiguration.getCompleteConfiguration(EmptyConfiguration.class))
				.web(WebApplicationType.NONE)
				.run("--spring.jmx.enabled=false",
					"--spring.cloud.stream.input-bindings=fooin;barin",
					"--spring.cloud.stream.output-bindings=fooout;barout")) {


	. . .
	}
}

@EnableAutoConfiguration
@Configuration
public static class EmptyConfiguration {
}

如您所见,我们声明了两个输入绑定和两个输出绑定,而我们的配置没有定义任何函数,但我们仍然能够成功创建这些绑定并访问其相应的通道。

生产和消费消息

您可以通过简单地编写函数并将它们公开为@Bean来编写 Spring Cloud Stream 应用程序。您还可以使用基于 Spring Integration 注释的配置或基于 Spring Cloud Stream 注释的配置,尽管从 spring-cloud-stream 3.x 开始,我们建议使用功能实现。

Spring Cloud Function 支持

概述

从 Spring Cloud Stream v2.1 开始,定义流处理程序的另一种替代方法是使用对Spring Cloud Function的内置支持,其中它们可以表示为类型为java.util.function.[Supplier/Function/Consumer]的 bean。

要指定要绑定到绑定公开的外部目标的哪个功能 bean,必须提供spring.cloud.function.definition属性。

如果您只有一个类型为java.util.function.[Supplier/Function/Consumer]的 Bean,您可以跳过spring.cloud.function.definition属性,因为此类功能 Bean 将自动发现。但是,建议使用此属性以避免任何混淆。有时这种自动发现可能会造成干扰,因为类型为java.util.function.[Supplier/Function/Consumer]的单个 Bean 可能存在于处理消息之外的其他目的,但由于它是单个 Bean,因此会被自动发现和自动绑定。对于这些罕见的情况,您可以通过提供spring.cloud.stream.function.autodetect属性并将其值设置为false来禁用自动发现。

以下是一个应用程序示例,它将消息处理器公开为java.util.function.Function,通过充当数据消费者和生产者,有效地支持直通语义。

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在前面的示例中,我们定义了一个名为toUpperCase的类型为java.util.function.Function的 Bean,该 Bean 充当消息处理器,其“输入”和“输出”必须绑定到提供的目标绑定程序公开的外部目标。默认情况下,“输入”和“输出”绑定的名称将为toUpperCase-in-0toUpperCase-out-0。有关用于建立绑定名称的命名约定的详细信息,请参阅功能绑定名称部分。

以下是支持其他语义的简单功能应用程序示例。

以下是一个语义示例,它公开为java.util.function.Supplier

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

以下是一个接收器语义示例,它公开为java.util.function.Consumer

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}
供应商 (源)

在触发方式方面,FunctionConsumer非常简单明了。它们基于发送到其绑定到的目标的数据(事件)触发。换句话说,它们是经典的事件驱动组件。

但是,Supplier在触发方面属于其自己的类别。根据定义,它是数据的源头(起点),它不订阅任何入站目标,因此必须由其他机制触发。还有一个Supplier实现的问题,它可能是命令式反应式的,这直接关系到此类供应商的触发。

考虑以下示例。

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

前面的Supplier Bean在其get()方法被调用时生成一个字符串。但是,谁调用此方法以及多久调用一次?框架提供了一个默认的轮询机制(回答“谁?”的问题),它将触发供应商的调用,默认情况下,它将每秒执行一次(回答“多久?”的问题)。换句话说,上述配置每秒生成一条消息,并且每条消息都发送到绑定程序公开的output目标。要了解如何自定义轮询机制,请参阅轮询配置属性部分。

考虑另一个示例。

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

前面的Supplier Bean采用了反应式编程风格。通常,与命令式供应商不同,它应该只触发一次,因为其get()方法的调用会产生(提供)持续的消息流,而不是单个消息。

框架识别编程风格的差异,并保证此类供应商仅触发一次。

但是,想象一下您想要轮询某些数据源并返回表示结果集的有限数据流的用例。反应式编程风格是此类供应商的完美机制。但是,鉴于所生成流的有限性质,此类供应商仍然需要定期调用。

考虑以下示例,它通过生成有限的数据流来模拟此类用例。

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

Bean 本身使用PollableBean注解(@Bean的子集)进行注解,从而向框架发出信号,表明尽管此类供应商的实现是反应式的,但它仍然需要轮询。

PollableBean中定义了一个splittable属性,它向此注解的后处理器发出信号,表明注解组件产生的结果必须被拆分,并且默认设置为true。这意味着框架将拆分返回结果,并将每个项目作为单个消息发送出去。如果这不是所需的行为,您可以将其设置为false,此时此类供应商将简单地返回生成的 Flux 而不进行拆分。
Supplier & 线程
正如您现在所了解的,与由事件触发的FunctionConsumer(它们有输入数据)不同,Supplier没有任何输入,因此由不同的机制——轮询器触发,该机制可能具有不可预测的线程机制。虽然线程机制的细节在大多数情况下与函数的下游执行无关,但在某些情况下可能会出现问题,尤其是在具有某些线程亲和性期望的集成框架中。例如,Spring Cloud Sleuth依赖于存储在线程本地中的跟踪数据。对于这些情况,我们还有另一种机制通过StreamBridge,用户可以更好地控制线程机制。您可以在将任意数据发送到输出(例如,外部事件驱动源)部分中获得更多详细信息。
Consumer (反应式)

反应式Consumer有点特殊,因为它具有 void 返回类型,框架没有可订阅的引用。您很可能不需要编写Consumer<Flux<?>>,而是将其编写为Function<Flux<?>, Mono<Void>>,在流上调用then操作符作为最后一个操作符。

例如。

public Function<Flux<?>, Mono<Void>> consumer() {
	return flux -> flux.map(..).filter(..).then();
}

但是,如果您确实需要编写显式的Consumer<Flux<?>>,请记住订阅传入的 Flux。

此外,请记住,当混合使用反应式函数和命令式函数时,相同的规则适用于函数组合。Spring Cloud Function确实支持将反应式函数与命令式函数组合,但是您必须注意某些限制。例如,假设您将反应式函数与命令式 Consumer 组合。此类组合的结果是反应式Consumer。但是,如本节前面所述,无法订阅此类 Consumer,因此此限制只能通过以下两种方式解决:要么使您的 Consumer 成为反应式的并手动订阅(如前面所述),要么将您的函数更改为命令式的。

轮询配置属性

以下属性由 Spring Cloud Stream 公开,并以spring.integration.poller.为前缀。

fixedDelay

默认轮询器的固定延迟(以毫秒为单位)。

默认值:1000L。

maxMessagesPerPoll

默认轮询器每次轮询事件的最大消息数。

默认值:1L。

cron

Cron 触发器的 Cron 表达式值。

默认值:无。

initialDelay

周期性触发器的初始延迟。

默认值:0。

timeUnit

应用于延迟值的 TimeUnit。

默认值:MILLISECONDS。

例如,--spring.integration.poller.fixed-delay=2000将轮询间隔设置为每两秒轮询一次。

每个绑定的轮询配置

上一节显示了如何配置一个将应用于所有绑定的单个默认轮询器。虽然它非常适合 Spring Cloud Stream 为其设计的微服务模型,其中每个微服务代表一个组件(例如,Supplier),因此默认轮询器配置就足够了,但也有一些边缘情况,您可能有多个组件需要不同的轮询配置。

对于此类情况,请使用每个绑定的方式配置轮询器。例如,假设您有一个输出绑定supply-out-0。在这种情况下,您可以使用spring.cloud.stream.bindings.supply-out-0.producer.poller..前缀(例如,spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000)为该绑定配置轮询器。

将任意数据发送到输出(例如,外部事件驱动源)

在某些情况下,数据的实际来源可能来自外部(外部)系统,该系统不是绑定程序。例如,数据源可能是经典的 REST 端点。我们如何将此类源与 Spring Cloud Stream 使用的功能机制桥接?

Spring Cloud Stream 提供了两种机制,让我们更详细地了解一下它们。

在这里,对于这两个示例,我们将使用一个名为delegateToSupplier的标准 MVC 端点方法,该方法绑定到根 Web 上下文,并通过 StreamBridge 机制将传入的请求委托到流。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream", body);
	}
}

在这里,我们自动装配了一个StreamBridge Bean,它允许我们将数据发送到输出绑定,有效地将非流应用程序与 Spring Cloud Stream 桥接。请注意,前面的示例没有定义任何源函数(例如,Supplier Bean),因此框架没有触发器来预先创建源绑定,这在配置包含函数 Bean 的情况下很常见。这没有问题,因为StreamBridge将在首次调用其send(..)操作时启动输出绑定的创建(以及必要的目标自动预配),并在后续重用时进行缓存(有关更多详细信息,请参阅StreamBridge 和动态目标)。

但是,如果您想在初始化(启动)时预创建输出绑定,则可以利用spring.cloud.stream.output-bindings属性,您可以在其中声明源的名称。提供的名称将用作创建源绑定的触发器。您可以使用;来表示多个源(多个输出绑定)(例如,--spring.cloud.stream.output-bindings=foo;bar)。

此外,请注意,streamBridge.send(..)方法采用数据作为Object。这意味着您可以向其发送 POJO 或Message,它将与从任何 Function 或 Supplier 发送输出时一样,经过相同的例程,提供与函数相同的级别的一致性。这意味着输出类型转换、分区等将像从函数生成的输出一样得到尊重。

StreamBridge 和动态目标

StreamBridge 也可用于输出目标事先未知的情况,类似于来自消费者的路由部分中描述的用例。

让我们来看一个示例

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, args);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("myDestination", body);
	}
}

如您所见,前面的示例与上一个示例非常相似,除了通过spring.cloud.stream.output-bindings属性(未提供)提供的显式绑定指令。在这里,我们正在发送数据到名为myDestination的目标,该目标不存在作为绑定。因此,此名称将被视为动态目标,如来自消费者的路由部分所述。

在前面的示例中,我们使用ApplicationRunner作为外部源来为流提供数据。

一个更实际的示例,其中外部源是 REST 端点。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		streamBridge.send("myBinding", body);
	}
}

如您所见,在delegateToSupplier方法内部,我们使用 StreamBridge 将数据发送到myBinding绑定。在这里,您也受益于StreamBridge的动态特性,如果myBinding不存在,它将被自动创建并缓存,否则将使用现有的绑定。

缓存动态目标(绑定)可能会导致内存泄漏,如果存在许多动态目标。为了进行一定程度的控制,我们为输出绑定提供了一个自清除缓存机制,默认缓存大小为 10。这意味着,如果您的动态目标大小超过该数量,则可能存在驱逐现有绑定的可能性,因此需要重新创建它,这可能会导致轻微的性能下降。您可以通过spring.cloud.stream.dynamic-destination-cache-size属性将它设置为所需的值来增加缓存大小。
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" https://127.0.0.1:8080/

通过展示两个示例,我们想要强调这种方法适用于任何类型的外部源。

如果您使用的是 Solace PubSub+ 绑定器,Spring Cloud Stream 预留了scst_targetDestination标头(可通过 BinderHeaders.TARGET_DESTINATION 获取),它允许将消息从其绑定配置的目标重定向到此标头指定的 targetDestination。这允许绑定器管理发布到动态目标所需的资源,从而减轻框架执行此操作的负担,并避免前面备注中提到的缓存问题。更多信息请点击此处
StreamBridge 的输出内容类型

如果需要,您还可以使用以下方法签名public boolean send(String bindingName, Object data, MimeType outputContentType)提供特定的内容类型。或者,如果您将数据作为Message发送,则将使用其内容类型。

使用 StreamBridge 的特定绑定器类型

Spring Cloud Stream 支持多种绑定器场景。例如,您可能正在从 Kafka 接收数据并将其发送到 RabbitMQ。

有关多个绑定器场景的更多信息,请参阅绑定器部分,尤其是类路径上的多个绑定器

如果您计划使用 StreamBridge 并在应用程序中配置了多个绑定器,则还必须告诉 StreamBridge 使用哪个绑定器。为此,send方法还有另外两种变体

public boolean send(String bindingName, @Nullable String binderType, Object data)

public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)

如您所见,您可以提供一个额外的参数 - binderType,告诉 BindingService 在创建动态绑定时使用哪个绑定器。

对于使用spring.cloud.stream.output-bindings属性或绑定已在其他绑定器下创建的情况,binderType参数将不起作用。
使用 StreamBridge 的通道拦截器

由于StreamBridge使用MessageChannel来建立输出绑定,因此您可以在通过StreamBridge发送数据时激活通道拦截器。应用程序需要决定在StreamBridge上应用哪些通道拦截器。Spring Cloud Stream 不会将检测到的所有通道拦截器注入到StreamBridge中,除非它们用@GlobalChannelInterceptor(patterns = "*")进行了注解。

假设您在应用程序中具有以下两个不同的StreamBridge绑定。

streamBridge.send("foo-out-0", message);

以及

streamBridge.send("bar-out-0", message);

现在,如果您希望在两个StreamBridge绑定上都应用通道拦截器,则可以声明以下GlobalChannelInterceptorbean。

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

但是,如果您不喜欢上面的全局方法,并希望为每个绑定都有一个专用的拦截器,则可以执行以下操作。

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

以及

@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

您可以灵活地使模式更严格或根据您的业务需求进行自定义。

使用这种方法,应用程序能够决定将哪些拦截器注入到StreamBridge中,而不是应用所有可用的拦截器。

StreamBridge通过StreamOperations接口提供了一个契约,其中包含StreamBridge的所有send方法。因此,应用程序可以选择使用StreamOperations进行自动装配。当需要对使用StreamBridge的代码进行单元测试时,这非常方便,因为它可以为StreamOperations接口提供模拟或类似机制。
响应式函数支持

由于Spring Cloud Function构建在Project Reactor之上,因此在实现SupplierFunctionConsumer时,您无需执行太多操作即可受益于响应式编程模型。

例如。

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
		return flux -> flux.map(val -> val.toUpperCase());
	}
}

在选择响应式或命令式编程模型时,必须了解一些重要事项。

完全响应式还是仅 API?

使用响应式 API 不一定意味着您可以从该 API 的所有响应式功能中受益。换句话说,诸如背压和其他高级功能之类的东西仅在与兼容的系统(例如 Reactive Kafka 绑定器)一起使用时才能发挥作用。如果您使用的是常规 Kafka 或 Rabbit 或任何其他非响应式绑定器,则只能从响应式 API 本身带来的便利中受益,而不能从其高级功能中受益,因为流的实际源或目标不是响应式的。

错误处理和重试

在本手册中,您将看到关于基于框架的错误处理、重试和其他功能以及与其关联的配置属性的若干参考。必须了解,它们仅影响命令式函数,在处理响应式函数时,您不应具有相同的期望。原因如下……响应式函数和命令式函数之间存在根本区别。命令式函数是消息处理程序,框架在收到每条消息时都会调用它。因此,对于 N 条消息,将调用该函数 N 次,因此我们可以包装该函数并添加其他功能,例如错误处理、重试等。响应式函数是初始化函数。它仅调用一次以获取对用户提供的 Flux/Mono 的引用,以便将其与框架提供的引用连接起来。之后,我们(框架)对流没有任何可见性或控制权。因此,对于响应式函数,在处理错误和重试时,您必须依赖响应式 API 的丰富性(即,doOnError().onError*()等)。

函数组合

使用函数式编程模型,您还可以受益于函数组合,您可以在其中从一组简单的函数动态地组合复杂的处理程序。例如,让我们将以下函数 bean 添加到上面定义的应用程序中

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

并修改spring.cloud.function.definition属性以反映您希望从“toUpperCase”和“wrapInQuotes”组合新函数的意图。为此,Spring Cloud Function 依赖于|(管道)符号。因此,为了完成我们的示例,我们的属性现在将如下所示

--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function提供的函数组合支持的一大好处是您可以组合响应式命令式函数。

组合的结果是一个单一函数,正如您可能猜到的那样,它可能具有非常长且相当隐晦的名称(例如,foo|bar|baz|xyz. . .),在处理其他配置属性时会造成很大的不便。这就是函数绑定名称部分中描述的描述性绑定名称功能可以提供帮助的地方。

例如,如果我们想为我们的toUpperCase|wrapInQuotes提供一个更具描述性的名称,我们可以使用以下属性spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput,允许其他配置属性引用该绑定名称(例如,spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination)。

函数组合和横切关注点

函数组合有效地允许您通过将其分解为一组简单且可单独管理/测试的组件来解决复杂性,这些组件仍然可以在运行时表示为一个。但这并不是唯一的好处。

您还可以使用组合来解决某些横切非功能性问题,例如内容丰富。例如,假设您有一个传入的消息,该消息可能缺少某些标头,或者某些标头未处于您的业务函数期望的确切状态。您现在可以实现一个单独的函数来解决这些问题,然后将其与主业务函数组合。

让我们来看一个示例

@SpringBootApplication
public class DemoStreamApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamApplication.class,
				"--spring.cloud.function.definition=enrich|echo",
				"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
				"--spring.cloud.stream.bindings.input.destination=myDestination",
				"--spring.cloud.stream.bindings.input.group=myGroup");

	}

	@Bean
	public Function<Message<String>, Message<String>> enrich() {
		return message -> {
			Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
			return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
		};
	}

	@Bean
	public Function<Message<String>, Message<String>> echo() {
		return message -> {
			Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
			System.out.println("Incoming message " + message);
			return message;
		};
	}
}

虽然很简单,但此示例演示了一个函数如何使用其他标头(非功能性问题)丰富传入的消息,以便其他函数 - echo - 可以从中受益。echo函数保持简洁并仅专注于业务逻辑。您还可以看到spring.cloud.stream.function.bindings属性的使用,以简化组合的绑定名称。

具有多个输入和输出参数的函数

从 3.0 版开始,spring-cloud-stream 提供了对具有多个输入和/或多个输出(返回值)的函数的支持。这到底意味着什么,它针对哪类用例?

  • 大数据:想象一下您正在处理的数据源高度无组织且包含各种类型的数据元素(例如,订单、交易等),您需要有效地对其进行分类。

  • 数据聚合:另一个用例可能需要您合并来自 2 个或更多传入_流_的数据元素

    .

上面描述的只是几个可能需要使用单个函数来接受和/或生成多个数据的用例。而这正是我们在这里针对的用例类型。

另外,请注意这里对概念的略微不同的强调。假设只有当这些函数能够访问实际的数据流(而不是单个元素)时,它们才有价值。因此,我们依赖于Project Reactor(即FluxMono)提供的抽象,这些抽象作为spring-cloud-functions引入的依赖项的一部分,已经在类路径上可用。

另一个重要的方面是多个输入和输出的表示。虽然java提供了各种不同的抽象来表示多个某物,但这些抽象a) 是无界的b) 缺乏元数c) 缺乏类型信息,而这些在本文中都很重要。例如,让我们看看Collection或数组,它们只允许我们描述单个类型的多个或将所有内容提升为Object,从而影响spring-cloud-stream的透明类型转换功能等等。

因此,为了满足所有这些需求,初始支持依赖于利用Project Reactor提供的另一个抽象——元组的签名。但是,我们正在努力允许更灵活的签名。

请参阅绑定和绑定名称部分,以了解用于建立此类应用程序使用的绑定名称的命名约定。

让我们看看几个示例

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
		return tuple -> {
			Flux<String> stringStream = tuple.getT1();
			Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
			return Flux.merge(stringStream, intStream);
		};
	}
}

上面的示例演示了一个函数,它接收两个输入(第一个类型为String,第二个类型为Integer)并生成一个类型为String的输出。

因此,对于上面的示例,两个输入绑定将为gather-in-0gather-in-1,为了保持一致性,输出绑定也遵循相同的约定,并命名为gather-out-0

了解这一点将允许您设置绑定特定的属性。例如,以下内容将覆盖gather-in-0绑定的内容类型

--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {

	@Bean
	public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
		return flux -> {
			Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
			UnicastProcessor even = UnicastProcessor.create();
			UnicastProcessor odd = UnicastProcessor.create();
			Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
			Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

			return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
		};
	}
}

上面的示例与前面的示例有些相反,它演示了一个函数,该函数接收一个类型为Integer的输入并生成两个输出(两者类型均为String)。

因此,对于上面的示例,输入绑定为scatter-in-0,输出绑定为scatter-out-0scatter-out-1

您可以使用以下代码进行测试

@Test
public void testSingleInputMultiOutput() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=scatter")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		for (int i = 0; i < 10; i++) {
			inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
		}

		int counter = 0;
		for (int i = 0; i < 5; i++) {
			Message<byte[]> even = outputDestination.receive(0, 0);
			assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
			Message<byte[]> odd = outputDestination.receive(0, 1);
			assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
		}
	}
}
单个应用程序中的多个函数

可能还需要在一个应用程序中对多个消息处理程序进行分组。您可以通过定义多个函数来做到这一点。

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

在上面的示例中,我们有一个配置,它定义了两个函数uppercasereverse。因此,首先,如前所述,我们需要注意到存在冲突(多个函数),因此我们需要通过提供spring.cloud.function.definition属性来解决它,该属性指向我们想要绑定的实际函数。除了这里我们将使用;分隔符来指向这两个函数(请参阅下面的测试用例)。

与具有多个输入/输出的函数一样,请参阅绑定和绑定名称部分,以了解用于建立此类应用程序使用的绑定名称的命名约定。

您可以使用以下代码进行测试

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					ReactiveFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-1");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}
批处理消费者

当使用支持批处理侦听器的MessageChannelBinder,并且为消费者绑定启用了该功能时,您可以将spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode设置为true以启用将整个消息批次传递给函数中List

@Bean
public Function<List<Person>, Person> findFirstPerson() {
    return persons -> persons.get(0);
}
批处理生产者

您还可以通过返回消息集合在生产者端使用批处理的概念,这有效地提供了相反的效果,其中集合中的每条消息都将由绑定器单独发送。

考虑以下功能

@Bean
public Function<String, List<Message<String>>> batch() {
	return p -> {
		List<Message<String>> list = new ArrayList<>();
		list.add(MessageBuilder.withPayload(p + ":1").build());
		list.add(MessageBuilder.withPayload(p + ":2").build());
		list.add(MessageBuilder.withPayload(p + ":3").build());
		list.add(MessageBuilder.withPayload(p + ":4").build());
		return list;
	};
}

返回列表中的每条消息都将单独发送,导致四条消息发送到输出目标。

Spring Integration 流作为函数

在实现函数时,您可能有一些复杂的需求适合企业集成模式(EIP)的类别。这些最好使用诸如Spring Integration(SI)之类的框架来处理,该框架是EIP的参考实现。

值得庆幸的是,SI已经提供了通过集成流作为网关将集成流公开为函数的支持。请考虑以下示例

@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {

	public static void main(String[] args) {
		SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
	}

	@Bean
	public IntegrationFlow uppercaseFlow() {
		return IntegrationFlows.from(MessageFunction.class, "uppercase")
				.<String, String>transform(String::toUpperCase)
				.logAndReply(LoggingHandler.Level.WARN);
	}

	public interface MessageFunction extends Function<Message<String>, Message<String>> {

	}
}

对于熟悉SI的人,您可以看到我们定义了一个类型为IntegrationFlow的bean,在其中我们声明了一个我们想要公开为Function<String, String>(使用SI DSL)的集成流,称为uppercaseMessageFunction接口允许我们明确声明输入和输出的类型,以进行正确的类型转换。有关类型转换的更多信息,请参阅内容类型协商部分。

要接收原始输入,您可以使用from(Function.class, …​)

生成的函数绑定到目标绑定器公开的输入和输出目标。

请参阅绑定和绑定名称部分,以了解用于建立此类应用程序使用的绑定名称的命名约定。

有关 Spring Integration 和 Spring Cloud Stream 的互操作性的更多详细信息,尤其是在功能编程模型周围,您可能会发现这篇文章非常有趣,因为它更深入地探讨了您可以通过合并 Spring Integration 和 Spring Cloud Stream/Functions 的最佳功能来应用的各种模式。

使用轮询消费者

概述

使用轮询消费者时,您可以按需轮询PollableMessageSource。要为轮询消费者定义绑定,您需要提供spring.cloud.stream.pollable-source属性。

请考虑以下轮询消费者绑定的示例

--spring.cloud.stream.pollable-source=myDestination

前面示例中的轮询源名称myDestination将导致myDestination-in-0绑定名称保持与功能编程模型一致。

鉴于前面示例中的轮询消费者,您可以按如下方式使用它

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

一个不太手动且更符合 Spring 风格的替代方案是配置一个计划任务 bean。例如,

@Scheduled(fixedDelay = 5_000)
public void poll() {
	System.out.println("Polling...");
	this.source.poll(m -> {
		System.out.println(m.getPayload());

	}, new ParameterizedTypeReference<Foo>() { });
}

PollableMessageSource.poll()方法采用MessageHandler参数(通常是 lambda 表达式,如这里所示)。如果消息已接收并成功处理,则返回true

与消息驱动的消费者一样,如果MessageHandler抛出异常,则消息将发布到错误通道,如错误处理中所述。

通常,poll()方法在MessageHandler退出时确认消息。如果方法异常退出,则消息将被拒绝(不会重新入队),但请参阅处理错误。您可以通过承担确认责任来覆盖此行为,如以下示例所示

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}
您必须在某个时刻ack(或nack)消息,以避免资源泄漏。
某些消息传递系统(例如 Apache Kafka)在日志中维护一个简单的偏移量。如果传递失败并使用StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE);重新入队,则任何以后成功确认的消息都会重新传递。

还有一个重载的poll方法,其定义如下

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type是一个转换提示,允许转换传入的消息有效负载,如以下示例所示

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});
处理错误

默认情况下,为可轮询源配置了一个错误通道;如果回调抛出异常,则ErrorMessage将发送到错误通道(<destination>.<group>.errors);此错误通道也桥接到全局 Spring Integration errorChannel

您可以使用@ServiceActivator订阅任一错误通道以处理错误;如果没有订阅,则错误将简单地记录,并且消息将被确认成功。如果错误通道服务激活器抛出异常,则消息将被拒绝(默认情况下)并且不会重新传递。如果服务激活器抛出RequeueCurrentMessageException,则消息将重新入队到代理,并在后续轮询中再次检索。

如果侦听器直接抛出RequeueCurrentMessageException,则消息将重新入队,如上所述,并且不会发送到错误通道。

事件路由

在 Spring Cloud Stream 的上下文中,事件路由是指能够a) 将事件路由到特定事件订阅者b) 将事件订阅者生成的事件路由到特定目标。在这里,我们将称之为路由“到”和路由“从”。

路由到消费者

路由可以通过依赖 Spring Cloud Function 3.0 中可用的RoutingFunction来实现。您需要做的就是通过--spring.cloud.stream.function.routing.enabled=true应用程序属性启用它或提供spring.cloud.function.routing-expression属性。启用后,RoutingFunction将绑定到接收所有消息的输入目标,并根据提供的指令将其路由到其他函数。

为了绑定目的,路由目标的名称为functionRouter-in-0(请参阅 RoutingFunction.FUNCTION_NAME 和绑定命名约定功能绑定名称)。

指令可以与单个消息以及应用程序属性一起提供。

以下是一些示例

使用消息头
@SpringBootApplication
public class SampleApplication {

	public static void main(String[] args) {
		SpringApplication.run(SampleApplication.class,
                       "--spring.cloud.stream.function.routing.enabled=true");
	}

	@Bean
	public Consumer<String> even() {
		return value -> {
			System.out.println("EVEN: " + value);
		};
	}

	@Bean
	public Consumer<String> odd() {
		return value -> {
			System.out.println("ODD: " + value);
		};
    }
}

通过将消息发送到绑定器(即 rabbit、kafka)公开的functionRouter-in-0目标,此类消息将路由到相应的(“偶数”或“奇数”)消费者。

默认情况下,RoutingFunction将查找spring.cloud.function.definitionspring.cloud.function.routing-expression(对于使用 SpEL 的更动态的场景)标头,如果找到,其值将被视为路由指令。

例如,将spring.cloud.function.routing-expression标头设置为值T(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd'将最终将请求半随机地路由到oddeven函数。此外,对于 SpEL,评估上下文的根对象Message,因此您也可以对各个标头(或消息)进行评估…​.routing-expression=headers['type']

使用应用程序属性

spring.cloud.function.routing-expression和/或spring.cloud.function.definition可以作为应用程序属性传递(例如,spring.cloud.function.routing-expression=headers['type'])。

@SpringBootApplication
public class RoutingStreamApplication {

  public static void main(String[] args) {
      SpringApplication.run(RoutingStreamApplication.class,
	  "--spring.cloud.function.routing-expression="
	  + "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
  }
  @Bean
  public Consumer<Integer> even() {
    return value -> System.out.println("EVEN: " + value);
  }

  @Bean
  public Consumer<Integer> odd() {
    return value -> System.out.println("ODD: " + value);
  }
}
通过应用程序属性传递指令对于反应式函数尤其重要,因为反应式函数仅调用一次以传递发布者,因此访问单个项目的权限有限。
路由函数和输出绑定

RoutingFunction是一个Function,因此与任何其他函数的处理方式相同。嗯...几乎一样。

RoutingFunction路由到另一个Function时,其输出将发送到RoutingFunction的输出绑定,即functionRouter-in-0,这符合预期。但是,如果RoutingFunction路由到Consumer会怎样?换句话说,RoutingFunction调用的结果可能不会生成任何内容发送到输出绑定,因此甚至不需要输出绑定。因此,在创建绑定时,我们确实对RoutingFunction进行了稍微不同的处理。即使这对您作为用户来说是透明的(您确实没有任何需要做的事情),了解一些机制将有助于您理解其内部工作原理。

因此,规则是:我们永远不会为RoutingFunction创建输出绑定,只创建输入绑定。所以当您路由到Consumer时,RoutingFunction由于没有输出绑定,实际上就变成了一个Consumer。但是,如果RoutingFunction恰好路由到另一个产生输出的Function,则会动态创建RoutingFunction的输出绑定,此时RoutingFunction在绑定方面将充当常规的Function(同时具有输入和输出绑定)。

从 Consumer 路由

除了静态目标外,Spring Cloud Stream 允许应用程序将消息发送到动态绑定的目标。例如,当需要在运行时确定目标目标时,这很有用。应用程序可以通过两种方式之一来做到这一点。

spring.cloud.stream.sendto.destination

您还可以将委托给框架,以通过指定spring.cloud.stream.sendto.destination标头(设置为要解析的目标的名称)来动态解析输出目标。

考虑以下示例

@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {

    @Bean
	public Function<String, Message<String>> destinationAsPayload() {
		return value -> {
			return MessageBuilder.withPayload(value)
				.setHeader("spring.cloud.stream.sendto.destination", value).build();};
	}
}

虽然很简单,但在这个例子中您可以清楚地看到,我们的输出是一条包含spring.cloud.stream.sendto.destination标头的消息,该标头的值设置为输入参数的值。框架将查阅此标头,并尝试创建或发现具有该名称的目标,并将输出发送到该目标。

如果目标名称事先已知,则可以像任何其他目标一样配置生产者属性。或者,如果您注册了一个NewDestinationBindingCallback<> bean,则会在创建绑定之前调用它。回调采用绑定器使用的扩展生产者属性的泛型类型。它有一个方法

void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
        T extendedProducerProperties);

以下示例显示了如何使用 RabbitMQ 绑定器

@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
    return (name, channel, props, extended) -> {
        props.setRequiredGroups("bindThisQueue");
        extended.setQueueNameGroupOnly(true);
        extended.setAutoBindDlq(true);
        extended.setDeadLetterQueueName("myDLQ");
    };
}
如果您需要支持具有多种绑定器类型的动态目标,请对泛型类型使用Object,并根据需要将extended参数强制转换为所需类型。

此外,请参阅[使用 StreamBridge]部分,了解如何将另一个选项(StreamBridge)用于类似情况。

后处理(发送消息后)

一旦函数被调用,其结果将由框架发送到目标目的地,这有效地完成了函数调用周期。

但是,从业务角度来看,在执行此周期之后执行一些其他任务之前,此周期可能尚未完全完成。虽然这可以通过ConsumerStreamBridge的简单组合来实现,如本Stack Overflow 帖子中所述,但从 4.0.3 版本开始,框架提供了一种更惯用的方法来解决此问题,即通过 Spring Cloud Function 项目提供的PostProcessingFunctionPostProcessingFunction是一个特殊的半标记函数,它包含一个额外的postProcess(Message>)方法,旨在提供一个用于实现此类后处理任务的地方。

package org.springframework.cloud.function.context
. . .
public interface PostProcessingFunction<I, O> extends Function<I, O> {
	default void postProcess(Message<O> result) {
	}
}

所以,现在您有两个选择。

选项 1:您可以将您的函数实现为PostProcessingFunction,并通过实现其postProcess(Message>)方法来包含额外的后处理行为。

private static class Uppercase implements PostProcessingFunction<String, String> {

	@Override
	public String apply(String input) {
		return input.toUpperCase();
	}

	@Override
	public void postProcess(Message<String> result) {
		System.out.println("Function Uppercase has been successfully invoked and its result successfully sent to target destination");
	}
}
. . .
@Bean
public Function<String, String> uppercase() {
	return new Uppercase();
}

选项 2:如果您已经有一个现有的函数,并且不想更改其实现或希望将您的函数保留为 POJO,则只需实现postProcess(Message>)方法,并将此新的后处理函数与您的其他函数组合即可。

private static class Logger implements PostProcessingFunction<?, String> {

	@Override
	public void postProcess(Message<String> result) {
		System.out.println("Function has been successfully invoked and its result successfully sent to target destination");
	}
}
. . .
@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}
@Bean
public Function<String, String> logger() {
	return new Logger();
}
. . .
//  and then have your function definition as such `uppercase|logger`

注意:在函数组合的情况下,只有最后一个PostProcessingFunction实例(如果存在)才会生效。例如,假设您有以下函数定义 - foo|bar|baz,并且foobaz都是PostProcessingFunction的实例。只有baz.postProcess(Message>)会被调用。如果baz不是PostProcessingFunction的实例,则不会执行任何后处理功能。

有人可能会争辩说,您可以通过简单地将后处理器作为另一个Function进行组合来轻松地做到这一点。这确实是一种可能性,但是,在这种情况下,后处理功能将在调用前一个函数之后且在消息发送到目标目的地之前被调用,即在函数调用周期完成之前。

错误处理

在本节中,我们将解释框架提供的错误处理机制背后的总体思路。我们将使用 Rabbit 绑定器作为示例,因为各个绑定器为某些特定于底层代理功能(例如 Kafka 绑定器)的支持机制定义了不同的属性集。

错误会发生,Spring Cloud Stream 提供了几种灵活的机制来处理它们。请注意,这些技术取决于绑定器实现和底层消息传递中间件的功能以及编程模型(稍后将详细介绍)。

每当消息处理程序(函数)抛出异常时,它都会传播回绑定器,此时绑定器将使用Spring Retry库提供的RetryTemplate尝试多次重试同一消息(默认情况下为 3 次)。如果重试不成功,则取决于错误处理机制,该机制可能会丢弃消息、重新排队消息以进行重新处理或将失败的消息发送到 DLQ

Rabbit 和 Kafka 都支持这些概念(尤其是 DLQ)。但是,其他绑定器可能不支持,因此请参阅您各自绑定器的文档以了解支持的错误处理选项的详细信息。

但是请记住,反应式函数不符合消息处理程序的条件,因为它不处理单个消息,而是提供了一种将框架提供的流(即 Flux)与用户提供的流连接起来的方法。为什么这很重要?这是因为您在本节后面阅读的关于 Retry Template、丢弃失败消息、重试、DLQ 和协助所有这些操作的配置属性的内容适用于消息处理程序(即命令式函数)。

Reactive API 提供了一个非常丰富的自身运算符和机制库,以帮助您处理特定于各种反应式用例的错误,这些用例比简单消息处理程序用例复杂得多,因此请使用它们,例如public final Flux<T> retryWhen(Retry retrySpec);,您可以在reactor.core.publisher.Flux中找到它。

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
	return flux -> flux
			.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
			.map(v -> v.toUpperCase());
}

丢弃失败的消息

默认情况下,系统提供错误处理程序。第一个错误处理程序将简单地记录错误消息。第二个错误处理程序是特定于绑定器的错误处理程序,负责在特定消息系统(例如,发送到 DLQ)的上下文中处理错误消息。但是,由于没有提供额外的错误处理配置(在此当前场景中),因此此处理程序将不执行任何操作。因此,本质上,在记录后,消息将被丢弃。

虽然在某些情况下是可以接受的,但在大多数情况下,它是不可以接受的,我们需要一些恢复机制来避免消息丢失。

处理错误消息

在上一节中,我们提到默认情况下导致错误的消息实际上会被记录并丢弃。框架还公开了用于提供自定义错误处理程序(即发送通知或写入数据库等)的机制。您可以通过添加专门设计用于接受ErrorMessageConsumer来做到这一点,除了所有关于错误的信息(例如,堆栈跟踪等)之外,它还包含原始消息(触发错误的消息)。注意:自定义错误处理程序与框架提供的错误处理程序(即日志记录和绑定器错误处理程序 - 请参阅上一节)互斥,以确保它们不会相互干扰。

@Bean
public Consumer<ErrorMessage> myErrorHandler() {
	return v -> {
		// send SMS notification code
	};
}

要将此类消费者识别为错误处理程序,您只需提供指向函数名称的error-handler-definition属性 - spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler

例如,对于绑定名称uppercase-in-0,该属性如下所示

spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler

如果您使用特殊的映射指令将绑定映射到更易读的名称 - spring.cloud.stream.function.bindings.uppercase-in-0=upper,则此属性如下所示

spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果您不小心将此类处理程序声明为Function,它仍然可以工作,但唯一的例外是不会对它的输出执行任何操作。但是,鉴于此类处理程序仍然依赖于 Spring Cloud Function 提供的功能,因此您也可以在处理程序具有一些复杂性并且您希望通过函数组合来解决这些复杂性(尽管不太可能)的情况下,利用函数组合的优势。

默认错误处理程序

如果您希望对所有函数 bean 使用单个错误处理程序,则可以使用定义默认属性的标准 spring-cloud-stream 机制spring.cloud.stream.default.error-handler-definition=myErrorHandler

DLQ - 死信队列

也许最常见的机制,DLQ 允许将失败的消息发送到一个特殊的目的地:死信队列

配置后,失败的消息将发送到此目的地以供后续重新处理或审核和协调。

考虑以下示例

@SpringBootApplication
public class SimpleStreamApplication {

	public static void main(String[] args) throws Exception {
		SpringApplication.run(SimpleStreamApplication.class,
		  "--spring.cloud.function.definition=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
		  "--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
		);
	}

	@Bean
	public Function<Person, Person> uppercase() {
		return personIn -> {
		   throw new RuntimeException("intentional");
	      });
		};
	}
}

提醒一下,在此示例中,属性的uppercase-in-0段对应于输入目标绑定的名称。consumer段表示它是一个消费者属性。

使用 DLQ 时,至少必须提供group属性才能正确命名 DLQ 目标。但是group通常与destination属性一起使用,如我们的示例所示。

除了某些标准属性外,我们还将auto-bind-dlq设置为指示绑定器为uppercase-in-0绑定创建和配置 DLQ 目标,该绑定对应于uppercase目标(请参阅相应的属性),这会导致一个名为uppercase.myGroup.dlq的其他 Rabbit 队列(请参阅 Kafka 文档以了解 Kafka 特定的 DLQ 属性)。

配置后,所有失败的消息都将路由到此目标,并保留原始消息以供进一步操作。

您可以看到错误消息包含与原始错误相关的更多信息,如下所示

. . . .
x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload: blah

您还可以通过将max-attempts设置为“1”来促进立即调度到 DLQ(无需重试)。例如,

--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1

重试模板

在本节中,我们将介绍与重试功能配置相关的配置属性。

RetryTemplateSpring Retry库的一部分。虽然本文档不包含RetryTemplate的所有功能,但我们将提及与RetryTemplate特别相关的以下消费者属性

maxAttempts

处理消息的尝试次数。

默认值:3。

backOffInitialInterval

重试时的初始回退间隔。

默认值 1000 毫秒。

backOffMaxInterval

最大回退间隔。

默认值 10000 毫秒。

backOffMultiplier

回退倍数。

默认 2.0。

defaultRetryable

是否重试侦听器抛出的未在retryableExceptions中列出的异常。

默认:true

retryableExceptions

一个键为 Throwable 类名,值为布尔值的映射。指定将或不会重试的异常(及其子类)。另请参阅defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

默认:空。

虽然上述设置足以满足大多数自定义需求,但它们可能无法满足某些复杂的需求,此时您可能希望提供自己的RetryTemplate实例。为此,请在您的应用程序配置中将其配置为 Bean。应用程序提供的实例将覆盖框架提供的实例。此外,为避免冲突,您必须将绑定程序要使用的RetryTemplate实例限定为@StreamRetryTemplate。例如,

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}

如您从上面的示例中看到的,您无需使用@Bean对其进行注释,因为@StreamRetryTemplate是一个限定的@Bean

如果您需要对RetryTemplate进行更精确的控制,可以在ConsumerProperties中按名称指定 Bean,以将特定的重试 Bean 与每个绑定关联。

spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>

绑定器

Spring Cloud Stream 提供了一个绑定器抽象,用于连接到外部中间件的物理目标。本节提供了有关绑定器 SPI 背后的主要概念、其主要组件和特定于实现的详细信息。

生产者和消费者

下图显示了生产者和消费者的总体关系

producers consumers
图 5. 生产者和消费者

生产者是任何将消息发送到绑定目标的组件。绑定目标可以使用该代理的Binder实现绑定到外部消息代理。调用bindProducer()方法时,第一个参数是代理中目标的名称,第二个参数是生产者发送消息的本地目标的实例,第三个参数包含要在为该绑定目标创建的适配器中使用的属性(例如分区键表达式)。

消费者是任何从绑定目标接收消息的组件。与生产者一样,消费者可以绑定到外部消息代理。调用bindConsumer()方法时,第一个参数是目标名称,第二个参数提供消费者逻辑组的名称。给定目标的消费者绑定所表示的每个组都会收到生产者发送到该目标的每条消息的副本(即,它遵循正常的发布-订阅语义)。如果有多个消费者实例使用相同的组名绑定,则消息将在这些消费者实例之间进行负载均衡,以便生产者发送的每条消息仅由每个组中的单个消费者实例使用(即,它遵循正常的排队语义)。

绑定器 SPI

绑定器 SPI 由许多接口、开箱即用的实用程序类和发现策略组成,这些策略为连接到外部中间件提供了一种可插拔机制。

SPI 的关键点是Binder接口,它是一种将输入和输出连接到外部中间件的策略。以下清单显示了Binder接口的定义

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String bindingName, String group, T inboundBindTarget, C consumerProperties);

    Binding<T> bindProducer(String bindingName, T outboundBindTarget, P producerProperties);
}

该接口是参数化的,提供了许多扩展点

  • 输入和输出绑定目标。

  • 扩展的消费者和生产者属性,允许特定的绑定器实现添加可以在类型安全的方式下支持的补充属性。

典型的绑定器实现包括以下内容

  • 实现Binder接口的类;

  • 一个 Spring @Configuration 类,它创建类型为Binder的 Bean 以及中间件连接基础设施。

  • 在类路径上找到的META-INF/spring.binders文件,其中包含一个或多个绑定器定义,如下例所示

    kafka:\
    org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
如前所述,绑定器抽象也是框架的扩展点之一。因此,如果您在前面的列表中找不到合适的绑定器,则可以在 Spring Cloud Stream 之上实现您自己的绑定器。在如何从头开始创建 Spring Cloud Stream 绑定器文章中,社区成员详细记录了实现自定义绑定器所需的步骤集,并提供了一个示例。这些步骤也在实现自定义绑定器部分中突出显示。

绑定器检测

Spring Cloud Stream 依赖于绑定器 SPI 的实现来执行将用户代码连接(绑定)到消息代理的任务。每个绑定器实现通常连接到一种类型的消息传递系统。

类路径检测

默认情况下,Spring Cloud Stream 依赖于 Spring Boot 的自动配置来配置绑定过程。如果在类路径上找到单个绑定器实现,Spring Cloud Stream 会自动使用它。例如,一个旨在仅绑定到 RabbitMQ 的 Spring Cloud Stream 项目可以添加以下依赖项

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

有关其他绑定器依赖项的特定 Maven 坐标,请参阅该绑定器实现的文档。

类路径上的多个绑定器

当类路径上存在多个绑定器时,应用程序必须指示哪个绑定器将用于每个目标绑定。每个绑定器配置都包含一个META-INF/spring.binders文件,这是一个简单的属性文件,如下例所示

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

其他提供的绑定器实现(如 Kafka)也存在类似的文件,并且预计自定义绑定器实现也将提供它们。键表示绑定器实现的标识名称,而值是配置类的逗号分隔列表,每个配置类都包含一个且仅一个类型为org.springframework.cloud.stream.binder.Binder的 Bean 定义。

绑定器选择可以通过全局方式执行,使用spring.cloud.stream.defaultBinder属性(例如,spring.cloud.stream.defaultBinder=rabbit),或者通过在每个绑定上配置绑定器来单独执行。例如,一个处理器应用程序(分别具有名为inputoutput的用于读取和写入的绑定)从 Kafka 读取并写入 RabbitMQ,可以指定以下配置

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

连接到多个系统

默认情况下,绑定器共享应用程序的 Spring Boot 自动配置,以便创建类路径上找到的每个绑定器的一个实例。如果您的应用程序应连接到多个相同类型的代理,则可以指定多个绑定器配置,每个配置具有不同的环境设置。

启用显式绑定器配置会完全禁用默认绑定器配置过程。如果这样做,则必须在配置中包含所有正在使用的绑定器。打算透明地使用 Spring Cloud Stream 的框架可能会创建可以按名称引用的绑定器配置,但它们不会影响默认绑定器配置。为此,绑定器配置可以将其defaultCandidate标志设置为 false(例如,spring.cloud.stream.binders.<configurationName>.defaultCandidate=false)。这表示独立于默认绑定器配置过程存在的配置。

以下示例显示了一个连接到两个 RabbitMQ 代理实例的处理器应用程序的典型配置

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: thing1
          binder: rabbit1
        output:
          destination: thing2
          binder: rabbit2
      binders:
        rabbit1:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host1>
        rabbit2:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host2>
特定绑定器的environment属性也可用于任何 Spring Boot 属性,包括此spring.main.sources,这对于为特定绑定器添加其他配置(例如覆盖自动配置的 Bean)很有用。

例如;

environment:
    spring:
        main:
           sources: com.acme.config.MyCustomBinderConfiguration

要为特定绑定器环境激活特定配置文件,应使用spring.profiles.active属性

environment:
    spring:
        profiles:
           active: myBinderProfile

在多绑定器应用程序中自定义绑定器

当应用程序中有多个绑定器并希望自定义这些绑定器时,可以通过提供BinderCustomizer实现来实现。对于具有单个绑定器的应用程序,此特殊自定义程序不是必需的,因为绑定器上下文可以直接访问自定义 Bean。但是,在多绑定器方案中情况并非如此,因为各种绑定器存在于不同的应用程序上下文中。通过提供BinderCustomizer接口的实现,即使绑定器驻留在不同的应用程序上下文中,它们也将接收自定义。Spring Cloud Stream 确保在应用程序开始使用绑定器之前进行自定义。用户必须检查绑定器类型,然后应用必要的自定义。

以下是如何提供BinderCustomizer Bean 的示例。

@Bean
public BinderCustomizer binderCustomizer() {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setRebalanceListener(...);
        }
        else if (binder instanceof KStreamBinder) {
            ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            ...
        }
    };
}

请注意,当存在多个相同类型的绑定器实例时,可以使用绑定器名称来过滤自定义。

绑定可视化和控制

Spring Cloud Stream 支持通过执行器端点以及编程方式对绑定进行可视化和控制。

编程方式

从 3.1 版开始,我们公开了org.springframework.cloud.stream.binding.BindingsLifecycleController,它作为 Bean 注册,一旦注入,即可用于控制单个绑定的生命周期

例如,查看来自某个测试用例的片段。如您所见,我们从 Spring 应用程序上下文中检索BindingsLifecycleController并执行各个方法来控制echo-in-0绑定的生命周期..

BindingsLifecycleController bindingsController = context.getBean(BindingsLifecycleController.class);
Binding binding = bindingsController.queryState("echo-in-0");
assertThat(binding.isRunning()).isTrue();
bindingsController.changeState("echo-in-0", State.STOPPED);
//Alternative way of changing state. For convenience we expose start/stop and pause/resume operations.
//bindingsController.stop("echo-in-0")
assertThat(binding.isRunning()).isFalse();

执行器

由于执行器和 Web 是可选的,因此您必须首先添加一个 Web 依赖项,并手动添加执行器依赖项。以下示例显示了如何添加 Web 框架的依赖项

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

以下示例显示了如何添加 WebFlux 框架的依赖项

<dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

您可以按如下方式添加执行器依赖项

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
要在 Cloud Foundry 中运行 Spring Cloud Stream 2.0 应用程序,必须将spring-boot-starter-webspring-boot-starter-actuator添加到类路径。否则,由于健康检查失败,应用程序将无法启动。

您还必须通过设置以下属性来启用bindings执行器端点:--management.endpoints.web.exposure.include=bindings

一旦满足这些先决条件。应用程序启动时,您应该在日志中看到以下内容

: Mapped "{[/actuator/bindings/{name}],methods=[POST]. . .
: Mapped "{[/actuator/bindings],methods=[GET]. . .
: Mapped "{[/actuator/bindings/{name}],methods=[GET]. . .

要可视化当前绑定,请访问以下 URL:http://<host>:<port>/actuator/bindings

或者,要查看单个绑定,请访问以下类似的 URL 之一:http://<host>:<port>/actuator/bindings/<bindingName>;

您也可以通过向相同的 URL 发送请求并提供一个作为 JSON 格式的 state 参数来停止、启动、暂停和恢复单个绑定,如下例所示。

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"PAUSED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
curl -d '{"state":"RESUMED"}' -H "Content-Type: application/json" -X POST http://<host>:<port>/actuator/bindings/myBindingName
PAUSEDRESUMED 仅在相应的绑定器及其底层技术支持时才有效。否则,您会在日志中看到警告消息。目前,只有 Kafka 和 [Solace](https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#consumer-bindings-pauseresume) 绑定器支持 PAUSEDRESUMED 状态。

绑定器配置属性

自定义绑定器配置时可以使用以下属性。这些属性通过 org.springframework.cloud.stream.config.BinderProperties 公开。

它们必须以 spring.cloud.stream.binders.<configurationName> 为前缀。

type

绑定器类型。它通常引用类路径上找到的绑定器之一,特别是 META-INF/spring.binders 文件中的一个键。

默认情况下,它与配置名称具有相同的值。

inheritEnvironment

配置是否继承应用程序本身的环境。

默认:true

environment

一组属性的根,这些属性可用于自定义绑定器环境。当设置此属性时,创建绑定器的上下文不是应用程序上下文的子级。此设置允许绑定器组件和应用程序组件完全分离。

默认值:empty

defaultCandidate

绑定器配置是否可以被视为默认绑定器,或者只能在显式引用时使用。此设置允许添加绑定器配置而不会干扰默认处理。

默认:true

实现自定义绑定器

为了实现自定义 Binder,您只需要

  • 添加所需的依赖项

  • 提供一个 ProvisioningProvider 实现

  • 提供一个 MessageProducer 实现

  • 提供一个 MessageHandler 实现

  • 提供一个 Binder 实现

  • 创建绑定器配置

  • 在 META-INF/spring.binders 中定义您的绑定器

添加所需的依赖项

spring-cloud-stream 依赖项添加到您的项目中 (例如,对于 Maven)

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring.cloud.stream.version}</version>
</dependency>

提供一个 ProvisioningProvider 实现

ProvisioningProvider 负责消费者和生产者目的地的供应,并且需要将应用程序 .yml 或 .properties 文件中包含的逻辑目的地转换为物理目的地引用。

以下是一个 ProvisioningProvider 实现示例,它只是修剪通过输入/输出绑定配置提供的目的地。

public class FileMessageBinderProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

    @Override
    public ProducerDestination provisionProducerDestination(
            final String name,
            final ProducerProperties properties) {

        return new FileMessageDestination(name);
    }

    @Override
    public ConsumerDestination provisionConsumerDestination(
            final String name,
            final String group,
            final ConsumerProperties properties) {

        return new FileMessageDestination(name);
    }

    private class FileMessageDestination implements ProducerDestination, ConsumerDestination {

        private final String destination;

        private FileMessageDestination(final String destination) {
            this.destination = destination;
        }

        @Override
        public String getName() {
            return destination.trim();
        }

        @Override
        public String getNameForPartition(int partition) {
            throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
        }

    }

}

提供一个 MessageProducer 实现

MessageProducer 负责使用事件并将其作为消息处理到配置为使用此类事件的客户端应用程序。

这是一个 MessageProducer 实现示例,它扩展了 MessageProducerSupport 抽象以轮询与修剪后的目标名称匹配并位于项目路径中的文件,同时还存档已读取的消息并丢弃后续的相同消息。

public class FileMessageProducer extends MessageProducerSupport {

    public static final String ARCHIVE = "archive.txt";
    private final ConsumerDestination destination;
    private String previousPayload;

    public FileMessageProducer(ConsumerDestination destination) {
        this.destination = destination;
    }

    @Override
    public void doStart() {
        receive();
    }

    private void receive() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        executorService.scheduleWithFixedDelay(() -> {
            String payload = getPayload();

            if(payload != null) {
                Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
                archiveMessage(payload);
                sendMessage(receivedMessage);
            }

        }, 0, 50, MILLISECONDS);
    }

    private String getPayload() {
        try {
            List<String> allLines = Files.readAllLines(Paths.get(destination.getName()));
            String currentPayload = allLines.get(allLines.size() - 1);

            if(!currentPayload.equals(previousPayload)) {
                previousPayload = currentPayload;
                return currentPayload;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        return null;
    }

    private void archiveMessage(String payload) {
        try {
            Files.write(Paths.get(ARCHIVE), (payload + "\n").getBytes(), CREATE, APPEND);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
在实现自定义绑定器时,此步骤不是严格强制性的,因为您始终可以依靠使用现有的 MessageProducer 实现!

提供一个 MessageHandler 实现

MessageHandler 提供了生成事件所需的逻辑。

这是一个 MessageHandler 实现示例。

public class FileMessageHandler implements MessageHandler{

    @Override
    public void handleMessage(Message<?> message) throws MessagingException {
        //write message to file
    }

}
在实现自定义绑定器时,此步骤不是严格强制性的,因为您始终可以依靠使用现有的 MessageHandler 实现!

提供一个 Binder 实现

您现在可以提供您自己的 Binder 抽象实现。这可以通过以下方式轻松完成:

  • 扩展 AbstractMessageChannelBinder

  • 将您的 ProvisioningProvider 指定为 AbstractMessageChannelBinder 的泛型参数

  • 覆盖 createProducerMessageHandlercreateConsumerEndpoint 方法

例如。

public class FileMessageBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, FileMessageBinderProvisioner> {

    public FileMessageBinder(
            String[] headersToEmbed,
            FileMessageBinderProvisioner provisioningProvider) {

        super(headersToEmbed, provisioningProvider);
    }

    @Override
    protected MessageHandler createProducerMessageHandler(
            final ProducerDestination destination,
            final ProducerProperties producerProperties,
            final MessageChannel errorChannel) throws Exception {

        return message -> {
            String fileName = destination.getName();
            String payload = new String((byte[])message.getPayload()) + "\n";

            try {
                Files.write(Paths.get(fileName), payload.getBytes(), CREATE, APPEND);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
    }

    @Override
    protected MessageProducer createConsumerEndpoint(
            final ConsumerDestination destination,
            final String group,
            final ConsumerProperties properties) throws Exception {

        return new FileMessageProducer(destination);
    }

}

创建绑定器配置

您必须创建一个 Spring 配置来初始化绑定器实现的 bean (以及您可能需要的其他所有 bean)

@Configuration
public class FileMessageBinderConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinderProvisioner fileMessageBinderProvisioner() {
        return new FileMessageBinderProvisioner();
    }

    @Bean
    @ConditionalOnMissingBean
    public FileMessageBinder fileMessageBinder(FileMessageBinderProvisioner fileMessageBinderProvisioner) {
        return new FileMessageBinder(null, fileMessageBinderProvisioner);
    }

}

在 META-INF/spring.binders 中定义您的绑定器

最后,您必须在类路径上的 META-INF/spring.binders 文件中定义您的绑定器,同时指定绑定器的名称和绑定器配置类的完全限定名称。

myFileBinder:\
com.example.springcloudstreamcustombinder.config.FileMessageBinderConfiguration

配置选项

Spring Cloud Stream 支持通用配置选项以及绑定和绑定器的配置。某些绑定器允许其他绑定属性支持特定于中间件的功能。

可以通过 Spring Boot 支持的任何机制为 Spring Cloud Stream 应用程序提供配置选项。这包括应用程序参数、环境变量以及 YAML 或 .properties 文件。

绑定服务属性

这些属性通过 org.springframework.cloud.stream.config.BindingServiceProperties 公开。

spring.cloud.stream.instanceCount

应用程序已部署实例的数量。必须在生产者端进行分区时设置。在使用 RabbitMQ 和 Kafka 时,如果 autoRebalanceEnabled=false,则必须在消费者端设置。

默认值:1

spring.cloud.stream.instanceIndex

应用程序的实例索引:从 0instanceCount - 1 的数字。用于 RabbitMQ 和 Kafka 的分区,如果 autoRebalanceEnabled=false。在 Cloud Foundry 中自动设置为匹配应用程序的实例索引。

spring.cloud.stream.dynamicDestinations

可以动态绑定的目的地列表(例如,在动态路由场景中)。如果设置,则只能绑定列出的目的地。

默认值:empty(允许绑定任何目的地)。

spring.cloud.stream.defaultBinder

如果配置了多个绑定器,则要使用的默认绑定器。请参阅 类路径上的多个绑定器

默认:空。

spring.cloud.stream.overrideCloudConnectors

此属性仅在激活 cloud 配置文件并且应用程序提供了 Spring Cloud Connectors 时适用。如果属性为 false(默认值),则绑定器会检测到合适的绑定服务(例如,在 Cloud Foundry 中为 RabbitMQ 绑定器绑定的 RabbitMQ 服务),并使用它来创建连接(通常通过 Spring Cloud Connectors)。当设置为 true 时,此属性指示绑定器完全忽略绑定服务并依赖于 Spring Boot 属性(例如,依赖于为 RabbitMQ 绑定器提供的环境中的 spring.rabbitmq.* 属性)。此属性的典型用法是在自定义环境中嵌套 连接到多个系统时

默认值:false

spring.cloud.stream.bindingRetryInterval

在重新尝试创建绑定时,例如,绑定器不支持延迟绑定并且代理(例如,Apache Kafka)已关闭时,两次重试之间的间隔(以秒为单位)。将其设置为零以将此类条件视为致命条件,从而阻止应用程序启动。

默认值:30

绑定属性

通过使用 spring.cloud.stream.bindings.<bindingName>.<property>=<value> 格式提供绑定属性。<bindingName> 表示正在配置的绑定的名称。

例如,对于以下函数

@Bean
public Function<String, String> uppercase() {
	return v -> v.toUpperCase();
}

有两个名为 uppercase-in-0 的输入绑定和两个名为 uppercase-out-0 的输出绑定。有关更多详细信息,请参阅 绑定和绑定名称

为了避免重复,Spring Cloud Stream 支持设置所有绑定的值,格式为 spring.cloud.stream.default.<property>=<value>spring.cloud.stream.default.<producer|consumer>.<property>=<value> 用于常见的绑定属性。

在避免扩展绑定属性的重复时,应使用此格式 - spring.cloud.stream.<binder-type>.default.<producer|consumer>.<property>=<value>

常用绑定属性

这些属性通过 org.springframework.cloud.stream.config.BindingProperties 公开。

以下绑定属性可用于输入和输出绑定,并且必须以 spring.cloud.stream.bindings.<bindingName>. 为前缀(例如,spring.cloud.stream.bindings.uppercase-in-0.destination=ticktock)。

可以通过使用 spring.cloud.stream.default 前缀设置默认值(例如 spring.cloud.stream.default.contentType=application/json)。

destination

绑定在绑定中间件上的目标目的地(例如,RabbitMQ 交换机或 Kafka 主题)。如果绑定表示消费者绑定(输入),则可以绑定到多个目的地,并且可以将目的地名称指定为逗号分隔的 String 值。如果没有,则改为使用实际的绑定名称。此属性的默认值不能被覆盖。

group

绑定的消费者组。仅适用于入站绑定。请参阅 消费者组

默认值:null(表示匿名消费者)。

contentType

此绑定的内容类型。请参阅 内容类型协商

默认值:application/json

binder

此绑定使用的绑定器。有关详细信息,请参阅 类路径上的多个绑定器

默认值:null(如果存在,则使用默认绑定器)。

消费者属性

这些属性通过 org.springframework.cloud.stream.binder.ConsumerProperties 公开。

以下绑定属性仅适用于输入绑定,并且必须以 spring.cloud.stream.bindings.<bindingName>.consumer. 为前缀(例如,spring.cloud.stream.bindings.input.consumer.concurrency=3)。

可以通过使用 spring.cloud.stream.default.consumer 前缀设置默认值(例如,spring.cloud.stream.default.consumer.headerMode=none)。

autoStartup

指示此消费者是否需要自动启动。

默认:true

concurrency

入站消费者的并发性。

默认值:1

partitioned

消费者是否从分区生产者接收数据。

默认值:false

headerMode

当设置为 none 时,禁用输入上的标头解析。仅对不支持本机消息标头的消息传递中间件并且需要标头嵌入时有效。当从非 Spring Cloud Stream 应用程序使用数据时,此选项在不支持本机标头时很有用。当设置为 headers 时,它使用中间件的本机标头机制。当设置为 embeddedHeaders 时,它将标头嵌入到消息有效负载中。

默认值:取决于绑定器实现。

maxAttempts

如果处理失败,则处理消息的尝试次数(包括第一次)。设置为 1 以禁用重试。

默认值:3

backOffInitialInterval

重试时的初始回退间隔。

默认值:1000

backOffMaxInterval

最大回退间隔。

默认值:10000

backOffMultiplier

回退倍数。

默认值:2.0

defaultRetryable

是否重试侦听器抛出的未在retryableExceptions中列出的异常。

默认:true

instanceCount

当设置为大于或等于零的值时,它允许自定义此消费者的实例数(如果不同于 spring.cloud.stream.instanceCount)。当设置为负值时,它默认为 spring.cloud.stream.instanceCount。有关更多信息,请参阅 实例索引和实例数

默认值:-1

instanceIndex

当设置为大于或等于零的值时,它允许自定义此消费者的实例索引(如果不同于 spring.cloud.stream.instanceIndex)。当设置为负值时,它默认为 spring.cloud.stream.instanceIndex。如果提供了 instanceIndexList,则忽略。有关更多信息,请参阅 实例索引和实例数

默认值:-1

instanceIndexList

与不支持本机分区的绑定器(如 RabbitMQ)一起使用;允许应用程序实例从多个分区中使用数据。

默认:空。

retryableExceptions

一个键为 Throwable 类名,值为布尔值的映射。指定将或不会重试的异常(及其子类)。另请参阅defaultRetriable。示例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

默认:空。

useNativeDecoding

当设置为true时,入站消息将由客户端库直接反序列化,客户端库必须进行相应配置(例如,设置适当的 Kafka 生产者值反序列化器)。使用此配置时,入站消息的解编将不基于绑定的contentType。当使用原生解码时,生产者有责任使用适当的编码器(例如,Kafka 生产者值序列化器)来序列化出站消息。此外,当使用原生编码和解码时,headerMode=embeddedHeaders属性将被忽略,并且标头不会嵌入到消息中。请参阅生产者属性useNativeEncoding

默认值:false

多路复用

当设置为 true 时,底层绑定器将本机地在同一输入绑定上多路复用目标。

默认值:false

高级消费者配置

要对消息驱动型消费者的底层消息监听器容器进行高级配置,请向应用程序上下文添加一个ListenerContainerCustomizer bean。它将在应用上述属性后被调用,可用于设置其他属性。类似地,对于轮询消费者,请添加一个MessageSourceCustomizer bean。

以下是 RabbitMQ 绑定器的示例

@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer> containerCustomizer() {
    return (container, dest, group) -> container.setAdviceChain(advice1, advice2);
}

@Bean
public MessageSourceCustomizer<AmqpMessageSource> sourceCustomizer() {
    return (source, dest, group) -> source.setPropertiesConverter(customPropertiesConverter);
}

生产者属性

这些属性通过org.springframework.cloud.stream.binder.ProducerProperties公开。

以下绑定属性仅适用于输出绑定,并且必须以spring.cloud.stream.bindings.<bindingName>.producer.为前缀(例如,spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id)。

可以使用前缀spring.cloud.stream.default.producer设置默认值(例如,spring.cloud.stream.default.producer.partitionKeyExpression=headers.id)。

autoStartup

指示此消费者是否需要自动启动。

默认:true

partitionKeyExpression

确定如何对出站数据进行分区的 SpEL 表达式。如果设置,则此绑定上的出站数据将被分区。partitionCount必须设置为大于 1 的值才能生效。请参阅分区支持

默认值:null。

partitionKeyExtractorName

实现PartitionKeyExtractorStrategy的 bean 的名称。用于提取用于计算分区 ID 的键(请参阅“partitionSelector*”)。与“partitionKeyExpression”互斥。

默认值:null。

partitionSelectorName

实现PartitionSelectorStrategy的 bean 的名称。用于根据分区键确定分区 ID(请参阅“partitionKeyExtractor*”)。与“partitionSelectorExpression”互斥。

默认值:null。

partitionSelectorExpression

用于自定义分区选择的 SpEL 表达式。如果两者都没有设置,则分区将被选择为hashCode(key) % partitionCount,其中key通过partitionKeyExpression计算得出。

默认值:null

partitionCount

如果启用了分区,则数据的目标分区数。如果生产者已分区,则必须设置为大于 1 的值。在 Kafka 中,它被解释为提示。使用此值和目标主题的分区计数中的较大值。

默认值:1

requiredGroups

生产者必须确保消息传递到的组的逗号分隔列表,即使这些组在生产者创建后启动(例如,通过在 RabbitMQ 中预创建持久队列)。

headerMode

当设置为none时,它将禁用输出上的标头嵌入。它仅对不原生支持消息标头并需要标头嵌入的消息中间件有效。当为非 Spring Cloud Stream 应用程序生成数据时,如果原生标头不受支持,此选项非常有用。当设置为headers时,它使用中间件的原生标头机制。当设置为embeddedHeaders时,它将标头嵌入到消息有效负载中。

默认值:取决于绑定器实现。

useNativeEncoding

当设置为true时,出站消息将由客户端库直接序列化,客户端库必须进行相应配置(例如,设置适当的 Kafka 生产者值序列化器)。使用此配置时,出站消息的编组将不基于绑定的contentType。当使用原生编码时,消费者有责任使用适当的解码器(例如,Kafka 消费者值反序列化器)来反序列化入站消息。此外,当使用原生编码和解码时,headerMode=embeddedHeaders属性将被忽略,并且标头不会嵌入到消息中。请参阅消费者属性useNativeDecoding

默认值:false

errorChannelEnabled

当设置为 true 时,如果绑定器支持异步发送结果,则发送失败将发送到目标的错误通道。有关更多信息,请参阅错误处理。

默认值:false。

高级生产者配置

在某些情况下,生产者属性不足以正确配置绑定器中的生产消息处理程序,或者您可能更喜欢在配置此类生产消息处理程序时使用编程方式。无论出于何种原因,spring-cloud-stream 都提供ProducerMessageHandlerCustomizer来实现它。

@FunctionalInterface
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {

	/**
	 * Configure a {@link MessageHandler} that is being created by the binder for the
	 * provided destination name.
	 * @param handler the {@link MessageHandler} from the binder.
	 * @param destinationName the bound destination name.
	 */
	void configure(H handler, String destinationName);

}

如您所见,它使您可以访问正在生产的MessageHandler的实际实例,您可以根据需要对其进行配置。您需要做的就是提供此策略的实现并将其配置为@Bean

内容类型协商

数据转换是任何消息驱动型微服务架构的核心功能之一。鉴于此,在 Spring Cloud Stream 中,此类数据表示为 Spring Message,消息可能必须在到达其目标之前转换为所需的形状或大小。这是由于两个原因造成的

  1. 将传入消息的内容转换为与应用程序提供的处理程序的签名匹配。

  2. 将传出消息的内容转换为线格式。

线格式通常为byte[](这对 Kafka 和 Rabbit 绑定器适用),但受绑定器实现控制。

在 Spring Cloud Stream 中,消息转换是使用org.springframework.messaging.converter.MessageConverter完成的。

作为对以下详细信息的补充,您可能还想阅读以下博文

机制

为了更好地理解内容类型协商背后的机制和必要性,我们以一个非常简单的用例为例,使用以下消息处理程序作为示例

public Function<Person, String> personFunction {..}
为简单起见,我们假设这是应用程序中唯一的处理程序函数(我们假设没有内部管道)。

前面示例中显示的处理程序期望Person对象作为参数,并生成String类型作为输出。为了使框架成功地将传入的Message作为参数传递给此处理程序,它必须以某种方式将Message类型的有效负载从线格式转换为Person类型。换句话说,框架必须找到并应用适当的MessageConverter。为此,框架需要用户提供一些说明。其中一条说明已由处理程序方法本身的签名(Person类型)提供。因此,理论上,这应该足够(并且在某些情况下确实足够)。但是,对于大多数用例,为了选择合适的MessageConverter,框架需要其他信息。缺少的部分是contentType

Spring Cloud Stream 提供三种机制来定义contentType(按优先级顺序)

  1. HEADER:可以通过消息本身传递contentType。通过提供contentType标头,您可以声明要用于查找和应用适当的MessageConverter的内容类型。

  2. BINDING:可以通过设置spring.cloud.stream.bindings.input.content-type属性来为每个目标绑定设置contentType

    属性名称中的input段对应于目标的实际名称(在本例中为“input”)。此方法允许您在每个绑定的基础上声明要用于查找和应用适当的MessageConverter的内容类型。
  3. DEFAULT:如果Message标头或绑定中不存在contentType,则使用默认的application/json内容类型来查找和应用适当的MessageConverter

如前所述,前面的列表还演示了在发生平局时的优先级顺序。例如,标头提供的 content type 优先于任何其他 content type。对于在每个绑定的基础上设置的 content type 也是如此,这实际上允许您覆盖默认的 content type。但是,它也提供了一个合理的默认值(该值是根据社区反馈确定的)。

使application/json成为默认值的另一个原因是分布式微服务架构驱动的互操作性要求,其中生产者和消费者不仅运行在不同的 JVM 中,还可以运行在不同的非 JVM 平台上。

当非 void 处理程序方法返回时,如果返回值已经是Message,则该Message成为有效负载。但是,当返回值不是Message时,将使用返回值作为有效负载构建新的Message,同时继承输入Message的标头,减去由SpringIntegrationProperties.messageHandlerNotPropagatedHeaders定义或过滤的标头。默认情况下,那里只有一个标头设置:contentType。这意味着新的Message没有设置contentType标头,从而确保contentType可以发展。您始终可以选择不从处理程序方法返回Message,您可以在其中注入任何您希望的标头。

如果有内部管道,则Message将通过相同的转换过程发送到下一个处理程序。但是,如果没有内部管道或您已到达其末尾,则Message将发送回输出目标。

内容类型与参数类型

如前所述,为了使框架选择合适的MessageConverter,它需要参数类型以及可选的内容类型信息。选择合适的MessageConverter的逻辑位于参数解析器(HandlerMethodArgumentResolvers)中,这些解析器在调用用户定义的处理程序方法之前触发(即框架知道实际参数类型的时间)。如果参数类型与当前有效负载的类型不匹配,则框架将委托给预配置的MessageConverters堆栈,以查看其中任何一个是否可以转换有效负载。如您所见,MessageConverterObject fromMessage(Message<?> message, Class<?> targetClass);操作将targetClass作为其参数之一。框架还确保提供的Message始终包含contentType标头。当不存在 contentType 标头时,它会注入每个绑定的contentType标头或默认的contentType标头。contentType参数类型的组合是框架确定消息是否可以转换为目标类型的机制。如果找不到合适的MessageConverter,则会抛出异常,您可以通过添加自定义MessageConverter来处理此异常(请参阅用户定义的消息转换器)。

但如果有效负载类型与处理程序方法声明的目标类型匹配呢?在这种情况下,无需进行任何转换,有效负载将直接传递。虽然这听起来非常简单和合乎逻辑,但请记住那些使用Message<?>Object作为参数的处理程序方法。通过将目标类型声明为Object(在Java中,它是所有类的instanceof),您实际上放弃了转换过程。

不要期望仅仅根据contentType就能将Message转换为其他类型。请记住,contentType与目标类型是互补的。如果您愿意,可以提供一个提示,MessageConverter可能会或可能不会考虑该提示。

消息转换器

MessageConverters定义了两种方法

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

了解这些方法的契约及其用法非常重要,尤其是在Spring Cloud Stream的上下文中。

fromMessage方法将传入的Message转换为参数类型。Message的有效负载可以是任何类型,并且由MessageConverter的实际实现来支持多种类型。例如,某些JSON转换器可能支持有效负载类型为byte[]String等。当应用程序包含内部管道(即,输入→处理程序1→处理程序2→…→输出)并且上游处理程序的输出导致可能不是初始线格式的Message时,这一点很重要。

但是,toMessage方法具有更严格的契约,并且必须始终将Message转换为线格式:byte[]

因此,出于所有意图和目的(尤其是在实现您自己的转换器时),您将这两种方法视为具有以下签名

Object fromMessage(Message<?> message, Class<?> targetClass);

Message<byte[]> toMessage(Object payload, @Nullable MessageHeaders headers);

提供的消息转换器

如前所述,框架已经提供了一组MessageConverters来处理大多数常见用例。以下列表按优先级顺序描述了提供的MessageConverters(使用第一个可用的MessageConverter

  1. JsonMessageConverter:顾名思义,它支持在contentTypeapplication/json(默认)的情况下将Message的有效负载转换为POJO或从POJO转换。

  2. ByteArrayMessageConverter:在contentTypeapplication/octet-stream的情况下,支持将Message的有效负载从byte[]转换为byte[]。它本质上是一个直通转换器,主要用于向后兼容。

  3. ObjectStringMessageConverter:在contentTypetext/plain的情况下,支持将任何类型转换为String。它调用对象的toString()方法,或者如果有效负载是byte[],则调用new String(byte[])

如果找不到合适的转换器,框架将抛出异常。发生这种情况时,您应该检查您的代码和配置,并确保您没有遗漏任何内容(即,确保您通过使用绑定或标头提供了contentType)。但是,最有可能的是,您发现了一些不常见的案例(例如自定义contentType),并且当前提供的MessageConverters堆栈不知道如何转换。如果是这种情况,您可以添加自定义MessageConverter。请参阅用户定义的消息转换器

用户定义的消息转换器

Spring Cloud Stream公开了定义和注册其他MessageConverter的机制。要使用它,请实现org.springframework.messaging.converter.MessageConverter,并将其配置为@Bean。然后将其追加到现有的MessageConverter堆栈中。

重要的是要了解,自定义MessageConverter实现会添加到现有堆栈的头部。因此,自定义MessageConverter实现优先于现有的实现,这使您可以覆盖以及添加到现有的转换器中。

以下示例显示了如何创建一个消息转换器bean以支持名为application/bar的新内容类型

@SpringBootApplication
public static class SinkApplication {

    ...

    @Bean
    public MessageConverter customMessageConverter() {
        return new MyCustomMessageConverter();
    }
}

public class MyCustomMessageConverter extends AbstractMessageConverter {

    public MyCustomMessageConverter() {
        super(new MimeType("application", "bar"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (Bar.class.equals(clazz));
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        return (payload instanceof Bar ? payload : new Bar((byte[]) payload));
    }
}

应用程序间通信

Spring Cloud Stream支持应用程序之间的通信。应用程序间通信是一个复杂的问题,涉及多个方面,如下面的主题所述

连接多个应用程序实例

虽然Spring Cloud Stream使单个Spring Boot应用程序可以轻松连接到消息系统,但Spring Cloud Stream的典型场景是创建多应用程序管道,其中微服务应用程序彼此发送数据。您可以通过关联“相邻”应用程序的输入和输出目标来实现此场景。

假设设计要求Time Source应用程序将数据发送到Log Sink应用程序。您可以使用名为ticktock的公共目标来绑定这两个应用程序中的绑定。

Time Source(其绑定名为output)将设置以下属性

spring.cloud.stream.bindings.output.destination=ticktock

Log Sink(其绑定名为input)将设置以下属性

spring.cloud.stream.bindings.input.destination=ticktock

实例索引和实例计数

在扩展Spring Cloud Stream应用程序时,每个实例都可以接收有关同一应用程序的其他实例数量及其自身实例索引的信息。Spring Cloud Stream通过spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex属性来实现这一点。例如,如果HDFS接收器应用程序有三个实例,则所有三个实例的spring.cloud.stream.instanceCount都设置为3,而各个应用程序的spring.cloud.stream.instanceIndex分别设置为012

当通过Spring Cloud Data Flow部署Spring Cloud Stream应用程序时,这些属性会自动配置;当独立启动Spring Cloud Stream应用程序时,必须正确设置这些属性。默认情况下,spring.cloud.stream.instanceCount1spring.cloud.stream.instanceIndex0

在扩展的场景中,正确配置这两个属性对于解决整体上的分区行为(见下文)非常重要,并且某些绑定器(例如Kafka绑定器)始终需要这两个属性,以确保数据在多个消费者实例之间正确拆分。

分区

Spring Cloud Stream中的分区包括两项任务

配置输出绑定以进行分区

您可以通过设置其partitionKeyExpressionpartitionKeyExtractorName属性之一以及其partitionCount属性来配置输出绑定以发送分区数据。

例如,以下是有效的典型配置

spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id
spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5

根据该示例配置,使用以下逻辑将数据发送到目标分区。

基于partitionKeyExpression为发送到分区输出绑定的每个消息计算分区键的值。partitionKeyExpression是一个SpEL表达式,它针对出站消息进行评估(在前面的示例中,它是消息标头中id的值)以提取分区键。

如果SpEL表达式不足以满足您的需求,则可以通过提供org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的实现并将其配置为bean(使用@Bean注解)来计算分区键值。如果在应用程序上下文中有多个类型为org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的bean可用,则可以通过使用partitionKeyExtractorName属性指定其名称来进一步筛选它,如下例所示

--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
    return new CustomPartitionKeyExtractorClass();
}
在早期版本的Spring Cloud Stream中,您可以通过设置spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass属性来指定org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy的实现。从3.0版本开始,此属性已移除。

计算消息键后,分区选择过程将目标分区确定为0partitionCount - 1之间的值。适用于大多数场景的默认计算基于以下公式:key.hashCode() % partitionCount。可以通过设置要针对“key”评估的SpEL表达式(通过partitionSelectorExpression属性)或通过将org.springframework.cloud.stream.binder.PartitionSelectorStrategy的实现配置为bean(使用@Bean注解)在绑定上自定义此公式。与PartitionKeyExtractorStrategy类似,当应用程序上下文中有多个此类型的bean可用时,您可以通过使用spring.cloud.stream.bindings.output.producer.partitionSelectorName属性进一步筛选它,如下例所示

--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
    return new CustomPartitionSelectorClass();
}
在早期版本的Spring Cloud Stream中,您可以通过设置spring.cloud.stream.bindings.output.producer.partitionSelectorClass属性来指定org.springframework.cloud.stream.binder.PartitionSelectorStrategy的实现。从3.0版本开始,此属性已移除。

配置输入绑定以进行分区

输入绑定(绑定名称为uppercase-in-0)通过设置其partitioned属性以及应用程序本身的instanceIndexinstanceCount属性来配置以接收分区数据,如下例所示

spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5

instanceCount 值表示应在其中对数据进行分区的应用程序实例的总数。instanceIndex 必须是在多个实例中唯一的,其值介于 0instanceCount - 1 之间。实例索引帮助每个应用程序实例识别它从中接收数据的唯一分区。它是由使用不支持本机分区的技术的绑定器所需的。例如,对于 RabbitMQ,每个分区都有一个队列,队列名称包含实例索引。对于 Kafka,如果 autoRebalanceEnabledtrue(默认值),则 Kafka 会负责跨实例分发分区,并且不需要这些属性。如果 autoRebalanceEnabled 设置为 false,则绑定器将使用 instanceCountinstanceIndex 来确定实例订阅哪个分区(您必须至少拥有与实例数量一样多的分区)。绑定器分配分区而不是 Kafka。如果您希望特定分区的消息始终发送到同一实例,这可能很有用。当绑定器配置需要它们时,正确设置这两个值非常重要,以确保所有数据都被消费,并且应用程序实例接收互斥的数据集。

虽然在独立情况下,使用多个实例进行分区数据处理的方案可能很复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值,并让您依靠运行时基础设施提供有关实例索引和实例数量的信息,从而大大简化此过程。

测试

Spring Cloud Stream 提供了在不连接消息系统的情况下测试微服务应用程序的支持。

Spring Integration 测试绑定器

Spring Cloud Stream 带有一个测试绑定器,您可以使用它来测试各种应用程序组件,而无需实际的真实绑定器实现或消息代理。

此测试绑定器充当单元测试和集成测试之间的桥梁,并且基于 Spring Integration 框架作为 JVM 内的消息代理,本质上为您提供了两全其美的方法 - 一个真实的绑定器,无需网络。

测试绑定器配置

要启用 Spring Integration 测试绑定器,您只需将其添加为依赖项即可。

添加所需的依赖项

以下是所需 Maven POM 条目的示例。

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-stream-test-binder</artifactId>
	<scope>test</scope>
</dependency>

或用于 build.gradle.kts

testImplementation("org.springframework.cloud:spring-cloud-stream-test-binder")

测试绑定器用法

现在您可以将微服务作为简单的单元测试进行测试

@SpringBootTest
public class SampleStreamTests {

	@Autowired
	private InputDestination input;

	@Autowired
	private OutputDestination output;

	@Test
	public void testEmptyConfiguration() {
		this.input.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(output.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}

	@SpringBootApplication
	@Import(TestChannelBinderConfiguration.class)
	public static class SampleConfiguration {
		@Bean
		public Function<String, String> uppercase() {
			return v -> v.toUpperCase();
		}
	}
}

如果您需要更多控制或希望在同一测试套件中测试多个配置,您也可以执行以下操作

@EnableAutoConfiguration
public static class MyTestConfiguration {
	@Bean
	public Function<String, String> uppercase() {
			return v -> v.toUpperCase();
	}
}

. . .

@Test
public void sampleTest() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
				TestChannelBinderConfiguration.getCompleteConfiguration(
						MyTestConfiguration.class))
				.run("--spring.cloud.function.definition=uppercase")) {
		InputDestination source = context.getBean(InputDestination.class);
		OutputDestination target = context.getBean(OutputDestination.class);
		source.send(new GenericMessage<byte[]>("hello".getBytes()));
		assertThat(target.receive().getPayload()).isEqualTo("HELLO".getBytes());
	}
}

对于有多个绑定和/或多个输入和输出的情况,或者只是希望明确发送到或接收自的目标名称,InputDestinationOutputDestinationsend()receive() 方法被重写以允许您提供输入和输出目标的名称。

考虑以下示例。

@EnableAutoConfiguration
public static class SampleFunctionConfiguration {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

以及实际测试

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

对于具有其他映射属性(如 destination)的情况,您应该使用这些名称。例如,考虑前面测试的不同版本,其中我们将 uppercase 函数的输入和输出显式映射到 myInputmyOutput 绑定名称

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleFunctionConfiguration.class))
							.run(
							"--spring.cloud.function.definition=uppercase;reverse",
							"--spring.cloud.stream.bindings.uppercase-in-0.destination=myInput",
							"--spring.cloud.stream.bindings.uppercase-out-0.destination=myOutput"
							)) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "myInput");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "myOutput");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

测试绑定器和 PollableMessageSource

Spring Integration 测试绑定器还允许您在使用 PollableMessageSource 时编写测试(有关更多详细信息,请参阅 使用轮询使用者)。

不过,需要理解的重要一点是,轮询不是事件驱动的,PollableMessageSource 是一种策略,它公开操作以生成(轮询)消息(单数)。您轮询的频率、使用多少个线程或从哪里轮询(消息队列或文件系统)完全取决于您;换句话说,配置轮询器或线程或消息的实际来源是您的责任。幸运的是,Spring 有大量的抽象来精确配置这一点。

让我们来看一个示例

@Test
public void samplePollingTest() {
	ApplicationContext context = new SpringApplicationBuilder(SamplePolledConfiguration.class)
				.web(WebApplicationType.NONE)
				.run("--spring.jmx.enabled=false", "--spring.cloud.stream.pollable-source=myDestination");
	OutputDestination destination = context.getBean(OutputDestination.class);
	System.out.println("Message 1: " + new String(destination.receive().getPayload()));
	System.out.println("Message 2: " + new String(destination.receive().getPayload()));
	System.out.println("Message 3: " + new String(destination.receive().getPayload()));
}

@Import(TestChannelBinderConfiguration.class)
@EnableAutoConfiguration
public static class SamplePolledConfiguration {
	@Bean
	public ApplicationRunner poller(PollableMessageSource polledMessageSource, StreamBridge output, TaskExecutor taskScheduler) {
		return args -> {
			taskScheduler.execute(() -> {
				for (int i = 0; i < 3; i++) {
					try {
						if (!polledMessageSource.poll(m -> {
							String newPayload = ((String) m.getPayload()).toUpperCase();
							output.send("myOutput", newPayload);
						})) {
							Thread.sleep(2000);
						}
					}
					catch (Exception e) {
						// handle failure
					}
				}
			});
		};
	}
}

以上(非常基本的)示例将在 2 秒的间隔内生成 3 条消息,并将它们发送到 Source 的输出目标,此绑定器将其发送到 OutputDestination,我们从中检索它们(用于任何断言)。目前,它打印以下内容

Message 1: POLLED DATA
Message 2: POLLED DATA
Message 3: POLLED DATA

如您所见,数据相同。这是因为此绑定器定义了实际 MessageSource 的默认实现 - 使用 poll() 操作轮询消息的来源。虽然对于大多数测试场景来说已经足够了,但在某些情况下,您可能希望定义自己的 MessageSource。为此,只需在您的测试配置中配置类型为 MessageSource 的 bean,并提供您自己的消息来源实现。

以下是一个示例

@Bean
public MessageSource<?> source() {
	return () -> new GenericMessage<>("My Own Data " + UUID.randomUUID());
}

呈现以下输出;

Message 1: MY OWN DATA 1C180A91-E79F-494F-ABF4-BA3F993710DA
Message 2: MY OWN DATA D8F3A477-5547-41B4-9434-E69DA7616FEE
Message 3: MY OWN DATA 20BF2E64-7FF4-4CB6-A823-4053D30B5C74
不要将此 bean 命名为 messageSource,因为它将与 Spring Boot 为无关原因提供的同名(不同类型)的 bean 冲突。

混合使用测试绑定器和常规中间件绑定器进行测试的特殊说明

基于 Spring Integration 的测试绑定器用于在不涉及实际中间件绑定器(例如 Kafka 或 RabbitMQ 绑定器)的情况下测试应用程序。如上各节所述,测试绑定器通过依靠内存中的 Spring Integration 通道帮助您快速验证应用程序行为。当测试绑定器存在于测试类路径上时,Spring Cloud Stream 将尝试在需要绑定器进行通信的任何地方使用此绑定器进行所有测试目的。换句话说,您不能在同一个模块中混合使用测试绑定器和常规中间件绑定器进行测试。在使用测试绑定器测试应用程序后,如果您想继续使用实际的中间件绑定器进行进一步的集成测试,建议将使用实际绑定器的这些测试添加到单独的模块中,以便这些测试可以与实际中间件建立正确的连接,而不是依赖测试绑定器提供的内存中通道。

健康指标

Spring Cloud Stream 为绑定器提供了一个健康指标。它在名称 binders 下注册,可以通过设置 management.health.binders.enabled 属性来启用或禁用。

要启用健康检查,您首先需要通过包含其依赖项来启用“web”和“actuator”(请参阅 绑定可视化和控制)。

如果应用程序没有显式设置 management.health.binders.enabled,则 management.health.defaults.enabled 将匹配为 true,并且绑定器健康指标将被启用。如果您想完全禁用健康指标,则必须将 management.health.binders.enabled 设置为 false

您可以使用 Spring Boot Actuator 健康端点访问健康指标 - /actuator/health。默认情况下,当您访问上述端点时,您只会收到顶级应用程序状态。为了从绑定器特定的健康指标接收完整详细信息,您需要在应用程序中包含属性 management.endpoint.health.show-details 并将其值设置为 ALWAYS

健康指标是特定于绑定器的,某些绑定器实现不一定提供健康指标。

如果您想完全禁用所有开箱即用的健康指标,并改用您自己的健康指标,您可以通过将属性 management.health.binders.enabled 设置为 false,然后在您的应用程序中提供您自己的 HealthIndicator bean 来实现。在这种情况下,Spring Boot 的健康指标基础设施仍将获取这些自定义 bean。即使您没有禁用绑定器健康指标,您仍然可以通过除了开箱即用的健康检查之外,提供您自己的 HealthIndicator bean 来增强健康检查。

当您在同一应用程序中有多个绑定器时,健康指标默认情况下处于启用状态,除非应用程序通过将 management.health.binders.enabled 设置为 false 来将其关闭。在这种情况下,如果用户想要禁用一部分绑定器的健康检查,则应通过在多绑定器配置的环境中将 management.health.binders.enabled 设置为 false 来完成。有关如何提供特定于环境的属性的详细信息,请参阅 连接到多个系统

如果类路径中存在多个绑定器,但并非所有绑定器都在应用程序中使用,这可能会在健康指标的上下文中导致一些问题。健康检查执行方式可能存在特定于实现的详细信息。例如,如果绑定器没有注册任何目标,则 Kafka 绑定器可能会将状态确定为 DOWN

让我们来看一个具体的情况。假设您在类路径中同时拥有 Kafka 和 Kafka Streams 绑定器,但仅在应用程序代码中使用 Kafka Streams 绑定器,即仅使用 Kafka Streams 绑定器提供绑定。由于 Kafka 绑定器未使用,并且它有特定的检查来查看是否注册了任何目标,因此绑定器健康检查将失败。顶级应用程序健康检查状态将报告为 DOWN。在这种情况下,您可以简单地从应用程序中删除 kafka 绑定器的依赖项,因为您没有使用它。

示例

有关 Spring Cloud Stream 示例,请参阅 GitHub 上的 spring-cloud-stream-samples 存储库。

在 CloudFoundry 上部署流应用程序

在 CloudFoundry 上,服务通常通过一个称为 VCAP_SERVICES 的特殊环境变量公开。

配置绑定器连接时,您可以使用环境变量中的值,如 dataflow Cloud Foundry 服务器 文档中所述。

绑定器实现

以下是可用 Binder 实现的列表

如前所述,绑定器抽象也是框架的扩展点之一。因此,如果您在前面的列表中找不到合适的绑定器,则可以在 Spring Cloud Stream 之上实现您自己的绑定器。在如何从头开始创建 Spring Cloud Stream 绑定器文章中,社区成员详细记录了实现自定义绑定器所需的步骤集,并提供了一个示例。这些步骤也在实现自定义绑定器部分中突出显示。