JDBC 消息存储
Spring 集成提供了两种特定于 JDBC 的消息存储实现。JdbcMessageStore
适合与聚合器和凭证检查模式一起使用。JdbcChannelMessageStore
实现提供了一种更具针对性和可扩展性的实现,专门用于消息通道。
请注意,您可以使用 JdbcMessageStore
来支持消息通道,JdbcChannelMessageStore
针对此目的进行了优化。
从版本 5.0.11、5.1.2 开始,JdbcChannelMessageStore 的索引已优化。如果您在这样的存储中拥有大量消息组,您可能希望更改索引。此外,PriorityChannel 的索引已注释掉,因为除非您使用由 JDBC 支持的此类通道,否则不需要它。
|
当使用 OracleChannelMessageStoreQueryProvider 时,必须添加优先级通道索引,因为它包含在查询中的提示中。
|
初始化数据库
在开始使用 JDBC 消息存储组件之前,您应该为目标数据库配置适当的对象。
Spring 集成附带了一些示例脚本,可用于初始化数据库。在 spring-integration-jdbc
JAR 文件中,您可以在 org.springframework.integration.jdbc
包中找到脚本。它提供了一个示例创建脚本和一个示例删除脚本,用于一系列常见的数据库平台。使用这些脚本的一种常见方法是在 Spring JDBC 数据源初始化器 中引用它们。请注意,这些脚本作为示例和所需表和列名称的规范提供。您可能会发现需要增强它们以用于生产环境(例如,通过添加索引声明)。
从版本 6.2 开始,JdbcMessageStore
、JdbcChannelMessageStore
、JdbcMetadataStore
和 DefaultLockRepository
实现 SmartLifecycle
并执行 `SELECT COUNT` 查询,在它们各自的表中,在 start()
方法中以确保目标数据库中存在所需的表(根据提供的前缀)。如果所需的表不存在,应用程序上下文将无法启动。可以通过 setCheckDatabaseOnStart(false)
禁用检查。
通用 JDBC 消息存储
JDBC 模块提供了一个 Spring Integration 的 MessageStore
(在凭证检查模式中很重要)和 MessageGroupStore
(在有状态模式中很重要,例如聚合器)的实现,这些实现由数据库支持。这两个接口都由 JdbcMessageStore
实现,并且支持在 XML 中配置存储实例,如下例所示
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定一个 JdbcTemplate
而不是 DataSource
。
以下示例展示了一些其他可选属性
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
在前面的示例中,我们指定了一个 LobHandler
用于处理大对象消息(这对于 Oracle 来说通常是必要的),以及查询中生成的表名的前缀。表名前缀默认为 INT_
。
支持消息通道
如果您打算使用 JDBC 支持消息通道,我们建议使用 JdbcChannelMessageStore
实现。它仅与消息通道配合使用。
支持的数据库
JdbcChannelMessageStore
使用特定于数据库的 SQL 查询从数据库中检索消息。因此,您必须在 JdbcChannelMessageStore
上设置 ChannelMessageStoreQueryProvider
属性。此 channelMessageStoreQueryProvider
提供您指定的特定数据库的 SQL 查询。Spring Integration 支持以下关系型数据库
-
PostgreSQL
-
HSQLDB
-
MySQL
-
Oracle
-
Derby
-
H2
-
SqlServer
-
Sybase
-
DB2
如果您的数据库未列出,您可以实现 ChannelMessageStoreQueryProvider
接口并提供您自己的自定义查询。
版本 4.0 在表中添加了 MESSAGE_SEQUENCE
列,以确保即使消息在同一毫秒内存储,也能实现先进先出 (FIFO) 排队。
从版本 6.2 开始,ChannelMessageStoreQueryProvider
公开了一个 isSingleStatementForPoll
标志,其中 PostgresChannelMessageStoreQueryProvider
返回 true
,并且其用于轮询的查询现在基于单个 DELETE…RETURNING
语句。JdbcChannelMessageStore
会查询 isSingleStatementForPoll
选项,如果只支持单个轮询语句,则会跳过单独的 DELETE
语句。
自定义消息插入
从版本 5.0 开始,通过重载 ChannelMessageStorePreparedStatementSetter
类,您可以为 JdbcChannelMessageStore
中的消息插入提供自定义实现。您可以使用它来设置不同的列或更改表结构或序列化策略。例如,您可以将结构存储为 JSON 字符串,而不是默认序列化为 byte[]
。
以下示例使用setValues
的默认实现来存储公共列,并覆盖行为以将消息有效负载存储为varchar
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我们不建议将关系数据库用于排队。相反,如果可能,请考虑使用 JMS 或 AMQP 支持的通道。有关更多参考,请参阅以下资源 如果您仍然计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,该机制在后续部分中进行了描述。 |
并发轮询
轮询消息通道时,您可以选择使用TaskExecutor
引用配置关联的Poller
。
请记住,如果您使用 JDBC 支持的消息通道,并且您计划以多个线程的方式轮询通道以及消息存储事务,则应确保使用支持多版本并发控制 (MVCC) 的关系数据库。否则,锁定可能会成为问题,并且使用多个线程时的性能可能无法像预期的那样实现。例如,Apache Derby 在这方面存在问题。 为了实现更好的 JDBC 队列吞吐量并避免不同线程可能从队列中轮询相同
|
优先级通道
从 4.0 版本开始,JdbcChannelMessageStore
实现了PriorityCapableChannelMessageStore
并提供了priorityEnabled
选项,使其可以作为priority-queue
实例的message-store
引用。为此,INT_CHANNEL_MESSAGE
表有一个MESSAGE_PRIORITY
列来存储PRIORITY
消息头的值。此外,新的MESSAGE_SEQUENCE
列使我们能够实现稳健的先进先出 (FIFO) 轮询机制,即使在同一毫秒内存储了具有相同优先级的多个消息。从数据库中轮询(选择)消息的顺序为order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
。
我们不建议对优先级和非优先级队列通道使用相同的JdbcChannelMessageStore bean,因为priorityEnabled 选项适用于整个存储,并且不会为队列通道保留正确的FIFO队列语义。但是,相同的INT_CHANNEL_MESSAGE 表(甚至region )可以用于两种JdbcChannelMessageStore 类型。要配置这种情况,您可以从另一个消息存储 bean 扩展一个,如下面的示例所示
|
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
分区消息存储
通常将JdbcMessageStore
用作同一应用程序中一组应用程序或节点的全局存储。为了提供一些针对名称冲突的保护,并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。一种方法是使用单独的表名,通过更改前缀(如前面所述)。另一种方法是为分区单个表中的数据指定region
名称。第二种方法的一个重要用例是当MessageStore
管理支持 Spring Integration 消息通道的持久队列时。持久通道的消息数据在存储中以通道名称为键。因此,如果通道名称不是全局唯一的,则通道可能会获取不属于它们的数据。为了避免这种危险,您可以使用消息存储region
为具有相同逻辑名称的不同物理通道保持数据分离。
PostgreSQL:接收推送通知
PostgreSQL 提供了一个监听和通知框架,用于在数据库表操作时接收推送通知。Spring Integration 利用此机制(从版本 6.0 开始)允许在将新消息添加到JdbcChannelMessageStore
时接收推送通知。使用此功能时,必须定义一个数据库触发器,该触发器可以在schema-postgresql.sql
文件的注释中找到,该文件包含在 Spring Integration 的 JDBC 模块中。
推送通知通过PostgresChannelMessageTableSubscriber
类接收,该类允许其订阅者在任何给定region
和groupId
的新消息到达时接收回调。即使消息是在不同的 JVM 上追加到同一个数据库,这些通知也会被接收。PostgresSubscribableChannel
实现使用PostgresChannelMessageTableSubscriber.Subscription
契约从存储中拉取消息,作为对来自上述PostgresChannelMessageTableSubscriber
通知的通知的反应。
例如,可以按如下方式接收some group
的推送通知
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
事务支持
从 6.0.5 版本开始,在PostgresSubscribableChannel
上指定PlatformTransactionManager
将在事务中通知订阅者。订阅者中的异常将导致事务回滚,并将消息放回消息存储。事务支持默认情况下未激活。
重试
从 6.0.5 版本开始,可以通过向PostgresSubscribableChannel
提供RetryTemplate
来指定重试策略。默认情况下,不执行任何重试。
任何活动的 为了满足对独占连接的需求,建议 JVM 仅运行一个 |