注解支持
除了使用 XML 命名空间配置消息端点外,还可以使用注解。首先,Spring Integration 提供了类级别的@MessageEndpoint
作为原型注解,这意味着它本身使用 Spring 的@Component
注解进行注解,因此 Spring 的组件扫描会自动将其识别为 bean 定义。
更重要的是各种方法级别的注解。它们表示带注解的方法能够处理消息。下面的示例演示了类级别和方法级别的注解
@MessageEndpoint
public class FooService {
@ServiceActivator
public void processMessage(Message message) {
...
}
}
方法“处理”消息的确切含义取决于具体的注解。Spring Integration 中可用的注解包括:
-
@Aggregator
(参见 聚合器) -
@Filter
(参见 过滤器) -
@Router
(参见 路由器) -
@ServiceActivator
(参见 服务激活器) -
@Splitter
(参见 分割器) -
@Transformer
(参见 转换器) -
@InboundChannelAdapter
(参见 通道适配器) -
@BridgeFrom
(参见 使用 Java 配置配置桥接) -
@BridgeTo
(参见 使用 Java 配置配置桥接) -
@MessagingGateway
(参见 消息网关) -
@IntegrationComponentScan
(参见 配置和@EnableIntegration
)
如果将 XML 配置与注解结合使用,则不需要@MessageEndpoint 注解。如果要从<service-activator/> 元素的ref 属性配置 POJO 引用,则只需提供方法级别的注解。在这种情况下,即使<service-activator/> 元素上不存在方法级别属性,注解也能防止歧义。 |
在大多数情况下,带注解的处理程序方法不需要Message
类型作为其参数。相反,方法参数类型可以与消息的有效负载类型匹配,如下例所示
public class ThingService {
@ServiceActivator
public void bar(Thing thing) {
...
}
}
当方法参数应从MessageHeaders
中的值映射时,另一个选项是使用参数级别的@Header
注解。通常,使用 Spring Integration 注解进行注解的方法可以接受Message
本身、消息有效负载或标题值(使用@Header
)作为参数。实际上,该方法可以接受组合,如下例所示
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
...
}
}
还可以使用@Headers
注解将所有消息头作为Map
提供,如下例所示
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
注解的值也可以是 SpEL 表达式(例如,someHeader.toUpperCase() ),这在希望在注入标题值之前对其进行操作时非常有用。它还提供了一个可选的required 属性,该属性指定属性值是否必须在标题中可用。required 属性的默认值为true 。 |
对于其中一些注解,当消息处理方法返回非空值时,端点会尝试发送回复。这在两种配置选项(命名空间和注解)中都是一致的,因为使用了此类端点的输出通道(如果可用),并且REPLY_CHANNEL
消息头值用作后备。
端点上的输出通道和回复通道消息头的组合启用了一种管道方法,其中多个组件具有输出通道,而最终组件允许将回复消息转发到回复通道(如原始请求消息中指定)。换句话说,最终组件取决于原始发送者提供的信息,因此可以动态支持任意数量的客户端。这是一个 返回地址 模式的示例。 |
除了此处显示的示例外,这些注解还支持inputChannel
和outputChannel
属性,如下例所示
@Service
public class ThingService {
@ServiceActivator(inputChannel="input", outputChannel="output")
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
处理这些注解会创建与相应的 XML 组件相同的 bean——AbstractEndpoint
实例和MessageHandler
实例(或入站通道适配器的MessageSource
实例)。参见 @Bean
方法上的注解。bean 名字由以下模式生成:[componentName].[methodName].[decapitalizedAnnotationClassShortName]
。在前面的示例中,AbstractEndpoint
的 bean 名字是thingService.otherThing.serviceActivator
,而MessageHandler
(MessageSource
) bean 的名字则在后面添加了.handler
(.source
) 后缀。可以使用@EndpointId
注解以及这些消息传递注解来自定义此类名字。MessageHandler
实例 (MessageSource
实例) 也适合由 消息历史记录 进行跟踪。
从 4.0 版开始,所有消息传递注解都提供SmartLifecycle
选项 (autoStartup
和phase
),以允许在应用程序上下文初始化时控制端点生命周期。它们的默认值分别为true
和0
。要更改端点的状态(例如start()
或stop()
),可以使用BeanFactory
(或自动装配)获取对端点 bean 的引用并调用这些方法。或者,可以将命令消息发送到控制总线
(参见 控制总线)。为此,应使用前面段落中提到的beanName
。
在解析上述注解后自动创建的通道(当未配置特定通道 bean 时),以及相应的使用者端点,在上下文初始化结束时作为 bean 声明。这些 bean **可以**在其他服务中自动装配,但必须用
|
从 6.0 版开始,所有消息传递注解现在都是@Repeatable
的,因此可以在同一个服务方法上声明几种相同类型的注解,其含义是创建与重复的注解一样多的端点。
@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
return input.toUpperCase();
}
使用@Poller
注解
在 Spring Integration 4.0 之前,消息传递注解要求inputChannel
是对SubscribableChannel
的引用。对于PollableChannel
实例,需要一个<int:bridge/>
元素来配置<int:poller/>
并使组合端点成为PollingConsumer
。4.0 版引入了@Poller
注解,允许直接在消息传递注解上配置poller
属性,如下例所示
public class AnnotationService {
@Transformer(inputChannel = "input", outputChannel = "output",
poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
public String handle(String payload) {
...
}
}
@Poller
注解仅提供简单的PollerMetadata
选项。可以使用属性占位符配置@Poller
注解的属性 (maxMessagesPerPoll
、fixedDelay
、fixedRate
和cron
)。此外,从 5.1 版开始,还提供了PollingConsumer
的receiveTimeout
选项。如果需要提供更多轮询选项(例如,transaction
、advice-chain
、error-handler
等),则应将PollerMetadata
配置为通用 bean 并将其 bean 名字用作@Poller
的value
属性。在这种情况下,不允许使用其他属性(必须在PollerMetadata
bean 上指定)。请注意,如果inputChannel
是PollableChannel
并且未配置@Poller
,则使用默认PollerMetadata
(如果它存在于应用程序上下文中)。要使用@Configuration
注解声明默认轮询器,请使用类似于以下示例的代码
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
以下示例显示了如何使用默认轮询器
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
public String handle(String payload) {
...
}
}
以下示例显示了如何使用命名轮询器
@Bean
public PollerMetadata myPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(1000));
return pollerMetadata;
}
以下示例显示了一个使用默认轮询器的端点
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
poller = @Poller("myPoller"))
public String handle(String payload) {
...
}
}
从 4.3.3 版开始,@Poller
注解具有errorChannel
属性,可以更轻松地配置底层的MessagePublishingErrorHandler
。此属性的作用与<poller>
XML 组件中的error-channel
相同。有关详细信息,请参见 端点命名空间支持。
消息传递注解上的poller()
属性与reactive()
属性互斥。有关详细信息,请参见下一节。
使用@Reactive
注解
ReactiveStreamsConsumer
自 5.0 版以来就一直存在,但它仅在端点的输入通道是FluxMessageChannel
(或任何org.reactivestreams.Publisher
实现)时才适用。从 5.3 版开始,无论输入通道类型如何,只要目标消息处理程序是ReactiveMessageHandler
,框架也会创建它的实例。@Reactive
子注解(类似于上面提到的@Poller
)已从 5.5 版开始引入所有消息传递注解。它接受一个可选的Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
bean 引用,并且独立于输入通道类型和消息处理程序,将目标端点转换为ReactiveStreamsConsumer
实例。该函数来自Flux.transform()
运算符,用于对来自输入通道的反应式流源应用一些自定义 (publishOn()
、doOnNext()
、log()
、retry()
等)。
以下示例演示了如何独立于最终订阅者和生产者更改输入通道的发布线程到该DirectChannel
@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
return flux -> flux.publishOn(Schedulers.parallel());
}
@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
...
}
消息传递注解上的reactive()
属性与poller()
属性互斥。有关详细信息,请参见 使用@Poller
注解 和 反应式流支持。
使用@InboundChannelAdapter
注解
4.0 版本引入了@InboundChannelAdapter
方法级注解。它基于注解方法的MethodInvokingMessageSource
生成一个SourcePollingChannelAdapter
集成组件。此注解类似于<int:inbound-channel-adapter>
XML 组件,并具有相同的限制:方法不能有参数,返回值类型不能为void
。它有两个属性:value
(必需的MessageChannel
bean 名称)和poller
(一个可选的@Poller
注解,如前面所述)。如果您需要提供一些MessageHeaders
,请使用Message<?>
返回类型并使用MessageBuilder
来构建Message<?>
。使用MessageBuilder
可以配置MessageHeaders
。以下示例演示如何使用@InboundChannelAdapter
注解。
@InboundChannelAdapter("counterChannel")
public Integer count() {
return this.counter.incrementAndGet();
}
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
return "foo";
}
4.3 版本为value
注解属性引入了channel
别名,以提高源代码的可读性。此外,目标MessageChannel
bean 在第一次receive()
调用时通过提供的名称(由outputChannelName
选项设置)在SourcePollingChannelAdapter
中解析,而不是在初始化阶段。它允许“延迟绑定”逻辑:从使用者角度来看,目标MessageChannel
bean 的创建和注册时间比@InboundChannelAdapter
解析阶段稍晚。
第一个示例要求默认轮询器已在应用程序上下文中的其他地方声明。
使用@MessagingGateway
注解
使用@IntegrationComponentScan
注解
标准 Spring Framework @ComponentScan
注解不会扫描接口以查找原型@Component
注解。为了克服此限制并允许配置@MessagingGateway
(参见 @MessagingGateway
注解),我们引入了@IntegrationComponentScan
机制。此注解必须与@Configuration
注解一起放置,并进行自定义以定义其扫描选项,例如basePackages
和basePackageClasses
。在这种情况下,所有发现的用@MessagingGateway
注解的接口都将被解析并注册为GatewayProxyFactoryBean
实例。所有其他基于类的组件都由标准@ComponentScan
解析。