Redis 支持
Spring Integration 2.1 引入了对 Redis 的支持:“一个开源的高级键值存储”。此支持以基于 Redis 的 MessageStore
以及 Redis 通过其 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令支持的发布-订阅消息适配器形式提供。
您需要将此依赖项包含到您的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.3.5"
您还需要包含 Redis 客户端依赖项,例如 Lettuce。
要下载、安装和运行 Redis,请参阅 Redis 文档。
连接到 Redis
要开始与 Redis 交互,您首先需要连接到它。Spring Integration 使用另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 构造:ConnectionFactory
和 Template
。这些抽象简化了与多个 Redis 客户端 Java API 的集成。目前,Spring Data Redis 支持 Jedis 和 Lettuce。
使用 RedisConnectionFactory
要连接到 Redis,您可以使用 RedisConnectionFactory
接口的实现之一。以下列表显示了接口定义
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例演示如何在 Java 中创建 LettuceConnectionFactory
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下示例演示如何在 Spring 的 XML 配置中创建 LettuceConnectionFactory
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory
的实现提供了一组属性,例如端口和主机,如果需要,您可以设置这些属性。获得 RedisConnectionFactory
的实例后,您可以创建 RedisTemplate
的实例并将其注入 RedisConnectionFactory
。
使用 RedisTemplate
与 Spring 中的其他模板类(如 JdbcTemplate
和 JmsTemplate
)一样,RedisTemplate
是一个辅助类,简化了 Redis 数据访问代码。有关 RedisTemplate
及其变体(如 StringRedisTemplate
)的更多信息,请参阅 Spring Data Redis 文档。
以下示例演示如何在 Java 中创建 RedisTemplate
的实例
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例演示如何在 Spring 的 XML 配置中创建 RedisTemplate
的实例
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 进行消息传递
如 引言 中所述,Redis 通过其 PUBLISH
、SUBSCRIBE
和 UNSUBSCRIBE
命令提供对发布-订阅消息传递的支持。与 JMS 和 AMQP 一样,Spring Integration 提供了消息通道和适配器,用于通过 Redis 发送和接收消息。
Redis 发布/订阅通道
与 JMS 类似,在某些情况下,生产者和消费者都打算成为同一应用程序的一部分,在同一进程中运行。您可以通过使用一对入站和出站通道适配器来实现此目的。但是,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法来解决此用例。您可以创建一个发布-订阅通道,如下例所示
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
publish-subscribe-channel
的行为类似于主 Spring Integration 命名空间中的普通 <publish-subscribe-channel/>
元素。任何端点的 input-channel
和 output-channel
属性都可以引用它。不同之处在于此通道由 Redis 主题名称支持:由 topic-name
属性指定的 String
值。但是,与 JMS 不同,此主题无需预先创建,甚至无需由 Redis 自动创建。在 Redis 中,主题是充当地址的简单 String
值。生产者和消费者可以使用相同的 String
值作为其主题名称进行通信。对此通道的简单订阅意味着生产者和消费者端点之间可以进行异步发布-订阅消息传递。但是,与通过在简单的 Spring Integration <channel/>
元素中添加 <queue/>
元素创建的异步消息通道不同,消息不会存储在内存队列中。相反,这些消息通过 Redis 传递,这使您可以依靠其对持久性和集群的支持以及其与其他非 Java 平台的互操作性。
Redis 入站通道适配器
Redis 入站通道适配器 (RedisInboundChannelAdapter
) 以与其他入站适配器相同的方式将传入的 Redis 消息适配为 Spring 消息。它接收特定于平台的消息(在本例中为 Redis),并使用 MessageConverter
策略将其转换为 Spring 消息。以下示例演示如何配置 Redis 入站通道适配器
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。请注意,前面的配置依赖于熟悉的 Spring 范例,即自动发现某些 Bean。在本例中,redisConnectionFactory
会隐式注入适配器。您可以通过使用 connection-factory
属性代替来显式指定它。
此外,请注意,前面的配置将自定义 MessageConverter
注入适配器。此方法类似于 JMS,其中 MessageConverter
实例用于在 Redis 消息和 Spring Integration 消息有效负载之间进行转换。默认值为 SimpleMessageConverter
。
入站适配器可以订阅多个主题名称,因此 topics
属性中有一组以逗号分隔的值。
从 3.0 版本开始,入站适配器除了现有的 topics
属性外,现在还具有 topic-patterns
属性。此属性包含一组以逗号分隔的 Redis 主题模式。有关 Redis 发布-订阅的更多信息,请参阅 Redis 发布/订阅。
入站适配器可以使用 RedisSerializer
反序列化 Redis 消息的主体。<int-redis:inbound-channel-adapter>
的 serializer
属性可以设置为空字符串,这会导致 RedisSerializer
属性的值为 null
。在这种情况下,Redis 消息的原始 byte[]
主体作为消息有效负载提供。
从 5.0 版本开始,您可以通过使用 <int-redis:inbound-channel-adapter>
的 task-executor
属性向入站适配器提供 Executor
实例。此外,接收到的 Spring Integration 消息现在具有 RedisHeaders.MESSAGE_SOURCE
标头以指示已发布消息的来源:主题或模式。您可以将其用于下游路由逻辑。
Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息适配为 Redis 消息。它接收 Spring Integration 消息,并使用 MessageConverter
策略将其转换为特定于平台的消息(在本例中为 Redis)。以下示例演示如何配置 Redis 出站通道适配器
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
配置与 Redis 入站通道适配器类似。适配器隐式注入 RedisConnectionFactory
,其 Bean 名称定义为 redisConnectionFactory
。此示例还包括可选的(和自定义的)MessageConverter
(testConverter
Bean)。
从 Spring Integration 3.0 开始,<int-redis:outbound-channel-adapter>
提供了 topic
属性的替代方案:您可以使用 topic-expression
属性在运行时确定消息的 Redis 主题。这些属性是互斥的。
Redis 队列入站通道适配器
Spring Integration 3.0 引入了一个队列入站通道适配器,用于从 Redis 列表“弹出”消息。默认情况下,它使用“右弹出”,但您可以将其配置为改为使用“左弹出”。适配器是消息驱动的。它使用内部侦听器线程,不使用轮询器。
以下列表显示了 queue-inbound-channel-adapter
的所有可用属性
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
1 | 组件 Bean 名称。如果您不提供 channel 属性,则会创建一个 DirectChannel 并使用此 id 属性作为 Bean 名称在应用程序上下文中注册。在这种情况下,端点本身使用 Bean 名称 id 加上 .adapter 进行注册。(如果 Bean 名称为 thing1 ,则端点注册为 thing1.adapter 。) |
2 | 要将来自此端点的 Message 实例发送到的 MessageChannel 。 |
3 | 一个 SmartLifecycle 属性,用于指定应用程序上下文启动后此端点是否应自动启动。默认为 true 。 |
4 | 一个 SmartLifecycle 属性,用于指定启动此端点的阶段。默认为 0 。 |
5 | 对 RedisConnectionFactory Bean 的引用。默认为 redisConnectionFactory 。 |
6 | 执行基于队列的“弹出”操作以获取 Redis 消息的 Redis 列表的名称。 |
7 | 当从端点的侦听任务接收异常时,要将 ErrorMessage 实例发送到的 MessageChannel 。默认情况下,底层的 MessagePublishingErrorHandler 使用应用程序上下文中的默认 errorChannel 。 |
8 | RedisSerializer Bean 引用。它可以是空字符串,这意味着“无序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 作为 Message 有效负载发送到 channel 。默认情况下,它是一个 JdkSerializationRedisSerializer 。 |
9 | “弹出”操作等待队列中 Redis 消息的超时时间(以毫秒为单位)。默认为 1 秒。 |
10 | 在“弹出”操作发生异常后,侦听器任务应休眠的时间(以毫秒为单位),然后重新启动侦听器任务。 |
11 | 指定此端点是否期望来自 Redis 队列的数据包含完整的 Message 实例。如果此属性设置为 true ,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。其默认值为 false 。 |
12 | 对 Spring TaskExecutor (或标准 JDK 1.5+ Executor )bean 的引用。它用于底层监听任务。默认为 SimpleAsyncTaskExecutor 。 |
13 | 指定此端点是否应使用“右弹出”(当 true 时)或“左弹出”(当 false 时)从 Redis 列表读取消息。如果为 true ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当 FIFO 队列。将其设置为 false 以与使用“右推入”写入列表的软件一起使用,或实现类似堆栈的消息顺序。其默认值为 true 。自 4.3 版起。 |
task-executor 必须配置为具有多个线程才能进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动监听器任务时,可能会出现死锁。errorChannel 可用于处理这些错误,以避免重新启动,但最好不要让您的应用程序面临可能的死锁情况。有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册。 |
Redis 队列出站通道适配器
Spring Integration 3.0 引入了一个队列出站通道适配器,用于从 Spring Integration 消息“推入”到 Redis 列表。默认情况下,它使用“左推入”,但您可以将其配置为改为使用“右推入”。以下清单显示了 Redis queue-outbound-channel-adapter
的所有可用属性
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
1 | 组件 bean 名称。如果您不提供 channel 属性,则会创建一个 DirectChannel 并使用此 id 属性作为 bean 名称在应用程序上下文中注册它。在这种情况下,端点将使用 id 加上 .adapter 的 bean 名称注册。(如果 bean 名称为 thing1 ,则端点将注册为 thing1.adapter 。) |
2 | 此端点从中接收 Message 实例的 MessageChannel 。 |
3 | 对 RedisConnectionFactory Bean 的引用。默认为 redisConnectionFactory 。 |
4 | 执行基于队列的“推入”操作以发送 Redis 消息的 Redis 列表的名称。此属性与 queue-expression 互斥。 |
5 | 一个 SpEL Expression ,用于确定 Redis 列表的名称。它在运行时使用传入的 Message 作为 #root 变量。此属性与 queue 互斥。 |
6 | 一个 RedisSerializer bean 引用。它默认为 JdkSerializationRedisSerializer 。但是,对于 String 负载,如果未提供 serializer 引用,则使用 StringRedisSerializer 。 |
7 | 指定此端点是否应仅将负载或整个 Message 发送到 Redis 队列。默认为 true 。 |
8 | 指定此端点是否应使用“左推入”(当 true 时)或“右推入”(当 false 时)将消息写入 Redis 列表。如果为 true ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当 FIFO 队列。将其设置为 false 以与使用“左弹出”读取列表的软件一起使用,或实现类似堆栈的消息顺序。它默认为 true 。自 4.3 版起。 |
Redis 应用程序事件
自 Spring Integration 3.0 起,Redis 模块提供了一个 IntegrationEvent
的实现,该实现反过来又是一个 org.springframework.context.ApplicationEvent
。RedisExceptionEvent
封装了 Redis 操作的异常(端点作为事件的“源”)。例如,<int-redis:queue-inbound-channel-adapter/>
在捕获 BoundListOperations.rightPop
操作的异常后会发出这些事件。异常可以是任何泛型 org.springframework.data.redis.RedisSystemException
或 org.springframework.data.redis.RedisConnectionFailureException
。使用 <int-event:inbound-channel-adapter/>
处理这些事件对于确定后台 Redis 任务的问题并采取管理操作很有用。
Redis 消息存储
如企业集成模式 (EIP) 书籍中所述,消息存储 允许您持久化消息。当处理具有缓冲消息功能的组件(聚合器、重新排序器等)时,这可能很有用,尤其是在可靠性至关重要的情况下。在 Spring Integration 中,MessageStore
策略也为 凭证检查 模式提供了基础,该模式也在 EIP 中进行了描述。
Spring Integration 的 Redis 模块提供了 RedisMessageStore
。以下示例演示了如何在聚合器中使用它
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是 bean 配置,它期望 RedisConnectionFactory
作为构造函数参数。
默认情况下,RedisMessageStore
使用 Java 序列化来序列化消息。但是,如果您想使用不同的序列化技术(例如 JSON),可以通过设置 RedisMessageStore
的 valueSerializer
属性来提供您自己的序列化器。
从 4.3.10 版开始,框架为 Message
实例和 MessageHeaders
实例分别提供了 Jackson 序列化器和反序列化器实现——MessageJacksonDeserializer
和 MessageHeadersJacksonSerializer
。必须使用 ObjectMapper
的 SimpleModule
选项配置它们。此外,您应该在 ObjectMapper
上设置 enableDefaultTyping
以为每个序列化的复杂对象添加类型信息(如果您信任源)。然后,该类型信息将在反序列化期间使用。框架提供了一个名为 JacksonJsonUtils.messagingAwareMapper()
的实用程序方法,该方法已预先提供所有前面提到的属性和序列化器。此实用程序方法带有 trustedPackages
参数,用于限制反序列化的 Java 包,以避免安全漏洞。默认的受信任包:java.util
、java.lang
、org.springframework.messaging.support
、org.springframework.integration.support
、org.springframework.integration.message
、org.springframework.integration.store
。要在 RedisMessageStore
中管理 JSON 序列化,您必须以类似于以下示例的方式配置它
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从 4.3.12 版开始,RedisMessageStore
支持 prefix
选项,以允许区分同一 Redis 服务器上的存储实例。
Redis 通道消息存储
RedisMessageStore
如前所述 将每个组维护为单个键(组 ID)下的值。虽然您可以使用它来支持 QueueChannel
的持久性,但为此目的提供了一个专门的 RedisChannelMessageStore
(自 4.0 版起)。此存储为每个通道使用一个 LIST
,发送消息时使用 LPUSH
,接收消息时使用 RPOP
。默认情况下,此存储也使用 JDK 序列化,但您可以修改值序列化器,如前所述。
我们建议使用此存储支持通道,而不是使用通用 RedisMessageStore
。以下示例定义了一个 Redis 消息存储,并在带有队列的通道中使用它
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键具有以下形式:<storeBeanName>:<channelId>
(在前面的示例中,redisMessageStore:somePersistentQueueChannel
)。
此外,还提供了子类 RedisChannelPriorityMessageStore
。当您将其与 QueueChannel
一起使用时,消息将按 (FIFO) 优先级顺序接收。它使用标准 IntegrationMessageHeaderAccessor.PRIORITY
标头并支持优先级值 (0 - 9
)。其他优先级的消息(以及没有优先级的消息)在任何具有优先级的消息之后按 FIFO 顺序检索。
这些存储仅实现 BasicMessageGroupStore ,而不实现 MessageGroupStore 。它们仅可用于支持 QueueChannel 等情况。 |
Redis 元数据存储
Spring Integration 3.0 引入了一个新的基于 Redis 的 MetadataStore
(请参阅 元数据存储)实现。您可以使用 RedisMetadataStore
在应用程序重新启动时维护 MetadataStore
的状态。您可以将此新的 MetadataStore
实现与以下适配器一起使用:
要指示这些适配器使用新的 RedisMetadataStore
,请声明一个名为 metadataStore
的 Spring bean。Feed 入站通道适配器和 feed 入站通道适配器都会自动获取并使用声明的 RedisMetadataStore
。以下示例演示了如何声明此类 bean
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore
由 RedisProperties
支持。与它的交互使用 BoundHashOperations
,这反过来又需要整个 Properties
存储的 key
。在 MetadataStore
的情况下,此 key
充当区域的角色,这在分布式环境中很有用,当多个应用程序使用相同的 Redis 服务器时。默认情况下,此 key
的值为 MetaData
。
从 4.0 版开始,此存储实现了 ConcurrentMetadataStore
,使其能够可靠地跨多个应用程序实例共享,其中仅允许一个实例存储或修改键的值。
您不能将 RedisMetadataStore.replace() (例如,在 AbstractPersistentAcceptOnceFileListFilter 中)与 Redis 集群一起使用,因为目前不支持用于原子性的 WATCH 命令。 |
Redis 存储入站通道适配器
Redis 存储入站通道适配器是一个轮询使用者,它从 Redis 集合读取数据并将其作为 Message
负载发送。以下示例演示了如何配置 Redis 存储入站通道适配器
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例演示了如何使用 store-inbound-channel-adapter
元素配置 Redis 存储入站通道适配器,为各种属性提供值,例如
-
key
或key-expression
:正在使用的集合的键的名称。 -
collection-type
:此适配器支持的集合类型的枚举。支持的集合为LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
connection-factory
:对o.s.data.redis.connection.RedisConnectionFactory
实例的引用。 -
redis-template
:对o.s.data.redis.core.RedisTemplate
实例的引用。 -
所有其他入站适配器中常见的其他属性(例如“channel”)。
您不能同时设置 redis-template 和 connection-factory 。 |
默认情况下,适配器使用
|
由于它具有 key
的字面值,因此前面的示例相对简单且静态。有时,您可能需要根据某些条件在运行时更改键的值。为此,请改用 key-expression
,其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,您可能希望对从 Redis 集合读取的已成功处理的数据执行一些后处理。例如,您可能希望在处理值后移动或删除它。您可以使用 Spring Integration 2.2 中添加的事务同步功能来做到这一点。以下示例使用 key-expression
和事务同步
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
您可以通过使用transactional
元素将轮询器声明为事务性的。此元素可以引用一个真正的交易管理器(例如,如果您的流程的其他部分调用了JDBC)。如果您没有“真正的”事务,则可以使用o.s.i.transaction.PseudoTransactionManager
,它是Spring的PlatformTransactionManager
的实现,并在没有实际事务时启用Redis适配器的交易同步功能。
这不会使Redis活动本身成为事务性的。它允许在成功(提交)或失败(回滚)之前或之后采取操作的同步。 |
一旦您的轮询器成为事务性的,您可以在transactional
元素上设置o.s.i.transaction.TransactionSynchronizationFactory
的实例。TransactionSynchronizationFactory
创建TransactionSynchronization
的实例。为了方便起见,我们公开了一个默认的基于SpEL的TransactionSynchronizationFactory
,它允许您配置SpEL表达式,并将其执行与事务协调(同步)。支持提交前、提交后和回滚后的表达式,以及通道(每种事件类型一个),其中发送评估结果(如果有)。对于每个子元素,您可以指定expression
和channel
属性。如果仅存在channel
属性,则接收到的消息将作为特定同步场景的一部分发送到该通道。如果仅存在expression
属性并且表达式的结果为非空值,则会生成一条消息,其有效负载为结果,并发送到默认通道(NullChannel
)并在日志中显示(在DEBUG
级别)。如果您希望评估结果发送到特定通道,请添加channel
属性。如果表达式的结果为null或void,则不会生成任何消息。
RedisStoreMessageSource
添加了一个store
属性,其中包含绑定到事务IntegrationResourceHolder
的RedisStore
实例,可以从TransactionSynchronizationProcessor
实现中访问。
有关事务同步的更多信息,请参阅事务同步。
RedisStore出站通道适配器
RedisStore出站通道适配器允许您将消息有效负载写入Redis集合,如下例所示
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置使用store-inbound-channel-adapter
元素配置了一个Redis存储出站通道适配器。它为各种属性提供值,例如
-
key
或key-expression
:正在使用的集合的键的名称。 -
extract-payload-elements
:如果设置为true
(默认值)并且有效负载是“多值”对象的实例(即Collection
或Map
),则使用“addAll”和“putAll”语义进行存储。否则,如果设置为false
,则无论有效负载的类型如何,都将其存储为单个条目。如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。 -
collection-type
:此适配器支持的Collection
类型的枚举。支持的集合为LIST
、SET
、ZSET
、PROPERTIES
和MAP
。 -
map-key-expression
:返回要存储的条目的键名称的SpEL表达式。仅当collection-type
为MAP
或PROPERTIES
且'extract-payload-elements'为false时才适用。 -
connection-factory
:对o.s.data.redis.connection.RedisConnectionFactory
实例的引用。 -
redis-template
:对o.s.data.redis.core.RedisTemplate
实例的引用。 -
所有其他入站适配器中常见的其他属性(例如“channel”)。
您不能同时设置 redis-template 和 connection-factory 。 |
默认情况下,适配器使用StringRedisTemplate 。它对键、值、哈希键和哈希值使用StringRedisSerializer 实例。但是,如果extract-payload-elements 设置为false ,则将使用一个RedisTemplate ,该模板对键和哈希键使用StringRedisSerializer 实例,对值和哈希值使用JdkSerializationRedisSerializer 实例。使用JDK序列化器时,重要的是要了解Java序列化用于所有值,而不管值是否实际上是集合。如果您需要更多地控制值的序列化,请考虑提供您自己的RedisTemplate ,而不是依赖这些默认值。 |
由于它对key
和其他属性具有文字值,因此前面的示例相对简单且静态。有时,您可能需要根据某些条件在运行时动态更改这些值。为此,请使用它们的-expression
等效项(key-expression
、map-key-expression
等),其中提供的表达式可以是任何有效的SpEL表达式。
Redis出站命令网关
Spring Integration 4.0引入了Redis命令网关,允许您使用通用RedisConnection#execute
方法执行任何标准Redis命令。以下列表显示了Redis出站网关的可用属性
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
1 | 此端点从中接收 Message 实例的 MessageChannel 。 |
2 | 此端点发送回复Message 实例的MessageChannel 。 |
3 | 指定此出站网关是否必须返回值。默认为true 。当Redis返回值为null 时,将抛出ReplyRequiredException 。 |
4 | 等待发送回复消息的超时时间(以毫秒为单位)。通常应用于基于队列的有限回复通道。 |
5 | 对RedisConnectionFactory bean的引用。默认为redisConnectionFactory 。它与'redis-template'属性互斥。 |
6 | 对RedisTemplate bean的引用。它与'connection-factory'属性互斥。 |
7 | 对org.springframework.data.redis.serializer.RedisSerializer 实例的引用。如有必要,它用于将每个命令参数序列化为byte[]。 |
8 | 返回命令键的SpEL表达式。默认为redis_command 消息头。它不能计算为null 。 |
9 | 以逗号分隔的SpEL表达式,这些表达式被评估为命令参数。与arguments-strategy 属性互斥。如果您不提供这两个属性,则payload 将用作命令参数。参数表达式可以计算为'null'以支持可变数量的参数。 |
10 | 一个boolean 标志,用于指定当配置了argument-expressions 时,在o.s.i.redis.outbound.ExpressionArgumentsStrategy 中,评估的Redis命令字符串是否作为#cmd 变量在表达式评估上下文中可用。否则,忽略此属性。 |
11 | 对o.s.i.redis.outbound.ArgumentsStrategy 实例的引用。它与argument-expressions 属性互斥。如果您不提供这两个属性,则payload 将用作命令参数。 |
您可以使用<int-redis:outbound-gateway>
作为通用组件来执行任何所需的Redis操作。以下示例显示了如何从Redis原子数获取增量值
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message
有效负载应具有名为redisCounter
的名称,该名称可能由org.springframework.data.redis.support.atomic.RedisAtomicInteger
bean定义提供。
RedisConnection#execute
方法的返回类型为泛型Object
。实际结果取决于命令类型。例如,MGET
返回一个List<byte[]>
。有关命令、其参数和结果类型的更多信息,请参阅Redis规范。
Redis队列出站网关
Spring Integration引入了Redis队列出站网关来执行请求和回复场景。它将对话UUID
推送到提供的queue
,将该UUID
作为键的值推送到Redis列表,并等待来自键为UUID
加上.reply
的Redis列表的回复。每次交互都使用不同的UUID。以下列表显示了Redis出站网关的可用属性
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
1 | 此端点从中接收 Message 实例的 MessageChannel 。 |
2 | 此端点发送回复Message 实例的MessageChannel 。 |
3 | 指定此出站网关是否必须返回值。此值默认为false 。否则,当Redis返回值为null 时,将抛出ReplyRequiredException 。 |
4 | 等待发送回复消息的超时时间(以毫秒为单位)。通常应用于基于队列的有限回复通道。 |
5 | 对RedisConnectionFactory bean的引用。默认为redisConnectionFactory 。它与'redis-template'属性互斥。 |
6 | 出站网关发送对话UUID 的Redis列表的名称。 |
7 | 注册多个网关时,此出站网关的顺序。 |
8 | RedisSerializer bean引用。它可以是空字符串,表示“无序列化器”。在这种情况下,来自入站Redis消息的原始byte[] 将作为Message 有效负载发送到channel 。默认为JdkSerializationRedisSerializer 。 |
9 | 指定此端点是否期望来自Redis队列的数据包含完整的Message 实例。如果此属性设置为true ,则serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为JDK序列化)。 |
Redis队列入站网关
Spring Integration 4.1引入了Redis队列入站网关来执行请求和回复场景。它从提供的queue
中弹出对话UUID
,从Redis列表中弹出该UUID
作为键的值,并将回复推送到键为UUID
加上.reply
的Redis列表。以下列表显示了Redis队列入站网关的可用属性
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
1 | 此端点发送从Redis数据创建的Message 实例的MessageChannel 。 |
2 | 此端点等待回复Message 实例的MessageChannel 。可选 - replyChannel 标头仍在使用。 |
3 | 对Spring TaskExecutor (或标准JDK Executor )bean的引用。它用于底层侦听任务。默认为SimpleAsyncTaskExecutor 。 |
4 | 等待发送回复消息的超时时间(以毫秒为单位)。通常应用于基于队列的有限回复通道。 |
5 | 对RedisConnectionFactory bean的引用。默认为redisConnectionFactory 。它与'redis-template'属性互斥。 |
6 | 对话UUID 的Redis列表的名称。 |
7 | 注册多个网关时,此入站网关的顺序。 |
8 | RedisSerializer bean引用。它可以是空字符串,表示“无序列化器”。在这种情况下,来自入站Redis消息的原始byte[] 将作为Message 有效负载发送到channel 。默认为JdkSerializationRedisSerializer 。(请注意,在版本4.3之前的版本中,默认为StringRedisSerializer 。要恢复该行为,请提供对StringRedisSerializer 的引用)。 |
9 | 等待获取接收消息的超时时间(以毫秒为单位)。通常应用于基于队列的有限请求通道。 |
10 | 指定此端点是否期望来自Redis队列的数据包含完整的Message 实例。如果此属性设置为true ,则serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为JDK序列化)。 |
11 | 在“正确弹出”操作发生异常后,侦听器任务应休眠的时间(以毫秒为单位),然后再重新启动侦听器任务。 |
task-executor 必须配置为具有多个线程才能进行处理;否则,当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动监听器任务时,可能会出现死锁。errorChannel 可用于处理这些错误,以避免重新启动,但最好不要让您的应用程序面临可能的死锁情况。有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册。 |
Redis流出站通道适配器
Spring Integration 5.4引入了Reactive Redis流出站通道适配器,用于将消息有效负载写入Redis流。出站通道适配器使用ReactiveStreamOperations.add(…)
将Record
添加到流中。以下示例显示了如何使用Java配置和服务类为Redis流出站通道适配器。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
1 | 使用ReactiveRedisConnectionFactory 和流名称构建ReactiveRedisStreamMessageHandler 的实例以添加记录。另一个构造函数变体基于SpEL表达式,用于根据请求消息评估流键。 |
2 | 设置RedisSerializationContext ,用于在添加到流之前序列化记录键和值。 |
3 | 设置HashMapper ,它提供Java类型和Redis哈希/映射之间的契约。 |
4 | 如果为“true”,则通道适配器将从请求消息中提取要添加的流记录的有效负载。或者使用整个消息作为值。默认为true 。 |
Redis流入站通道适配器
Spring Integration 5.4引入了用于从Redis流读取消息的Reactive Stream入站通道适配器。入站通道适配器使用StreamReceiver.receive(…)
或StreamReceiver.receiveAutoAck()
(基于自动确认标志)从Redis流读取记录。以下示例演示了如何使用Java配置用于Redis流入站通道适配器。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
1 | 使用ReactiveRedisConnectionFactory 和流键构建ReactiveRedisStreamMessageProducer 的实例以读取记录。 |
2 | 一个StreamReceiver.StreamReceiverOptions ,用于使用反应式基础设施使用redis流。 |
3 | 一个SmartLifecycle 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。默认为true 。如果为false ,则应手动启动RedisStreamMessageProducer messageProducer.start() 。 |
4 | 如果为false ,则不会自动确认接收到的消息。消息的确认将延迟到使用消息的客户端。默认为true 。 |
5 | 如果为true ,则将创建一个消费者组。在创建消费者组期间,也将创建流(如果尚不存在)。消费者组跟踪消息传递并在消费者之间进行区分。默认为false 。 |
6 | 设置消费者组名称。默认为定义的bean名称。 |
7 | 设置消费者名称。从组my-group 中以my-consumer 读取消息。 |
8 | 要将来自此端点消息发送到的消息通道。 |
9 | 定义读取消息的偏移量。默认为ReadOffset.latest() 。 |
10 | 如果为“true”,则通道适配器将从Record 中提取有效负载值。否则,整个Record 用作有效负载。默认为true 。 |
如果将autoAck
设置为false
,则Redis驱动程序不会自动确认Redis流中的Record
,而是将IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
标头添加到要生成的的消息中,其值为SimpleAcknowledgment
实例。目标集成流负责在基于此类记录的消息业务逻辑完成时调用其acknowledge()
回调。即使在反序列化期间发生异常并且配置了errorChannel
时,也需要类似的逻辑。因此,目标错误处理程序必须决定确认或否定此类失败的消息。除了IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
之外,ReactiveRedisStreamMessageProducer
还会将这些标头填充到要生成的的消息中:RedisHeaders.STREAM_KEY
、RedisHeaders.STREAM_MESSAGE_ID
、RedisHeaders.CONSUMER_GROUP
和RedisHeaders.CONSUMER
。
从5.5版开始,您可以在ReactiveRedisStreamMessageProducer
上显式配置StreamReceiver.StreamReceiverOptionsBuilder
选项,包括新引入的onErrorResume
函数,如果Redis Stream消费者在发生反序列化错误时应继续轮询,则需要此函数。默认函数将消息发送到错误通道(如果提供),并可能确认失败的消息,如上所述。所有这些StreamReceiver.StreamReceiverOptionsBuilder
都与外部提供的StreamReceiver.StreamReceiverOptions
互斥。
Redis锁注册表
Spring Integration 4.0引入了RedisLockRegistry
。某些组件(例如聚合器和重新排序器)使用从LockRegistry
实例获得的锁来确保一次只有一个线程操作一个组。DefaultLockRegistry
在单个组件内执行此功能。您现在可以在这些组件上配置外部锁注册表。当您将其与共享MessageGroupStore
一起使用时,可以使用RedisLockRegistry
跨多个应用程序实例提供此功能,以便一次只有一个实例可以操作该组。
当本地线程释放锁时,另一个本地线程通常可以立即获取锁。如果使用不同注册表实例的线程释放锁,则可能需要最多100毫秒才能获取锁。
为了避免“挂起”锁(当服务器发生故障时),此注册表中的锁在默认60秒后过期,但您可以在注册表上配置此值。锁通常保持较短的时间。
由于键可能会过期,因此尝试解锁过期的锁会导致抛出异常。但是,受此类锁保护的资源可能已受到损害,因此应将此类异常视为严重异常。您应将过期时间设置为足够大的值以防止这种情况,但将其设置为足够低的值,以便在服务器发生故障后在合理的时间内恢复锁。 |
从5.0版开始,RedisLockRegistry
实现了ExpirableLockRegistry
,它删除了最后获取时间超过age
且当前未锁定的锁。
从5.5.6版开始,RedisLockRegistry
支持通过RedisLockRegistry.setCacheCapacity()
自动清理RedisLockRegistry.locks
中redisLocks的缓存。有关更多信息,请参阅其JavaDocs。
从5.5.13版开始,RedisLockRegistry
公开了setRedisLockType(RedisLockType)
选项,以确定应以哪种模式获取Redis锁。
-
RedisLockType.SPIN_LOCK
- 通过周期性循环(100毫秒)获取锁,检查是否可以获取锁。默认值。 -
RedisLockType.PUB_SUB_LOCK
- 通过redis发布-订阅订阅获取锁。
发布-订阅是首选模式 - 客户端Redis服务器之间的网络通信量更少,并且性能更高 - 在其他进程中通知解锁时,订阅立即获取锁。但是,Redis不支持主/从连接中的发布-订阅(例如在AWS ElastiCache环境中),因此选择繁忙旋转模式作为默认模式,以使注册表在任何环境中都能工作。