TCP 消息关联

IP 端点的目标之一是提供与 Spring Integration 应用程序以外的系统通信。为此,默认情况下只发送和接收消息有效负载。从 3.0 开始,您可以使用 JSON、Java 序列化或自定义序列化器和反序列化器来传输标头。有关更多信息,请参见 传输标头。框架(除了使用网关时)或服务器端的协作通道适配器不提供消息关联。 本文档后面的部分 将讨论可用于应用程序的各种关联技术。在大多数情况下,这需要特定于应用程序级别的消息关联,即使消息有效负载包含一些自然关联数据(例如订单号)。

网关

网关会自动关联消息。但是,您应该将出站网关用于相对低流量的应用程序。当您将连接工厂配置为对所有消息对使用单个共享连接('single-use="false"')时,一次只能处理一条消息。新消息必须等到收到先前消息的回复才能处理。当连接工厂配置为对每条新消息使用新连接('single-use="true"')时,此限制不适用。虽然此设置可以比共享连接环境提供更高的吞吐量,但它会带来为每对消息打开和关闭新连接的开销。

因此,对于高容量消息,请考虑使用一对协作的通道适配器。但是,要这样做,您需要提供协作逻辑。

另一个解决方案是在 Spring Integration 2.2 中引入的,即使用 CachingClientConnectionFactory,它允许使用共享连接池。

协作的出站和入站通道适配器

为了实现高吞吐量(避免使用网关的陷阱,如 前面所述),您可以配置一对协作的出站和入站通道适配器。您还可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是使用请求-回复语义)。在服务器端,消息关联由适配器自动处理,因为入站适配器会添加一个标头,允许出站适配器在发送回复消息时确定要使用哪个连接。

在服务器端,您必须填充 ip_connectionId 标头,因为它用于将消息与连接关联起来。源自入站适配器的消息会自动设置标头。如果您希望构建其他要发送的消息,则需要设置标头。您可以从传入消息中获取标头值。

在客户端,应用程序必须提供自己的关联逻辑(如果需要)。您可以通过多种方式做到这一点。

如果消息有效负载包含一些自然的关联数据(例如事务 ID 或订单号),并且您不需要保留来自原始出站消息的任何信息(例如回复通道标头),则关联很简单,并且无论如何都将在应用程序级别完成。

如果消息有效负载包含一些自然的关联数据(例如事务 ID 或订单号),但您需要保留来自原始出站消息的一些信息(例如回复通道标头),您可以保留原始出站消息的副本(也许通过使用发布-订阅通道),并使用聚合器重新组合必要的数据。

对于前两种情况中的任何一种,如果有效负载没有自然的关联数据,您可以在出站通道适配器上游提供一个转换器,以使用此类数据增强有效负载。这样的转换器可能会将原始有效负载转换为包含原始有效负载和消息标头子集的新对象。当然,标头中的活动对象(例如回复通道)不能包含在转换后的有效负载中。

如果您选择这种策略,您需要确保连接工厂具有适当的序列化器-反序列化器对来处理此类有效负载(例如 DefaultSerializerDefaultDeserializer,它们使用 Java 序列化,或自定义序列化器和反序列化器)。TCP 连接工厂中提到的 ByteArray*Serializer 选项(包括默认的 ByteArrayCrLfSerializer)不支持此类有效负载,除非转换后的有效负载是 Stringbyte[]

在 2.2 版本之前,当协作通道适配器使用客户端连接工厂时,so-timeout 属性默认设置为默认回复超时时间(10 秒)。这意味着,如果入站适配器在此期间内未收到任何数据,则套接字将关闭。

这种默认行为在真正异步的环境中并不合适,因此现在默认设置为无限超时。您可以通过将客户端连接工厂上的 so-timeout 属性设置为 10000 毫秒来恢复之前的默认行为。

从 5.4 版本开始,多个出站通道适配器和一个 TcpInboundChannelAdapter 可以共享同一个连接工厂。这允许应用程序同时支持请求/回复和任意服务器→客户端消息传递。有关更多信息,请参见 TCP 网关

传输标头

TCP 是一种流协议。序列化器反序列化器 在流中划分消息。在 3.0 之前,只有消息有效负载(Stringbyte[])可以通过 TCP 传输。从 3.0 开始,您还可以传输选定的标头以及有效负载。但是,无法序列化“实时”对象,例如 replyChannel 标头。

通过 TCP 发送标头信息需要一些额外的配置。

第一步是为 ConnectionFactory 提供一个使用 mapper 属性的 MessageConvertingTcpMessageMapper。此映射器委托给任何 MessageConverter 实现,将消息转换为可由配置的 序列化器反序列化器 序列化和反序列化的对象。

Spring Integration 提供了一个 MapMessageConverter,它允许指定要添加到 Map 对象中的标头列表以及有效负载。生成的 Map 包含两个条目:payloadheadersheaders 条目本身是一个 Map,包含选定的标头。

第二步是提供一个序列化器和一个反序列化器,它们可以将 Map 与某种线格式进行转换。这可以是自定义的 序列化器反序列化器,如果您需要与非 Spring Integration 应用程序的同级系统进行通信,则通常需要使用它们。

Spring Integration 提供了一个 `MapJsonSerializer` 用于将 `Map` 对象转换为 JSON 格式,反之亦然。它使用 Spring Integration 的 `JsonObjectMapper`。如果需要,您可以提供自定义的 `JsonObjectMapper`。默认情况下,序列化器会在对象之间插入一个换行符 ( `0x0a` ) 字符。有关更多信息,请参阅 Javadoc

`JsonObjectMapper` 使用类路径中可用的 `Jackson` 版本。

您也可以使用 `Map` 的标准 Java 序列化,方法是使用 `DefaultSerializer` 和 `DefaultDeserializer`。

以下示例展示了连接工厂的配置,该工厂使用 JSON 传输 `correlationId`、`sequenceNumber` 和 `sequenceSize` 头信息。

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

使用上述配置发送的消息,其有效负载为 'something',将在网络中显示如下。

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}