消息转换器

AmqpTemplate 还定义了几个用于发送和接收消息的方法,这些方法委托给 MessageConverterMessageConverter 为每个方向提供一个方法:一个用于转换为 Message,另一个用于从 Message 转换。请注意,在转换为 Message 时,除了对象之外,还可以提供属性。object 参数通常对应于消息体。以下清单显示了 MessageConverter 接口定义

public interface MessageConverter {

    Message toMessage(Object object, MessageProperties messageProperties)
            throws MessageConversionException;

    Object fromMessage(Message message) throws MessageConversionException;

}

AmqpTemplate 上相关的 Message 发送方法比我们之前讨论的方法更简单,因为它们不需要 Message 实例。相反,MessageConverter 负责通过将提供的对象转换为 Message 主体的字节数组,然后添加任何提供的 MessageProperties 来“创建”每个 Message。以下清单显示了各种方法的定义

void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)
    throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
    throws AmqpException;

void convertAndSend(String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

在接收端,只有两种方法:一种接受队列名称,另一种依赖于模板的“队列”属性已设置。以下清单显示了两种方法的定义

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;
异步消费者 中提到的 MessageListenerAdapter 也使用 MessageConverter

SimpleMessageConverter

MessageConverter 策略的默认实现称为 SimpleMessageConverter。 这是 RabbitTemplate 实例在您没有显式配置替代方案时使用的转换器。 它处理基于文本的内容、序列化 Java 对象和字节数组。

Message 转换

如果输入 Message 的内容类型以 "text" 开头(例如,"text/plain"),它还会检查 content-encoding 属性以确定在将 Message 主体字节数组转换为 Java String 时要使用的字符集。 如果输入 Message 上没有设置 content-encoding 属性,它默认使用 UTF-8 字符集。 如果您需要覆盖该默认设置,您可以配置 SimpleMessageConverter 的实例,设置其 defaultCharset 属性,并将该实例注入到 RabbitTemplate 实例中。

如果输入 Message 的 content-type 属性值设置为 "application/x-java-serialized-object",SimpleMessageConverter 会尝试将字节数组反序列化(重新水化)为 Java 对象。 虽然这可能对简单的原型设计有用,但我们不建议依赖 Java 序列化,因为它会导致生产者和消费者之间紧密耦合。 当然,它也排除了在任何一方使用非 Java 系统。 由于 AMQP 是一种线级协议,如果由于这种限制而失去这种优势的很大一部分,将是令人遗憾的。 在接下来的两节中,我们将探讨一些在不依赖 Java 序列化的情况下传递丰富域对象内容的替代方案。

对于所有其他内容类型,SimpleMessageConverter 直接将 Message 主体内容作为字节数组返回。

有关重要信息,请参阅 Java 反序列化

转换为 Message

当从任意 Java 对象转换为 Message 时,SimpleMessageConverter 同样处理字节数组、字符串和可序列化实例。 它将这些中的每一个转换为字节(对于字节数组,没有要转换的内容),并相应地设置 content-type 属性。 如果要转换的 Object 与这些类型中的任何一个都不匹配,则 Message 主体为 null。

SerializerMessageConverter

此转换器类似于 SimpleMessageConverter,但它可以配置其他 Spring Framework SerializerDeserializer 实现,用于 application/x-java-serialized-object 转换。

有关重要信息,请参阅 Java 反序列化

Jackson2JsonMessageConverter

本节介绍如何使用 Jackson2JsonMessageConverterMessage 之间进行转换。它包含以下部分:

转换为 Message

如上一节所述,依赖 Java 序列化通常不推荐。一种更灵活、跨语言和平台可移植的常见替代方案是 JSON(JavaScript 对象表示法)。可以在任何 RabbitTemplate 实例上配置转换器,以覆盖其对 SimpleMessageConverter 默认值的用法。Jackson2JsonMessageConverter 使用 com.fasterxml.jackson 2.x 库。以下示例配置了 Jackson2JsonMessageConverter

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
        <bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
            <!-- if necessary, override the DefaultClassMapper -->
            <property name="classMapper" ref="customClassMapper"/>
        </bean>
    </property>
</bean>

如上所示,Jackson2JsonMessageConverter 默认使用 DefaultClassMapper。类型信息被添加到(并从)MessageProperties 中检索。如果入站消息在 MessageProperties 中不包含类型信息,但您知道预期类型,则可以使用 defaultType 属性配置静态类型,如下例所示

<bean id="jsonConverterWithDefaultType"
      class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
    <property name="classMapper">
        <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
            <property name="defaultType" value="thing1.PurchaseOrder"/>
        </bean>
    </property>
</bean>

此外,您可以提供从 TypeId 标头中的值到类型的自定义映射。以下示例展示了如何做到这一点

@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    jsonConverter.setClassMapper(classMapper());
    return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
    DefaultClassMapper classMapper = new DefaultClassMapper();
    Map<String, Class<?>> idClassMapping = new HashMap<>();
    idClassMapping.put("thing1", Thing1.class);
    idClassMapping.put("thing2", Thing2.class);
    classMapper.setIdClassMapping(idClassMapping);
    return classMapper;
}

现在,如果发送系统将标头设置为 thing1,转换器将创建一个 Thing1 对象,依此类推。有关从非 Spring 应用程序转换消息的完整讨论,请参阅 接收来自非 Spring 应用程序的 JSON 示例应用程序。

从版本 2.4.3 开始,如果 supportedMediaType 具有 charset 参数,转换器将不会添加 contentEncoding 消息属性;这也用于编码。添加了一个新的方法 setSupportedMediaType

String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));

Message 转换

入站消息根据发送系统添加到标头中的类型信息转换为对象。

从版本 2.4.3 开始,如果没有 contentEncoding 消息属性,转换器将尝试在 contentType 消息属性中检测 charset 参数并使用它。如果两者都不存在,如果 supportedMediaType 具有 charset 参数,它将用于解码,最终回退到 defaultCharset 属性。添加了一个新的方法 setSupportedMediaType

String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));

在 1.6 之前的版本中,如果类型信息不存在,转换将失败。从版本 1.6 开始,如果类型信息丢失,转换器将使用 Jackson 默认值(通常是映射)转换 JSON。

此外,从版本 1.6 开始,当您使用 @RabbitListener 注解(在方法上)时,推断的类型信息将添加到 MessageProperties 中。这使转换器能够转换为目标方法的参数类型。这仅适用于只有一个参数且没有注解或只有一个参数且带有 @Payload 注解的情况。类型为 Message 的参数在分析过程中会被忽略。

默认情况下,推断的类型信息将覆盖发送系统创建的传入的TypeId和相关头信息。这使得接收系统可以自动转换为不同的域对象。这仅适用于参数类型是具体的(不是抽象的或接口)或来自java.util包。在所有其他情况下,将使用TypeId和相关头信息。在某些情况下,您可能希望覆盖默认行为并始终使用TypeId信息。例如,假设您有一个@RabbitListener,它接受一个Thing1参数,但消息包含一个Thing2,它是Thing1的子类(它是具体的)。推断的类型将不正确。为了处理这种情况,将Jackson2JsonMessageConverter上的TypePrecedence属性设置为TYPE_ID,而不是默认的INFERRED。(该属性实际上位于转换器的DefaultJackson2JavaTypeMapper上,但为了方便起见,在转换器上提供了一个setter。)如果您注入自定义类型映射器,则应在映射器上设置该属性。
Message转换时,传入的MessageProperties.getContentType()必须符合 JSON 规范(使用contentType.contains("json")进行检查)。从版本 2.2 开始,如果不存在contentType属性,或者它具有默认值application/octet-stream,则假定为application/json。要恢复到之前的行为(返回未转换的byte[]),请将转换器的assumeSupportedContentType属性设置为false。如果内容类型不受支持,则会发出一个WARN日志消息Could not convert incoming message with content-type […​],并且message.getBody()将按原样返回——作为byte[]。因此,为了满足消费者端Jackson2JsonMessageConverter的要求,生产者必须添加contentType消息属性——例如,作为application/jsontext/x-json,或者通过使用Jackson2JsonMessageConverter,它会自动设置头信息。以下列表显示了一些转换器调用
@RabbitListener
public void thing1(Thing1 thing1) {...}

@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}

@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}

在前面的列表中的前四种情况下,转换器尝试转换为Thing1类型。第五个示例无效,因为我们无法确定哪个参数应该接收消息有效负载。在第六个示例中,由于泛型类型是WildcardType,因此应用了 Jackson 默认值。

但是,您可以创建自定义转换器并使用targetMethod消息属性来决定将 JSON 转换为哪种类型。

只有在方法级别声明@RabbitListener注释时才能实现这种类型推断。使用类级别的@RabbitListener,转换后的类型用于选择要调用的哪个@RabbitHandler方法。出于这个原因,基础设施提供了targetObject消息属性,您可以在自定义转换器中使用它来确定类型。
从版本 1.6.11 开始,Jackson2JsonMessageConverter以及DefaultJackson2JavaTypeMapperDefaultClassMapper)提供了trustedPackages选项来克服序列化 Gadget漏洞。默认情况下,为了向后兼容,Jackson2JsonMessageConverter信任所有包——也就是说,它对该选项使用*

从 2.4.7 版本开始,转换器可以配置为在 Jackson 反序列化消息体后返回 null 时返回 Optional.empty()。这通过两种方式方便 @RabbitListener 接收空有效负载。

@RabbitListener(queues = "op.1")
void listen(@Payload(required = false) Thing payload) {
    handleOptional(payload); // payload might be null
}

@RabbitListener(queues = "op.2")
void listen(Optional<Thing> optional) {
    handleOptional(optional.orElse(this.emptyThing));
}

要启用此功能,请将 setNullAsOptionalEmpty 设置为 true;当为 false(默认)时,转换器将回退到原始消息体 (byte[])。

@Bean
Jackson2JsonMessageConverter converter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setNullAsOptionalEmpty(true);
    return converter;
}

反序列化抽象类

在 2.2.8 版本之前,如果 @RabbitListener 的推断类型是抽象类(包括接口),转换器将回退到查找标头中的类型信息,如果存在,则使用该信息;如果不存在,它将尝试创建抽象类。当使用配置了自定义反序列化器来处理抽象类的自定义 ObjectMapper 时,但传入消息具有无效类型标头,这会导致问题。

从 2.2.8 版本开始,默认情况下保留以前的行为。如果您有这样的自定义 ObjectMapper 并且您想忽略类型标头,并且始终使用推断类型进行转换,请将 alwaysConvertToInferredType 设置为 true。这对于向后兼容性以及在转换失败时(使用标准 ObjectMapper)避免尝试转换的开销是必需的。

使用 Spring Data 投影接口

从 2.2 版本开始,您可以将 JSON 转换为 Spring Data 投影接口,而不是具体类型。这允许对数据进行非常有选择性和低耦合的绑定,包括从 JSON 文档中的多个位置查找值。例如,以下接口可以定义为消息有效负载类型

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

访问器方法将默认用于在接收到的 JSON 文档中查找属性名称作为字段。@JsonPath 表达式允许自定义值查找,甚至定义多个 JSON 路径表达式,以从多个位置查找值,直到表达式返回实际值。

要启用此功能,请在消息转换器上将 useProjectionForInterfaces 设置为 true。您还必须将 spring-data:spring-data-commonscom.jayway.jsonpath:json-path 添加到类路径。

当用作 @RabbitListener 方法的参数时,接口类型会像往常一样自动传递给转换器。

从带有 RabbitTemplateMessage 中转换

如前所述,类型信息在消息头中传递,以在从消息转换时帮助转换器。这在大多数情况下都能正常工作。但是,当使用泛型时,它只能转换简单对象和已知的“容器”对象(列表、数组和映射)。从版本 2.0 开始,Jackson2JsonMessageConverter 实现 SmartMessageConverter,这使得它可以与新的 RabbitTemplate 方法一起使用,这些方法接受 ParameterizedTypeReference 参数。这允许转换复杂的泛型类型,如以下示例所示

Thing1<Thing2<Cat, Hat>> thing1 =
    rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });
从版本 2.1 开始,AbstractJsonMessageConverter 类已被删除。它不再是 Jackson2JsonMessageConverter 的基类。它已被 AbstractJackson2MessageConverter 替换。

MarshallingMessageConverter

另一个选择是 MarshallingMessageConverter。它委托给 Spring OXM 库中 MarshallerUnmarshaller 策略接口的实现。您可以阅读更多关于该库的信息 这里。在配置方面,最常见的是只提供构造函数参数,因为大多数 Marshaller 的实现也实现了 Unmarshaller。以下示例展示了如何配置 MarshallingMessageConverter

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
        <bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
            <constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
        </bean>
    </property>
</bean>

Jackson2XmlMessageConverter

此类是在版本 2.1 中引入的,可用于将消息从 XML 转换到 XML。

Jackson2XmlMessageConverterJackson2JsonMessageConverter 具有相同的基类:AbstractJackson2MessageConverter

AbstractJackson2MessageConverter 类被引入以替换一个已删除的类:AbstractJsonMessageConverter

Jackson2XmlMessageConverter 使用 com.fasterxml.jackson 2.x 库。

您可以像使用 Jackson2JsonMessageConverter 一样使用它,只是它支持 XML 而不是 JSON。以下示例配置了 Jackson2JsonMessageConverter

<bean id="xmlConverterWithDefaultType"
        class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
    <property name="classMapper">
        <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
            <property name="defaultType" value="foo.PurchaseOrder"/>
        </bean>
    </property>
</bean>

有关更多信息,请参阅 Jackson2JsonMessageConverter

从版本 2.2 开始,如果不存在 contentType 属性,或者它具有默认值 application/octet-stream,则假定为 application/xml。要恢复到之前的行为(返回未转换的 byte[]),请将转换器的 assumeSupportedContentType 属性设置为 false

ContentTypeDelegatingMessageConverter

此类是在版本 1.4.2 中引入的,它允许根据 MessageProperties 中的 content type 属性委托给特定的 MessageConverter。默认情况下,如果不存在 contentType 属性,或者存在与任何已配置的转换器都不匹配的值,它会委托给 SimpleMessageConverter。以下示例配置了 ContentTypeDelegatingMessageConverter

<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json" value-ref="jsonMessageConverter" />
            <entry key="application/xml" value-ref="xmlMessageConverter" />
        </map>
    </property>
</bean>

Java 反序列化

本节介绍如何反序列化 Java 对象。

从不受信任的来源反序列化 Java 对象时,存在潜在的漏洞。

如果您接受来自不受信任来源的带有 content-typeapplication/x-java-serialized-object 的消息,您应该考虑配置允许反序列化的包和类。这适用于 SimpleMessageConverterSerializerMessageConverter,当它们被配置为使用 DefaultDeserializer(无论是隐式还是通过配置)时。

默认情况下,允许列表为空,这意味着不会反序列化任何类。

您可以设置模式列表,例如 thing1., thing1.thing2.Cat.MySafeClass

这些模式按顺序检查,直到找到匹配项。如果没有匹配项,则会抛出 SecurityException

您可以使用这些转换器上的 allowedListPatterns 属性设置模式。或者,如果您信任所有消息来源,您可以将环境变量 SPRING_AMQP_DESERIALIZATION_TRUST_ALL 或系统属性 spring.amqp.deserialization.trust.all 设置为 true

消息属性转换器

MessagePropertiesConverter 策略接口用于在 Rabbit Client BasicProperties 和 Spring AMQP MessageProperties 之间进行转换。默认实现(DefaultMessagePropertiesConverter)通常足以满足大多数目的,但您也可以根据需要实现自己的转换器。默认属性转换器将类型为 LongStringBasicProperties 元素转换为 String 实例,前提是大小不超过 1024 字节。更大的 LongString 实例不会被转换(参见下一段)。此限制可以通过构造函数参数覆盖。

从 1.6 版本开始,默认情况下,DefaultMessagePropertiesConverter 将长度超过长字符串限制(默认:1024)的标头保留为 LongString 实例。您可以通过 getBytes[]toString()getStream() 方法访问内容。

以前,DefaultMessagePropertiesConverter 将此类标头“转换为”DataInputStream(实际上它只是引用了 LongString 实例的 DataInputStream)。在输出时,此标头不会被转换(除了转换为字符串,例如,通过在流上调用 toString()java.io.DataInputStream@1d057a39)。

现在,大型传入 LongString 标头在输出时也会被正确地“转换”(默认情况下)。

提供了一个新的构造函数,让您可以配置转换器以按以前的方式工作。以下清单显示了该方法的 Javadoc 注释和声明

/**
 * Construct an instance where LongStrings will be returned
 * unconverted or as a java.io.DataInputStream when longer than this limit.
 * Use this constructor with 'true' to restore pre-1.6 behavior.
 * @param longStringLimit the limit.
 * @param convertLongLongStrings LongString when false,
 * DataInputStream when true.
 * @since 1.6
 */
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }

从 1.6 版本开始,MessageProperties 中添加了一个名为 correlationIdString 的新属性。以前,在使用 RabbitMQ 客户端进行 BasicProperties 的转换时,会执行不必要的 byte[] <→ String 转换,因为 MessageProperties.correlationId 是一个 byte[],而 BasicProperties 使用的是 String。(最终,RabbitMQ 客户端使用 UTF-8 将 String 转换为字节,以放入协议消息中)。

为了提供最大的向后兼容性,在 DefaultMessagePropertiesConverter 中添加了一个名为 correlationIdPolicy 的新属性。它接受一个 DefaultMessagePropertiesConverter.CorrelationIdPolicy 枚举参数。默认情况下,它设置为 BYTES,这将复制之前的行为。

对于入站消息

  • STRING:仅映射 correlationIdString 属性

  • BYTES:仅映射 correlationId 属性

  • BOTH:映射两个属性

对于出站消息

  • STRING:仅映射 correlationIdString 属性

  • BYTES:仅映射 correlationId 属性

  • BOTH:考虑两个属性,String 属性优先

从 1.6 版本开始,入站 deliveryMode 属性不再映射到 MessageProperties.deliveryMode。它映射到 MessageProperties.receivedDeliveryMode。同样,入站 userId 属性不再映射到 MessageProperties.userId。它映射到 MessageProperties.receivedUserId。这些更改是为了避免在使用相同的 MessageProperties 对象进行出站消息时意外传播这些属性。

从 2.2 版本开始,DefaultMessagePropertiesConverter 使用 getName() 而不是 toString() 转换任何类型为 Class<?> 的自定义标头值;这避免了消费应用程序必须从 toString() 表示中解析类名。对于滚动升级,您可能需要更改您的消费者以理解两种格式,直到所有生产者都升级。