消息通道
除了具有EIP方法的IntegrationFlowBuilder
之外,Java DSL还提供了一个流畅的API来配置MessageChannel
实例。为此,提供了MessageChannels
构建器工厂。以下示例演示了如何使用它。
@Bean
public PriorityChannelSpec priorityChannel() {
return MessageChannels.priority(this.mongoDbChannelMessageStore, "priorityGroup")
.interceptor(wireTap());
}
相同的MessageChannels
构建器工厂可用于IntegrationFlowBuilder
中的channel()
EIP方法来连接端点,类似于在XML配置中连接input-channel
/output-channel
对。默认情况下,端点使用DirectChannel
实例进行连接,其bean名称基于以下模式:[IntegrationFlow.beanName].channel#[channelNameIndex]
。此规则也适用于通过内联MessageChannels
构建器工厂使用生成的未命名通道。但是,所有MessageChannels
方法都有一个了解channelId
的变体,您可以使用它来设置MessageChannel
实例的bean名称。MessageChannel
引用和beanName
可用作bean方法调用。以下示例显示了使用channel()
EIP方法的可能方法。
@Bean
public QueueChannelSpec queueChannel() {
return MessageChannels.queue();
}
@Bean
public PublishSubscribeChannelSpec<?> publishSubscribe() {
return MessageChannels.publishSubscribe();
}
@Bean
public IntegrationFlow channelFlow() {
return IntegrationFlow.from("input")
.fixedSubscriberChannel()
.channel("queueChannel")
.channel(publishSubscribe())
.channel(MessageChannels.executor("executorChannel", this.taskExecutor))
.channel("output")
.get();
}
-
from("input")
表示“查找并使用具有“input”ID的MessageChannel
,或创建一个”。 -
fixedSubscriberChannel()
生成一个FixedSubscriberChannel
实例,并使用channelFlow.channel#0
的名称注册它。 -
channel("queueChannel")
的工作方式相同,但使用现有的queueChannel
bean。 -
channel(publishSubscribe())
是bean方法引用。 -
channel(MessageChannels.executor("executorChannel", this.taskExecutor))
是IntegrationFlowBuilder
,它将IntegrationComponentSpec
公开给ExecutorChannel
,并将其注册为executorChannel
。 -
channel("output")
将DirectChannel
bean注册为其名称为output
,只要不存在具有此名称的bean。
注意:前面的IntegrationFlow
定义是有效的,并且所有通道都应用于具有BridgeHandler
实例的端点。
小心从不同的IntegrationFlow 实例使用相同的内联通道定义通过MessageChannels 工厂。即使DSL解析器将不存在的对象注册为bean,它也无法从不同的IntegrationFlow 容器确定相同的对象(MessageChannel )。以下示例是错误的。 |
@Bean
public IntegrationFlow startFlow() {
return IntegrationFlow.from("input")
.transform(...)
.channel(MessageChannels.queue("queueChannel"))
.get();
}
@Bean
public IntegrationFlow endFlow() {
return IntegrationFlow.from(MessageChannels.queue("queueChannel"))
.handle(...)
.get();
}
该错误示例的结果是以下异常。
Caused by: java.lang.IllegalStateException:
Could not register object [queueChannel] under bean name 'queueChannel':
there is already object [queueChannel] bound
at o.s.b.f.s.DefaultSingletonBeanRegistry.registerSingleton(DefaultSingletonBeanRegistry.java:129)
要使其正常工作,需要为该通道声明@Bean
,并从不同的IntegrationFlow
实例使用其bean方法。