R2DBC 支持

Spring Integration 提供了通道适配器,用于通过使用对数据库的反应式访问来接收和发送消息 R2DBC 驱动程序。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>6.3.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:6.3.0"

R2DBC 入站通道适配器

R2dbcMessageSource 是基于 R2dbcEntityOperations 的可轮询 MessageSource 实现,并生成带有 FluxMono 作为有效负载的消息,用于从数据库中根据 expectSingleResult 选项获取数据。SELECT 的查询可以是静态提供的,也可以基于在每次 receive() 调用时计算的 SpEL 表达式。R2dbcMessageSource.SelectCreator 作为评估上下文的根对象存在,以允许使用 StatementMapper.SelectSpec 流式 API。默认情况下,此通道适配器将选择中的记录映射到 LinkedCaseInsensitiveMap 实例。它可以通过提供 payloadType 选项进行自定义,该选项在 EntityRowMapper 下面使用,该选项基于 this.r2dbcEntityOperations.getConverter()updateSql 是可选的,用于标记数据库中的已读取记录,以便从后续轮询中跳过。UPDATE 操作可以与 BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec> 一起提供,以将值绑定到基于 SELECT 结果中的记录的 UPDATE 中。

此通道适配器的一个典型配置可能如下所示

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

使用 Java DSL,此通道适配器的配置如下所示

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlow
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .handle((p, h) -> p)
        .channel(MessageChannels.flux())
        .get();
}

R2DBC 出站通道适配器

R2dbcMessageHandler 是一个 ReactiveMessageHandler 实现,用于使用提供的 R2dbcEntityOperations 在数据库中执行 INSERT(默认)、UPDATEDELETE 查询。R2dbcMessageHandler.Type 可以静态配置,也可以通过针对请求消息的 SpEL 表达式配置。要执行的查询可以基于 tableNamevaluescriteria 表达式选项,或者(如果未提供 tableName),则整个消息有效负载将被视为 org.springframework.data.relational.core.mapping.Table 实体以执行针对它的 SQL。包 org.springframework.data.relational.core.query 注册为 SpEL 评估上下文的导入,以便直接访问 Criteria 流式 API,该 API 用于 UPDATEDELETE 查询。valuesExpression 用于 INSERTUPDATE,必须评估为 Map,用于列值对,以对请求消息执行目标表中的更改。

此通道适配器的一个典型配置可能如下所示

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

使用 Java DSL,此通道适配器的配置如下所示

.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))