Spring 数据集成之旅简史

Spring 在数据集成方面的旅程始于 Spring Integration。凭借其编程模型,它为构建应用程序提供了一致的开发体验,这些应用程序可以采用 企业集成模式 来连接外部系统,例如数据库、消息代理等等。

快进到云时代,微服务在企业环境中变得越来越突出。 Spring Boot 改变了开发人员构建应用程序的方式。凭借 Spring 的编程模型和 Spring Boot 处理的运行时职责,开发独立的、生产级的基于 Spring 的微服务变得无缝衔接。

为了将此扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被整合到一个新的项目中。Spring Cloud Stream 诞生了。

使用 Spring Cloud Stream,开发人员可以

  • 独立构建、测试和部署以数据为中心的应用程序。

  • 应用现代微服务架构模式,包括通过消息进行组合。

  • 使用以事件为中心的思维方式解耦应用程序职责。事件可以代表某个时间发生的事件,下游消费者应用程序可以对此做出反应,而无需了解其来源或生产者的身份。

  • 将业务逻辑移植到消息代理(例如 RabbitMQ、Apache Kafka、Amazon Kinesis)。

  • 依赖框架对常见用例的自动内容类型支持。可以扩展到不同的数据转换类型。

  • 等等...

快速入门

您可以在不到 5 分钟的时间内尝试 Spring Cloud Stream,甚至在深入了解任何细节之前,都可以按照以下三步指南进行操作。

我们将向您展示如何创建一个 Spring Cloud Stream 应用程序,该应用程序接收来自您选择的訊息中间件的訊息(稍后将详细介绍),并将接收到的訊息记录到控制台。我们称之为 `LoggingConsumer`。虽然它并不实用,但它提供了一些主要概念和抽象的良好介绍,使您更容易理解本用户指南的其余部分。

这三个步骤如下

使用 Spring Initializr 创建示例应用程序

要开始,请访问 Spring Initializr。从那里,您可以生成我们的 `LoggingConsumer` 应用程序。为此

  1. 在 **依赖项** 部分,开始键入 `stream`。当“Cloud Stream”选项出现时,选择它。

  2. 开始键入 'kafka' 或 'rabbit'。

  3. 选择“Kafka”或“RabbitMQ”。

    基本上,您选择应用程序绑定的訊息中间件。我们建议使用您已经安装的或您更愿意安装和运行的中间件。此外,正如您从 Initilaizer 屏幕中看到的那样,您可以选择其他几个选项。例如,您可以选择 Gradle 作为您的构建工具,而不是 Maven(默认值)。

  4. 在 **工件** 字段中,键入 'logging-consumer'。

    **工件** 字段的值将成为应用程序名称。如果您为中间件选择了 RabbitMQ,那么您的 Spring Initializr 现在应该如下所示

spring initializr
  1. 单击 **生成项目** 按钮。

    这样做会将生成的项目的压缩版本下载到您的硬盘驱动器。

  2. 将文件解压缩到您要用作项目目录的文件夹中。

我们鼓励您探索 Spring Initializr 中提供的许多可能性。它允许您创建许多不同类型的 Spring 应用程序。

将项目导入您的 IDE

现在您可以将项目导入您的 IDE。请记住,根据 IDE 的不同,您可能需要遵循特定的导入过程。例如,根据项目的生成方式(Maven 或 Gradle),您可能需要遵循特定的导入过程(例如,在 Eclipse 或 STS 中,您需要使用文件 → 导入 → Maven → 现有 Maven 项目)。

导入后,项目必须没有任何错误。此外,src/main/java 应该包含 com.example.loggingconsumer.LoggingConsumerApplication

从技术上讲,此时,您可以运行应用程序的主类。它已经是有效的 Spring Boot 应用程序。但是,它什么也不做,所以我们想添加一些代码。

添加消息处理程序、构建和运行

修改 com.example.loggingconsumer.LoggingConsumerApplication 类,使其看起来如下

@SpringBootApplication
public class LoggingConsumerApplication {

	public static void main(String[] args) {
		SpringApplication.run(LoggingConsumerApplication.class, args);
	}

	@Bean
	public Consumer<Person> log() {
	    return person -> {
	        System.out.println("Received: " + person);
	    };
	}

	public static class Person {
		private String name;
		public String getName() {
			return name;
		}
		public void setName(String name) {
			this.name = name;
		}
		public String toString() {
			return this.name;
		}
	}
}

如您从前面的列表中看到的

  • 我们使用函数式编程模型(参见 [Spring Cloud Function 支持])来定义一个名为 Consumer 的单个消息处理程序。

  • 我们依赖框架约定将此处理程序绑定到绑定器公开的输入目标绑定。

这样做还可以让您看到框架的核心功能之一:它尝试自动将传入的消息有效负载转换为 Person 类型。

现在您拥有一个完全功能的 Spring Cloud Stream 应用程序,它可以监听消息。为了简单起见,我们假设您在 第一步 中选择了 RabbitMQ。假设您已安装并运行 RabbitMQ,您可以通过在 IDE 中运行其 main 方法来启动应用程序。

您应该看到以下输出

	--- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg, bound to: input
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
	--- [ main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#2a3a299:0/SimpleConnection@66c83fc8. . .
	. . .
	--- [ main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.CbMIwdkJSBO1ZoPDOtHtCg
	. . .
	--- [ main] c.e.l.LoggingConsumerApplication         : Started LoggingConsumerApplication in 2.531 seconds (JVM running for 2.897)

转到 RabbitMQ 管理控制台或任何其他 RabbitMQ 客户端,并将消息发送到 input.anonymous.CbMIwdkJSBO1ZoPDOtHtCganonymous.CbMIwdkJSBO1ZoPDOtHtCg 部分表示组名,它是生成的,因此它在您的环境中必然不同。为了获得更可预测的结果,您可以通过设置 spring.cloud.stream.bindings.input.group=hello(或您喜欢的任何名称)来使用显式组名。

消息的内容应该是 Person 类的 JSON 表示形式,如下所示

{"name":"Sam Spade"}

然后,在您的控制台中,您应该看到

已接收:Sam Spade

您还可以将应用程序构建并打包到一个引导 jar 中(使用 ./mvnw clean install),并使用 java -jar 命令运行构建的 JAR。

现在您拥有一个可工作的(虽然非常基本)Spring Cloud Stream 应用程序。

Spring 表达式语言 (SpEL) 在流式数据上下文中的应用

在整个参考手册中,您将遇到许多可以使用 Spring 表达式语言 (SpEL) 的功能和示例。了解使用 SpEL 时的一些限制非常重要。

SpEL 使您可以访问当前消息以及您正在运行的应用程序上下文。但是,了解 SpEL 在传入消息的上下文中可以看到哪些类型的数据非常重要。从代理来看,消息以 byte[] 的形式到达。然后,它被绑定器转换为 Message<byte[]>,而您可以看到消息的有效负载保持其原始形式。消息的标头是 <String, Object>,其中值通常是另一个基本类型或基本类型的集合/数组,因此是 Object。这是因为绑定器不知道所需的输入类型,因为它无法访问用户代码(函数)。因此,绑定器实际上传递了一个包含有效负载和一些可读元数据的信封,这些元数据以消息标头的形式存在,就像邮件传递的信件一样。这意味着,虽然可以访问消息的有效负载,但您只能以原始数据(即 byte[])的形式访问它。虽然开发人员经常要求能够使用 SpEL 访问有效负载对象的字段作为具体类型(例如,Foo、Bar 等),但您可以看到实现这一点有多么困难甚至不可能。以下是一个示例来说明这个问题;假设您有一个路由表达式,根据有效负载类型路由到不同的函数。此要求将意味着将有效负载从 byte[] 转换为特定类型,然后应用 SpEL。但是,为了执行这种转换,我们需要知道要传递给转换器的实际类型,而该类型来自函数的签名,我们不知道哪个函数。解决此要求的更好方法是将类型信息作为消息标头传递(例如,application/json;type=foo.bar.Baz)。您将获得一个清晰可读的字符串值,可以在一年内访问和评估,并且易于阅读 SpEL 表达式。

此外,使用有效负载进行路由决策被认为是非常不好的做法,因为有效负载被认为是特权数据 - 只有最终接收者才能读取的数据。同样,使用邮件传递的类比,您不希望邮递员打开您的信封并阅读信件内容以做出一些递送决策。同样的概念也适用于此,尤其是在生成消息时相对容易包含此类信息的情况下。它强制执行与要通过网络传输的数据的设计相关的某种程度的纪律,以及哪些数据可以被视为公共数据,哪些数据是特权数据。