消息

Spring Integration 的 Message 是一个通用的数据容器。任何对象都可以作为有效负载提供,每个 Message 实例都包含包含用户可扩展属性(作为键值对)的标头。

Message 接口

以下清单显示了 Message 接口的定义

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message 接口是 API 的核心部分。通过将数据封装在通用包装器中,消息系统可以在不了解数据类型的情况下传递数据。随着应用程序发展以支持新类型,或者当类型本身被修改或扩展时,消息系统不会受到影响。另一方面,当消息系统中的某些组件确实需要访问有关 Message 的信息时,此类元数据通常可以存储到消息标头中的元数据中并从中检索。

消息标头

正如 Spring Integration 允许任何 Object 用作 Message 的有效负载一样,它还支持任何 Object 类型作为标头值。实际上,MessageHeaders 类实现了 java.util.Map_ 接口,如下面的类定义所示

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
即使 MessageHeaders 类实现了 Map,它实际上是一个只读实现。任何尝试在 Map 中 put 值的操作都会导致 UnsupportedOperationExceptionremoveclear 也是如此。由于消息可能会传递给多个使用者,因此 Map 的结构不能被修改。同样,消息的有效负载 Object 在初始创建后不能被 set。但是,标头值本身(或有效负载 Object)的可变性有意地留给框架用户决定。

作为 Map 的实现,可以通过使用标头的名称调用 get(..) 来检索标头。或者,您可以提供预期的 Class 作为附加参数。更好的是,在检索预定义值之一时,可以使用方便的 getter。以下示例显示了这三种选项中的每一种

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

下表描述了预定义的消息头

表 1. 预定义的消息头
头名称 头类型 用途
 MessageHeaders.ID
 java.util.UUID

此消息实例的标识符。每次消息被修改时都会更改。

 MessageHeaders.
TIMESTAMP
 java.lang.Long

消息创建的时间。每次消息被修改时都会更改。

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

当没有显式配置输出通道且没有 ROUTING_SLIPROUTING_SLIP 已用尽时,回复(如果有)发送到的通道。如果该值为 String,则它必须表示一个 Bean 名称或由 ChannelRegistry 生成。

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

发送错误的通道。如果该值为 String,则它必须表示一个 Bean 名称或由 ChannelRegistry 生成。

许多入站和出站适配器实现也提供或期望某些头,并且您可以配置其他用户定义的头。这些头的常量可以在存在此类头的模块中找到,例如 AmqpHeadersJmsHeaders 等。

MessageHeaderAccessor API

从 Spring Framework 4.0 和 Spring Integration 4.0 开始,核心消息传递抽象已移至 spring-messaging 模块,并且引入了 MessageHeaderAccessor API 来提供对消息传递实现的额外抽象。所有(核心)Spring Integration 特定的消息头常量现在在 IntegrationMessageHeaderAccessor 类中声明。下表描述了预定义的消息头

表 2. 预定义的消息头
头名称 头类型 用途
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

用于关联两个或多个消息。

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常是具有 SEQUENCE_SIZE 的一组消息的序列号,但也可以在 <resequencer/> 中使用以重新排序一组无界的消息。

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

一组相关消息中的消息数量。

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

指示消息何时过期。框架不会直接使用它,但可以使用头富集器设置它,并在配置了 UnexpiredMessageSelector<filter/> 中使用。

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

消息优先级,例如在 PriorityChannel 中。

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

如果消息被幂等接收器拦截器检测为重复,则为 True。请参阅 幂等接收器企业集成模式

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

如果消息与应在消息处理完成后关闭的 Closeable 相关联,则存在此标头。例如,使用 FTP、SFTP 等进行流式文件传输时关联的 Session

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

如果消息驱动通道适配器支持 RetryTemplate 的配置,则此标头包含当前的传递尝试次数。

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

如果入站端点支持,则回调以接受、拒绝或重新排队消息。请参阅 延迟确认可轮询消息源MQTT 手动确认

IntegrationMessageHeaderAccessor 类提供了对其中一些标头的便捷类型化 getter,如下例所示

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

下表描述了也出现在 IntegrationMessageHeaderAccessor 中但通常不为用户代码使用(即,它们通常由 Spring Integration 的内部部分使用 - 它们在此处的包含是为了完整性)的标头

表 3. 预定义消息标头
头名称 头类型 用途
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

当需要嵌套关联时使用的关联数据堆栈(例如,splitter→…​→splitter→…​→aggregator→…​→aggregator)。

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

请参阅 路由单

消息 ID 生成

当消息在应用程序中转换时,每次它被修改(例如,通过转换器)时,都会分配一个新的消息 ID。消息 ID 是一个 UUID。从 Spring Integration 3.0 开始,用于 IS 生成的默认策略比以前的 java.util.UUID.randomUUID() 实现更有效。它使用基于安全随机种子的简单随机数,而不是每次都创建安全随机数。

可以通过在应用程序上下文中声明实现 org.springframework.util.IdGenerator 的 bean 来选择不同的 UUID 生成策略。

一个类加载器中只能使用一种 UUID 生成策略。这意味着,如果两个或多个应用程序上下文在同一个类加载器中运行,它们共享相同的策略。如果其中一个上下文更改了策略,则所有上下文都使用该策略。如果同一个类加载器中的两个或多个上下文声明了类型为 org.springframework.util.IdGenerator 的 bean,则它们必须都是同一个类的实例。否则,尝试替换自定义策略的上下文将无法初始化。如果策略相同,但参数化,则使用第一个初始化的上下文的策略。

除了默认策略之外,还提供了两个额外的 IdGeneratorsorg.springframework.util.JdkIdGenerator 使用以前的 UUID.randomUUID() 机制。当不需要 UUID 并且简单的递增值就足够时,可以使用 o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator

只读标头

MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读头,不能被覆盖。

从 4.3.2 版本开始,MessageBuilder 提供了 readOnlyHeaders(String…​ readOnlyHeaders) API 来自定义不应该从上游 Message 中复制的头的列表。默认情况下,只有 MessageHeaders.IDMessageHeaders.TIMESTAMP 是只读的。全局 spring.integration.readOnly.headers 属性(参见 全局属性)用于自定义框架组件的 DefaultMessageBuilderFactory。当您不想填充一些开箱即用的头时,这很有用,例如 ObjectToJsonTransformercontentType(参见 JSON 转换器)。

当您尝试使用 MessageBuilder 构建新消息时,这种类型的头会被忽略,并且会向日志发出特定的 INFO 消息。

从 5.0 版本开始,消息网关头富集器负载富集器头过滤器 不允许您在使用 DefaultMessageBuilderFactory 时配置 MessageHeaders.IDMessageHeaders.TIMESTAMP 头名称,并且它们会抛出 BeanInitializationException

头传播

当消息被消息生产端点(例如 服务激活器)处理(和修改)时,通常情况下,入站头会被传播到出站消息。一个例外是 转换器,当完整的消息被返回到框架时。在这种情况下,用户代码负责整个出站消息。当转换器只返回负载时,入站头会被传播。此外,只有当头在出站消息中不存在时才会被传播,允许您根据需要更改头值。

从 4.3.10 版本开始,您可以配置消息处理程序(修改消息并生成输出)以抑制特定头的传播。要配置您不想复制的头,请在 MessageProducingMessageHandler 抽象类上调用 setNotPropagatedHeaders()addNotPropagatedHeaders() 方法。

您还可以通过在 META-INF/spring.integration.properties 中将 readOnlyHeaders 属性设置为逗号分隔的头列表来全局抑制特定消息头的传播。

从 5.0 版本开始,AbstractMessageProducingHandler 上的 setNotPropagatedHeaders() 实现使用简单的模式(xxx*xxx*xxxxxx*yyy)来允许过滤具有共同后缀或前缀的报头。有关更多信息,请参阅 PatternMatchUtils Javadoc。当其中一个模式为 *(星号)时,不会传播任何报头。所有其他模式将被忽略。在这种情况下,服务激活器与转换器行为相同,并且必须在服务方法返回的 Message 中提供任何必需的报头。notPropagatedHeaders() 选项在 Java DSL 的 ConsumerEndpointSpec 中可用。它也可以在 <service-activator> 组件的 XML 配置中作为 not-propagated-headers 属性使用。

报头传播抑制不适用于那些不修改消息的端点,例如 桥接器路由器

消息实现

Message 接口的基本实现是 GenericMessage<T>,它提供了两个构造函数,如下所示

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

当创建 Message 时,会生成一个随机的唯一 ID。接受 Map 报头的构造函数会将提供的报头复制到新创建的 Message 中。

还有一个方便的 Message 实现,用于传达错误条件。此实现以 Throwable 对象作为其有效负载,如下例所示

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

请注意,此实现利用了 GenericMessage 基类是参数化的这一事实。因此,如两个示例所示,在检索 Message 有效负载 Object 时,无需进行强制转换。

MessageBuilder 辅助类

您可能会注意到,Message 接口定义了其有效负载和报头的检索方法,但没有提供任何设置器。原因是 Message 在其初始创建后无法修改。因此,当 Message 实例发送到多个消费者(例如,通过发布-订阅通道)时,如果其中一个消费者需要发送具有不同有效负载类型的回复,则它必须创建一个新的 Message。因此,其他消费者不会受到这些更改的影响。请记住,多个消费者可能会访问相同的有效负载实例或报头值,并且此类实例本身是否不可变取决于您。换句话说,Message 实例的契约类似于不可修改的 Collection 的契约,MessageHeaders 地图进一步说明了这一点。即使 MessageHeaders 类实现了 java.util.Map,任何尝试在 MessageHeaders 实例上调用 put 操作(或“remove”或“clear”)都会导致 UnsupportedOperationException

Spring Integration 提供了一种比创建和填充 Map 传递给 GenericMessage 构造函数更方便的方式来构建消息:`MessageBuilder`。`MessageBuilder` 提供了两种工厂方法,用于从现有 `Message` 或带有有效负载 `Object` 的 `Message` 创建 `Message` 实例。当从现有 `Message` 构建时,该 `Message` 的头和有效负载将被复制到新 `Message`,如下例所示。

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

如果您需要创建一个带有新有效负载的 `Message`,但仍然希望从现有 `Message` 复制头,可以使用其中一种“复制”方法,如下例所示。

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

请注意,`copyHeadersIfAbsent` 方法不会覆盖现有值。此外,在前面的示例中,您可以看到如何使用 `setHeader` 设置任何用户定义的头。最后,还有一些用于预定义头的 `set` 方法,以及用于设置任何头的非破坏性方法(`MessageHeaders` 也定义了预定义头名称的常量)。

您还可以使用 `MessageBuilder` 设置消息的优先级,如下例所示。

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

`priority` 头仅在使用 `PriorityChannel` 时才会被考虑(如下一章所述)。它被定义为 `java.lang.Integer`。