使用 R2DBC 访问数据

R2DBC(“Reactive Relational Database Connectivity”)是一个社区驱动的规范工作,旨在使用响应式模式标准化对 SQL 数据库的访问。

包层次结构

Spring 框架的 R2DBC 抽象框架包含两个不同的包

使用 R2DBC 核心类来控制基本的 R2DBC 处理和错误处理

本节介绍如何使用 R2DBC 核心类来控制基本的 R2DBC 处理,包括错误处理。它包括以下主题

使用 DatabaseClient

DatabaseClient 是 R2DBC 核心包中的核心类。它处理资源的创建和释放,这有助于避免常见错误,例如忘记关闭连接。它执行核心 R2DBC 工作流的基本任务(例如语句创建和执行),让应用程序代码提供 SQL 并提取结果。DatabaseClient

  • 运行 SQL 查询

  • 更新语句和存储过程调用

  • 执行 Result 实例的迭代

  • 捕获 R2DBC 异常并将它们转换为在 org.springframework.dao 包中定义的通用、信息量更大的异常层次结构。(请参阅 一致的异常层次结构。)

客户端具有一个功能性的、流畅的 API,使用响应式类型进行声明式组合。

当您在代码中使用 DatabaseClient 时,您只需要实现 java.util.function 接口,为它们提供明确定义的契约。给定由 DatabaseClient 类提供的 ConnectionFunction 回调会创建一个 Publisher。对于提取 Row 结果的映射函数也是如此。

您可以通过使用 ConnectionFactory 引用直接实例化,或在 Spring IoC 容器中配置它并将其作为 Bean 引用提供给 DAO,在 DAO 实现中使用 DatabaseClient

创建 DatabaseClient 对象的最简单方法是通过静态工厂方法,如下所示

  • Java

  • Kotlin

DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
ConnectionFactory 应始终在 Spring IoC 容器中配置为 Bean。

上述方法创建了一个具有默认设置的 DatabaseClient

您还可以从 DatabaseClient.builder() 获取 Builder 实例。您可以通过调用以下方法自定义客户端

  • ….bindMarkers(…):提供一个特定的 BindMarkersFactory 来配置命名参数到数据库绑定标记的转换。

  • ….executeFunction(…):设置 ExecuteFunction 如何运行 Statement 对象。

  • ….namedParameters(false):禁用命名参数扩展。默认情况下启用。

方言由 BindMarkersFactoryResolverConnectionFactory 解析,通常通过检查 ConnectionFactoryMetadata
您可以通过注册一个实现 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 的类来让 Spring 自动发现您的 BindMarkersFactory,方法是通过 META-INF/spring.factories 注册。BindMarkersFactoryResolver 使用 Spring 的 SpringFactoriesLoader 从类路径中发现绑定标记提供程序实现。

当前支持的数据库为

  • H2

  • MariaDB

  • Microsoft SQL Server

  • MySQL

  • Postgres

此类发出的所有 SQL 都在 DEBUG 级别下记录,该级别对应于客户端实例的完全限定类名(通常为 DefaultDatabaseClient)。此外,每次执行都会在响应式序列中注册一个检查点以帮助调试。

以下部分提供了一些 DatabaseClient 用法的示例。这些示例并非 DatabaseClient 公开的所有功能的详尽列表。有关详细信息,请参阅随附的 javadoc

执行语句

DatabaseClient 提供了运行语句的基本功能。以下示例显示了为创建新表的最小但功能齐全的代码需要包含的内容

  • Java

  • Kotlin

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .await()

DatabaseClient 旨在提供便捷、流畅的使用体验。它在执行规范的每个阶段都公开了中间、延续和终端方法。上面前面的示例使用 then() 返回一个完成 Publisher,该 Publisher 在查询(如果 SQL 查询包含多个语句,则为查询)完成后立即完成。

execute(…) 接受 SQL 查询字符串或查询 Supplier<String>,以将实际查询创建推迟到执行时。

查询 (SELECT)

SQL 查询可以通过 Row 对象或受影响的行数返回结果。DatabaseClient 可以返回更新的行数或行本身,具体取决于发出的查询。

以下查询从表中获取 idname

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person")
        .fetch().awaitSingle()

以下查询使用绑定变量

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitSingle()

您可能已经注意到上面示例中使用了 fetch()fetch() 是一个延续操作符,允许您指定要使用多少数据。

调用 first() 返回结果中的第一行并丢弃剩余的行。您可以使用以下操作符使用数据

  • first() 返回整个结果的第一行。它的 Kotlin 协程变体对于非空返回类型命名为 awaitSingle(),如果值为可选类型,则命名为 awaitSingleOrNull()

  • one() 返回恰好一个结果,如果结果包含多行则失败。使用 Kotlin 协程,awaitOne() 表示恰好一个值,awaitOneOrNull() 表示值可能是 null

  • all() 返回结果的所有行。使用 Kotlin 协程时,使用 flow()

  • rowsUpdated() 返回受影响的行数 (INSERT/UPDATE/DELETE 计数)。它的 Kotlin 协程变体命名为 awaitRowsUpdated()

在不指定进一步的映射细节的情况下,查询将表格结果作为 Map 返回,其键是不区分大小写的列名,映射到它们的列值。

您可以通过提供一个 Function<Row, T> 来控制结果映射,该函数将为每个 Row 调用,以便它可以返回任意值(单个值、集合和映射以及对象)。

以下示例提取 name 列并发出其值

  • Java

  • Kotlin

Flux<String> names = client.sql("SELECT name FROM person")
        .map(row -> row.get("name", String.class))
        .all();
val names = client.sql("SELECT name FROM person")
        .map{ row: Row -> row.get("name", String.class) }
        .flow()

或者,有一个映射到单个值的快捷方式

	Flux<String> names = client.sql("SELECT name FROM person")
			.mapValue(String.class)
			.all();

或者您可以映射到具有 Bean 属性或记录组件的结果对象

	// assuming a name property on Person
	Flux<Person> persons = client.sql("SELECT name FROM person")
			.mapProperties(Person.class)
			.all();
null 怎么办?

关系数据库结果可能包含 null 值。Reactive Streams 规范禁止发出 null 值。此要求需要在提取器函数中进行正确的 null 处理。虽然您可以从 Row 获取 null 值,但您不能发出 null 值。您必须将任何 null 值包装在一个对象中(例如,对于单个值,使用 Optional),以确保您的提取器函数永远不会直接返回 null 值。

使用 DatabaseClient 更新 (INSERTUPDATEDELETE)

修改语句的唯一区别在于这些语句通常不返回表格数据,因此您使用 rowsUpdated() 使用结果。

以下示例显示了一个 UPDATE 语句,该语句返回更新的行数

  • Java

  • Kotlin

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitRowsUpdated()

将值绑定到查询

一个典型的应用程序需要参数化的 SQL 语句来根据某些输入选择或更新行。这些通常是受 WHERE 子句约束的 SELECT 语句,或者接受输入参数的 INSERTUPDATE 语句。如果参数没有正确转义,参数化语句会存在 SQL 注入的风险。DatabaseClient 利用 R2DBC 的 bind API 消除了查询参数的 SQL 注入风险。您可以使用 execute(…) 操作符提供参数化的 SQL 语句,并将参数绑定到实际的 Statement。然后,您的 R2DBC 驱动程序将使用预准备语句和参数替换来运行该语句。

参数绑定支持两种绑定策略

  • 按索引,使用基于零的参数索引。

  • 按名称,使用占位符名称。

以下示例显示了查询的参数绑定

    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bind("id", "joe")
	    	.bind("name", "Joe")
			.bind("age", 34);

或者,您可以传入名称和值的映射

	Map<String, Object> params = new LinkedHashMap<>();
	params.put("id", "joe");
	params.put("name", "Joe");
	params.put("age", 34);
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindValues(params);

或者您可以传入一个具有 Bean 属性或记录组件的参数对象

	// assuming id, name, age properties on Person
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindProperties(new Person("joe", "Joe", 34);

或者,您可以使用位置参数将值绑定到语句。索引从零开始。

    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bind(0, "joe")
	    	.bind(1, "Joe")
			.bind(2, 34);

如果您的应用程序绑定到许多参数,则可以使用单个调用实现相同的功能

    List<?> values = List.of("joe", "Joe", 34);
    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bindValues(values);
R2DBC 原生绑定标记

R2DBC 使用依赖于实际数据库供应商的数据库原生绑定标记。例如,Postgres 使用索引标记,例如 $1$2$n。另一个示例是 SQL Server,它使用以 @ 为前缀的命名绑定标记。

这与 JDBC 不同,JDBC 需要 ? 作为绑定标记。在 JDBC 中,实际的驱动程序将其语句执行的一部分中的 ? 绑定标记转换为数据库原生标记。

Spring Framework 的 R2DBC 支持允许您使用原生绑定标记或带 :name 语法的命名绑定标记。

命名参数支持利用 BindMarkersFactory 实例在查询执行时将命名参数扩展为原生绑定标记,这在一定程度上提供了跨各种数据库供应商的查询可移植性。

查询预处理器将命名 Collection 参数展开为一系列绑定标记,从而无需根据参数数量动态创建查询。嵌套的对象数组被展开以允许使用(例如)选择列表。

考虑以下查询

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))

前面的查询可以参数化并运行如下

  • Java

  • Kotlin

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann",  50});

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples)
选择列表的使用依赖于供应商。

以下示例显示了使用 IN 谓词的更简单的变体

  • Java

  • Kotlin

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", arrayOf(35, 50))
R2DBC 本身不支持类似集合的值。但是,在上面的示例中展开给定的 List 对 Spring 的 R2DBC 支持中的命名参数有效,例如,用于 IN 子句,如上所示。但是,插入或更新数组类型的列(例如,在 Postgres 中)需要底层 R2DBC 驱动程序支持的数组类型:通常是 Java 数组,例如,String[] 更新 text[] 列。不要将 Collection<String> 或类似内容作为数组参数传递。

语句过滤器

有时您需要在实际的 Statement 运行之前微调其选项。为此,使用 DatabaseClient 注册一个 Statement 过滤器 (StatementFilterFunction) 以拦截和修改其执行中的语句,如下例所示

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
	    .bind("name", …)
	    .bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
		.bind("name", …)
		.bind("state", …)

DatabaseClient 还公开了简化的 filter(…) 重载,它接受一个 Function<Statement, Statement>

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter(statement -> s.returnGeneratedValues("id"));

client.sql("SELECT id, name, state FROM table")
	    .filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter { statement -> s.returnGeneratedValues("id") }

client.sql("SELECT id, name, state FROM table")
	    .filter { statement -> s.fetchSize(25) }

StatementFilterFunction 实现允许过滤 Statement 和过滤 Result 对象。

DatabaseClient 最佳实践

DatabaseClient 类的实例一旦配置,就是线程安全的。这一点很重要,因为这意味着您可以配置 DatabaseClient 的单个实例,然后安全地将此共享引用注入到多个 DAO(或存储库)中。DatabaseClient 是有状态的,因为它维护对 ConnectionFactory 的引用,但此状态不是会话状态。

使用 DatabaseClient 类时,一种常见的做法是在您的 Spring 配置文件中配置 ConnectionFactory,然后将该共享 ConnectionFactory bean 依赖注入到您的 DAO 类中。DatabaseClientConnectionFactory 的 setter 中创建。这导致 DAO 类似于以下内容

  • Java

  • Kotlin

public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory);
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {

	private val databaseClient = DatabaseClient.create(connectionFactory)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}

显式配置的替代方法是使用组件扫描和依赖注入的注解支持。在这种情况下,您可以使用 @Component 注解类(使其成为组件扫描的候选对象),并使用 @Autowired 注解 ConnectionFactory setter 方法。以下示例显示了如何执行此操作

  • Java

  • Kotlin

@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	@Autowired (2)
	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory); (3)
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 使用 @Component 注解类。
2 使用 @Autowired 注解 ConnectionFactory setter 方法。
3 使用 ConnectionFactory 创建一个新的 DatabaseClient
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)

	private val databaseClient = DatabaseClient(connectionFactory) (3)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1 使用 @Component 注解类。
2 ConnectionFactory 的构造函数注入。
3 使用 ConnectionFactory 创建一个新的 DatabaseClient

无论您选择使用(或不使用)上述哪种模板初始化样式,每次想要运行 SQL 时创建 DatabaseClient 类的新的实例都是很少必要的。一旦配置,DatabaseClient 实例就是线程安全的。如果您的应用程序访问多个数据库,您可能需要多个 DatabaseClient 实例,这需要多个 ConnectionFactory,随后需要多个配置不同的 DatabaseClient 实例。

检索自动生成的键

INSERT 语句在将行插入到定义自动递增或标识列的表中时可能会生成键。要完全控制要生成的列名,只需注册一个 StatementFilterFunction,该函数请求所需列的生成的键。

  • Java

  • Kotlin

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter(statement -> s.returnGeneratedValues("id"))
		.map(row -> row.get("id", Integer.class))
		.first();

// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { statement -> s.returnGeneratedValues("id") }
		.map { row -> row.get("id", Integer.class) }
		.awaitOne()

// generatedId emits the generated key once the INSERT statement has finished

控制数据库连接

本节介绍

使用 ConnectionFactory

Spring 通过 ConnectionFactory 获取到数据库的 R2DBC 连接。ConnectionFactory 是 R2DBC 规范的一部分,并且是驱动程序的常用入口点。它允许容器或框架隐藏应用程序代码的连接池和事务管理问题。作为开发人员,您无需了解连接到数据库的详细信息。这是设置 ConnectionFactory 的管理员的责任。在开发和测试代码时,您很可能同时担任这两个角色,但您不必知道生产数据源是如何配置的。

当您使用 Spring 的 R2DBC 层时,您可以使用第三方提供的连接池实现配置自己的连接池。一个流行的实现是 R2DBC Pool (r2dbc-pool)。Spring 发行版中的实现仅用于测试目的,不提供池化。

要配置 ConnectionFactory

  1. 像您通常获取 R2DBC ConnectionFactory 一样,使用 ConnectionFactory 获取连接。

  2. 提供一个 R2DBC URL(有关正确的值,请参阅驱动程序的文档)。

以下示例显示了如何配置 ConnectionFactory

  • Java

  • Kotlin

ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");

使用 ConnectionFactoryUtils

ConnectionFactoryUtils 类是一个方便且功能强大的辅助类,它提供 static 方法来从 ConnectionFactory 获取连接并在必要时关闭连接。

它支持订阅者 Context 绑定的连接,例如 R2dbcTransactionManager

使用 SingleConnectionFactory

SingleConnectionFactory 类是 DelegatingConnectionFactory 接口的一个实现,它包装了一个在每次使用后不会关闭的单个 Connection

如果任何客户端代码在假设池化连接的情况下调用 close(如使用持久性工具时),则应将 suppressClose 属性设置为 true。此设置返回一个关闭抑制代理,该代理包装物理连接。请注意,您不能再将其转换为本机 Connection 或类似对象。

SingleConnectionFactory 主要是一个测试类,可用于特定需求,例如如果您的 R2DBC 驱动程序允许则进行流水线处理。与池化的 ConnectionFactory 相比,它始终重用相同的连接,避免过度创建物理连接。

使用 TransactionAwareConnectionFactoryProxy

TransactionAwareConnectionFactoryProxy 是目标 ConnectionFactory 的代理。该代理包装目标 ConnectionFactory 以添加对 Spring 管理的事务的感知。

如果您使用未与 Spring 的 R2DBC 支持集成的 R2DBC 客户端,则需要使用此类。在这种情况下,您仍然可以使用此客户端,并且同时让此客户端参与 Spring 管理的事务。通常,最好将 R2DBC 客户端与对 ConnectionFactoryUtils 的正确访问集成以进行资源管理。

有关更多详细信息,请参阅 TransactionAwareConnectionFactoryProxy javadoc。

使用 R2dbcTransactionManager

R2dbcTransactionManager 类是单个 R2DBC ConnectionFactoryReactiveTransactionManager 实现。它将来自指定 ConnectionFactory 的 R2DBC Connection 绑定到订阅者 Context,可能允许每个 ConnectionFactory 使用一个订阅者 Connection

应用程序代码需要通过 ConnectionFactoryUtils.getConnection(ConnectionFactory) 而不是 R2DBC 的标准 ConnectionFactory.create() 来检索 R2DBC Connection。所有框架类(例如 DatabaseClient)都隐式地使用此策略。如果未使用事务管理器,则查找策略的行为与 ConnectionFactory.create() 完全相同,因此可以在任何情况下使用。