Hazelcast 支持
Spring Integration 提供通道适配器和其他实用组件,以与内存中数据网格 Hazelcast 交互。
您需要将此依赖项包含到您的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-hazelcast</artifactId>
<version>6.3.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:6.3.0"
Hazelcast 组件的 XML 命名空间和 schemaLocation 定义为
xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"
Hazelcast 事件驱动的入站通道适配器
Hazelcast 提供分布式数据结构,例如
-
com.hazelcast.map.IMap
-
com.hazelcast.multimap.MultiMap
-
com.hazelcast.collection.IList
-
com.hazelcast.collection.ISet
-
com.hazelcast.collection.IQueue
-
com.hazelcast.topic.ITopic
-
com.hazelcast.replicatedmap.ReplicatedMap
它还提供事件监听器,以便监听对这些数据结构所做的修改。
-
com.hazelcast.core.EntryListener<K, V>
-
com.hazelcast.collection.ItemListener
-
com.hazelcast.topic.MessageListener
Hazelcast 事件驱动的入站通道适配器监听相关的缓存事件,并将事件消息发送到定义的通道。它支持 XML 和 JavaConfig 驱动的配置。
XML 配置
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
Hazelcast 事件驱动的入站通道适配器需要以下属性
-
channel
: 指定消息发送到的通道; -
cache
: 指定监听的分布式对象引用。它是必填属性; -
cache-events
: 指定监听的缓存事件。它是可选属性,默认值为ADDED
。其支持的值如下 -
IMap
和MultiMap
支持的缓存事件类型:ADDED
、REMOVED
、UPDATED
、EVICTED
、EVICT_ALL
和CLEAR_ALL
; -
ReplicatedMap
支持的缓存事件类型:ADDED
、REMOVED
、UPDATED
、EVICTED
; -
IList
、ISet
和IQueue
支持的缓存事件类型:ADDED
、REMOVED
。ITopic
没有缓存事件类型。 -
cache-listening-policy
: 指定缓存监听策略为SINGLE
或ALL
。它是可选属性,默认值为SINGLE
。每个监听相同缓存对象且具有相同cache-events
属性的 Hazelcast 入站通道适配器,可以接收单个事件消息或所有事件消息。如果为ALL
,则所有监听相同缓存对象且具有相同cache-events
属性的 Hazelcast 入站通道适配器将接收所有事件消息。如果为SINGLE
,则它们将接收唯一的事件消息。
一些配置示例
<int:channel id="mapChannel"/>
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED" />
<bean id="map" factory-bean="instance" factory-method="getMap">
<constructor-arg value="map"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="listChannel"
cache="list"
cache-events="ADDED, REMOVED"
cache-listening-policy="ALL" />
<bean id="list" factory-bean="instance" factory-method="getList">
<constructor-arg value="list"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />
<bean id="set" factory-bean="instance" factory-method="getSet">
<constructor-arg value="set"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="queueChannel"
cache="queue"
cache-events="REMOVED"
cache-listening-policy="ALL" />
<bean id="queue" factory-bean="instance" factory-method="getQueue">
<constructor-arg value="queue"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />
<bean id="topic" factory-bean="instance" factory-method="getTopic">
<constructor-arg value="topic"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
<constructor-arg value="replicatedMap"/>
</bean>
Java 配置示例
以下示例展示了 DistributedMap
配置。相同的配置可用于其他分布式数据结构(IMap
、MultiMap
、ReplicatedMap
、IList
、ISet
、IQueue
和 ITopic
)
@Bean
public PollableChannel distributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hazelcastInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
producer.setOutputChannel(distributedMapChannel());
producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
return producer;
}
Hazelcast 连续查询入站通道适配器
Hazelcast 连续查询允许监听对特定 Map 条目执行的修改。Hazelcast 连续查询入站通道适配器是一个事件驱动的通道适配器,它根据定义的谓词监听相关分布式 Map 事件。
-
Java
-
XML
@Bean
public PollableChannel cqDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> cqDistributedMap() {
return hazelcastInstance().getMap("CQ_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
final HazelcastContinuousQueryMessageProducer producer =
new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
producer.setOutputChannel(cqDistributedMapChannel());
producer.setCacheEventTypes("UPDATED");
producer.setIncludeValue(false);
return producer;
}
<int:channel id="cqMapChannel"/>
<int-hazelcast:cq-inbound-channel-adapter
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE"/>
<bean id="cqMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="cqMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
它支持以下六个属性
-
channel
: 指定消息发送到的通道; -
cache
: 指定要监听的分布式 Map 引用。必填; -
cache-events
: 指定要监听的缓存事件。可选属性,默认值为ADDED
。支持的值为ADDED
、REMOVED
、UPDATED
、EVICTED
、EVICT_ALL
和CLEAR_ALL
; -
predicate
: 指定一个谓词,用于监听对特定映射条目执行的修改。必填; -
include-value
: 指定在连续查询结果中包含值和 oldValue。可选,默认值为true
; -
cache-listening-policy
: 指定缓存监听策略为SINGLE
或ALL
。可选,默认值为SINGLE
。每个监听相同缓存对象且具有相同 cache-events 属性的 Hazelcast CQ 入站通道适配器,可以接收单个事件消息或所有事件消息。如果为ALL
,则所有监听相同缓存对象且具有相同 cache-events 属性的 Hazelcast CQ 入站通道适配器,将接收所有事件消息。如果为SINGLE
,则它们将接收唯一的事件消息。
Hazelcast 集群监控器入站通道适配器
Hazelcast 集群监控器支持监听对集群执行的修改。Hazelcast 集群监控器入站通道适配器是一个事件驱动的通道适配器,它监听相关的成员资格、分布式对象、迁移、生命周期和客户端事件
-
Java
-
XML
@Bean
public PollableChannel eventChannel() {
return new QueueChannel();
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
producer.setOutputChannel(eventChannel());
producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");
return producer;
}
<int:channel id="monitorChannel"/>
<int-hazelcast:cm-inbound-channel-adapter
channel="monitorChannel"
hazelcast-instance="instance"
monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
它支持以下三个属性
-
channel
: 指定消息发送到的通道; -
hazelcast-instance
: 指定要监听集群事件的 Hazelcast 实例引用。这是一个必填属性; -
monitor-types
: 指定要监听的监控类型。这是一个可选属性,默认值为MEMBERSHIP
。支持的值为MEMBERSHIP
、DISTRIBUTED_OBJECT
、MIGRATION
、LIFECYCLE
、CLIENT
。
Hazelcast 分布式 SQL 入站通道适配器
Hazelcast 允许在分布式映射上运行分布式查询。Hazelcast 分布式 SQL 入站通道适配器是一个轮询入站通道适配器。它运行定义的分布式 SQL 命令,并根据迭代类型返回结果。
-
Java
-
XML
@Bean
public PollableChannel dsDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> dsDistributedMap() {
return hazelcastInstance().getMap("DS_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
final HazelcastDistributedSQLMessageSource messageSource =
new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
"name='TestName' AND surname='TestSurname'");
messageSource.setIterationType(DistributedSQLIterationType.ENTRY);
return messageSource;
}
<int:channel id="dsMapChannel"/>
<int-hazelcast:ds-inbound-channel-adapter
channel="dsMapChannel"
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
<bean id="dsMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="dsMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
它需要一个轮询器,并支持四个属性
-
channel
: 指定发送消息的通道。这是一个必填属性; -
cache
: 指定要查询的分布式IMap
引用。这是一个必填属性; -
iteration-type
: 指定结果类型。分布式 SQL 可以运行在EntrySet
、KeySet
、LocalKeySet
或Values
上。这是一个可选属性,默认值为VALUE
。支持的值为ENTRY
、KEY
、LOCAL_KEY
和VALUE
; -
distributed-sql
: 指定 SQL 语句的 where 子句。这是一个必填属性。
Hazelcast 出站通道适配器
Hazelcast 出站通道适配器监听其定义的通道,并将传入的消息写入相关的分布式缓存。它期望 cache
、cache-expression
或 HazelcastHeaders.CACHE_NAME
之一用于分布式对象定义。支持的分布式对象为:IMap
、MultiMap
、ReplicatedMap
、IList
、ISet
、IQueue
和 ITopic
。
-
Java
-
XML
@Bean
public MessageChannel distributedMapChannel() {
return new DirectChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hzInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hzInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
handler.setDistributedObject(distributedMap());
handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
handler.setExtractPayload(true);
return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
cache-expression="headers['CACHE_HEADER']"
key-expression="payload.key"
extract-payload="true"/>
它需要以下属性
-
channel
: 指定消息发送到的通道; -
cache
: 指定分布式对象引用。可选; -
cache-expression
: 通过 Spring 表达式语言 (SpEL) 指定分布式对象。可选; -
key-expression
: 通过 Spring 表达式语言 (SpEL) 指定键值对的键。可选,仅对IMap
、MultiMap
和ReplicatedMap
分布式数据结构必填。 -
extract-payload
: 指定发送整个消息还是仅发送有效负载。可选属性,默认值为true
。如果为 true,则仅将有效负载写入分布式对象。否则,将通过转换消息头和有效负载来写入整个消息。
通过在头文件中设置分布式对象名称,可以将消息写入相同通道的不同分布式对象。如果未定义 cache
或 cache-expression
属性,则必须在请求 Message
中设置 HazelcastHeaders.CACHE_NAME
头文件。
Hazelcast 领导者选举
如果需要领导者选举(例如,对于仅一个节点应接收消息的高可用消息消费者),可以使用基于 Hazelcast 的 LeaderInitiator
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LeaderInitiator initiator() {
return new LeaderInitiator(hazelcastInstance());
}
当某个节点被选为领导者时,它将向所有应用程序监听器发送 OnGrantedEvent
。
Hazelcast 消息存储
对于分布式消息传递状态管理,例如持久 QueueChannel
或跟踪 Aggregator
消息组,提供了 HazelcastMessageStore
实现
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MessageGroupStore messageStore() {
return new HazelcastMessageStore(hazelcastInstance());
}
默认情况下,SPRING_INTEGRATION_MESSAGE_STORE
IMap
用于将消息和组存储为键/值。任何自定义 IMap
都可以提供给 HazelcastMessageStore
。
Hazelcast 元数据存储
使用支持 Hazelcast IMap
的 ListenableMetadataStore
实现可用。默认映射使用名称 SPRING_INTEGRATION_METADATA_STORE
创建,可以自定义。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MetadataStore metadataStore() {
return new HazelcastMetadataStore(hazelcastInstance());
}
HazelcastMetadataStore
实现 ListenableMetadataStore
,它允许您注册自己的 MetadataStoreListener
类型监听器,以通过 addListener(MetadataStoreListener callback)
监听事件。
Hazelcast 锁注册表
使用支持 Hazelcast 分布式 ILock
的 LockRegistry
实现可用
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LockRegistry lockRegistry() {
return new HazelcastLockRegistry(hazelcastInstance());
}
当与共享 MessageGroupStore
(例如 Aggregator
存储管理)一起使用时,HazelcastLockRegistry
可用于在多个应用程序实例之间提供此功能,以便一次只有一个实例可以操作该组。
对于所有分布式操作,必须在 HazelcastInstance 上启用 CP 子系统。
|
使用 Hazelcast 的消息通道
Hazelcast IQueue
和 ITopic
分布式对象本质上是消息传递原语,可以在没有此 Hazelcast 模块中额外实现的情况下与 Spring Integration 核心组件一起使用。
可以由任何 java.util.Queue
提供 QueueChannel
,包括提到的 Hazelcast 分布式 IQueue
@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
return new QueueChannel(hazelcastInstance.Message<?>>getQueue("springIntegrationQueue"));
}
将此配置放在应用程序的 Hazelcast 集群中的多个节点上,将使 QueueChannel
成为分布式,并且只有一个节点能够从该 IQueue
中轮询单个 Message
。这与 PollableJmsChannel
、PollableKafkaChannel
或 PollableAmqpChannel
的工作方式类似。
如果生产者端不是 Spring Integration 应用程序,则无法配置 QueueChannel
,因此使用普通的 Hazelcast IQueue
API 来生产数据。在这种情况下,在消费者端使用 QueueChannel
方法是错误的:必须使用 入站通道适配器 解决方案。
@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
return hazelcastInstance.getQueue("springIntegrationQueue");
}
@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
return myStringHzQueue::poll;
}
Hazelcast 中的 ITopic
抽象与 JMS 中的 Topic
具有类似的语义:所有订阅者都会收到发布的消息。使用一对简单的 MessageChannel
bean,此机制作为开箱即用的功能得到支持。
@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
MessageChannel fromHazelcastTopicChannel) {
ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
return topic;
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
return new FixedSubscriberChannel(springIntegrationTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}
FixedSubscriberChannel
是 DirectChannel
的优化变体,它需要在初始化时使用 MessageHandler
。由于 MessageHandler
是一个函数式接口,因此可以为 handleMessage
方法提供一个简单的 lambda 表达式。当消息发送到 publishToHazelcastTopicChannel
时,它只是发布到 Hazelcast ITopic
上。com.hazelcast.topic.MessageListener
也是一个函数式接口,因此可以为 ITopic#addMessageListener
提供一个 lambda 表达式。因此,fromHazelcastTopicChannel
的订阅者将使用发送到上述 ITopic
的所有消息。
可以使用 IExecutorService
为 ExecutorChannel
提供配置。例如,通过相应的配置,可以实现集群范围的单例。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(
new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}