消息转换器

AmqpTemplate 还定义了几种用于发送和接收消息的方法,这些方法委托给 MessageConverterMessageConverter 为每个方向提供了一种方法:一种用于转换为 Message,另一种用于从 Message 转换。请注意,在转换为 Message 时,除了对象外,还可以提供属性。object 参数通常对应于消息正文。以下清单显示了 MessageConverter 接口定义

public interface MessageConverter {

    Message toMessage(Object object, MessageProperties messageProperties)
            throws MessageConversionException;

    Object fromMessage(Message message) throws MessageConversionException;

}

AmqpTemplate 上相关的 Message 发送方法比我们前面讨论的方法更简单,因为它们不需要 Message 实例。相反,MessageConverter 负责通过将提供的对象转换为 Message 正文的字节数组,然后添加任何提供的 MessageProperties 来“创建”每个 Message。以下清单显示了各种方法的定义

void convertAndSend(Object message) throws AmqpException;

void convertAndSend(String routingKey, Object message) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message)
    throws AmqpException;

void convertAndSend(Object message, MessagePostProcessor messagePostProcessor)
    throws AmqpException;

void convertAndSend(String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

void convertAndSend(String exchange, String routingKey, Object message,
    MessagePostProcessor messagePostProcessor) throws AmqpException;

在接收端,只有两种方法:一种接受队列名称,另一种依赖于已设置模板的“队列”属性。以下清单显示了这两种方法的定义

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;
异步消费者中提到的MessageListenerAdapter也使用MessageConverter

SimpleMessageConverter

MessageConverter 策略的默认实现称为 SimpleMessageConverter。如果您没有显式配置替代项,则 RabbitTemplate 实例将使用此转换器。它处理基于文本的内容、序列化 Java 对象和字节数组。

Message 转换

如果输入 Message 的内容类型以“text”开头(例如,“text/plain”),它还会检查内容编码属性以确定在将 Message 正文字节数组转换为 Java String 时要使用的字符集。如果在输入 Message 上未设置内容编码属性,则默认使用 UTF-8 字符集。如果您需要覆盖该默认设置,可以配置 SimpleMessageConverter 实例,设置其 defaultCharset 属性,并将其注入到 RabbitTemplate 实例中。

如果输入的Message的content-type属性值设置为“application/x-java-serialized-object”,则SimpleMessageConverter会尝试将字节数组反序列化(重新水化)为Java对象。虽然这对于简单的原型设计可能很有用,但我们不建议依赖Java序列化,因为它会导致生产者和消费者之间紧密耦合。当然,这也排除了在任何一方使用非Java系统。由于AMQP是一个线级协议,因此在这种限制下失去这种优势将是不幸的。在接下来的两节中,我们将探讨一些传递丰富的域对象内容而不依赖Java序列化的替代方法。

对于所有其他content-type,SimpleMessageConverter直接将Message主体内容作为字节数组返回。

有关重要信息,请参阅Java反序列化

转换为Message

从任意Java对象转换为Message时,SimpleMessageConverter同样处理字节数组、字符串和可序列化实例。它将这些内容转换为字节(对于字节数组,无需转换),并相应地设置content-type属性。如果要转换的Object与这些类型都不匹配,则Message主体为null。

SerializerMessageConverter

此转换器类似于SimpleMessageConverter,不同之处在于它可以使用其他Spring Framework SerializerDeserializer实现进行application/x-java-serialized-object转换。

有关重要信息,请参阅Java反序列化

Jackson2JsonMessageConverter

本节介绍如何使用Jackson2JsonMessageConverter进行Message的转换。它包含以下部分:

转换为Message

如前一部分所述,通常不建议依赖Java序列化。一种更灵活且跨不同语言和平台更可移植的常用替代方法是JSON(JavaScript对象表示法)。可以在任何RabbitTemplate实例上配置转换器以覆盖其对SimpleMessageConverter默认值的用法。Jackson2JsonMessageConverter使用com.fasterxml.jackson 2.x库。以下示例配置了Jackson2JsonMessageConverter

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
        <bean class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter">
            <!-- if necessary, override the DefaultClassMapper -->
            <property name="classMapper" ref="customClassMapper"/>
        </bean>
    </property>
</bean>

如上所示,Jackson2JsonMessageConverter默认使用DefaultClassMapper。类型信息被添加到(并从)MessageProperties中检索。如果入站消息在MessageProperties中不包含类型信息,但您知道预期的类型,则可以使用defaultType属性配置静态类型,如下例所示:

<bean id="jsonConverterWithDefaultType"
      class="o.s.amqp.support.converter.Jackson2JsonMessageConverter">
    <property name="classMapper">
        <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
            <property name="defaultType" value="thing1.PurchaseOrder"/>
        </bean>
    </property>
</bean>

此外,您可以提供来自TypeId标头中值的自定义映射。以下示例显示了如何操作:

@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
    Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
    jsonConverter.setClassMapper(classMapper());
    return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
    DefaultClassMapper classMapper = new DefaultClassMapper();
    Map<String, Class<?>> idClassMapping = new HashMap<>();
    idClassMapping.put("thing1", Thing1.class);
    idClassMapping.put("thing2", Thing2.class);
    classMapper.setIdClassMapping(idClassMapping);
    return classMapper;
}

现在,如果发送系统将标头设置为thing1,则转换器将创建一个Thing1对象,依此类推。有关从非Spring应用程序转换消息的完整讨论,请参阅接收来自非Spring应用程序的JSON示例应用程序。

从2.4.3版开始,如果supportedMediaType具有charset参数,则转换器不会添加contentEncoding消息属性;这也被用于编码。添加了一个新的方法setSupportedMediaType

String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));

Message转换

入站消息根据发送系统添加到标头的类型信息转换为对象。

从2.4.3版开始,如果没有contentEncoding消息属性,转换器将尝试在contentType消息属性中检测charset参数并使用它。如果两者都不存在,如果supportedMediaType具有charset参数,则将使用它进行解码,最终回退到defaultCharset属性。添加了一个新的方法setSupportedMediaType

String utf16 = "application/json; charset=utf-16";
converter.setSupportedContentType(MimeTypeUtils.parseMimeType(utf16));

在1.6之前的版本中,如果不存在类型信息,则转换将失败。从1.6版开始,如果缺少类型信息,转换器将使用Jackson默认值(通常是映射)转换JSON。

此外,从1.6版开始,当您使用@RabbitListener注解(在方法上)时,推断的类型信息将添加到MessageProperties中。这允许转换器转换为目标方法的参数类型。这仅适用于只有一个参数且没有注解或只有一个带有@Payload注解的参数的情况。在分析过程中,类型为Message的参数将被忽略。

默认情况下,推断的类型信息将覆盖发送系统创建的入站TypeId和相关标头。这允许接收系统自动转换为不同的域对象。这仅适用于参数类型是具体的(不是抽象的或接口)或者它来自java.util包的情况。在所有其他情况下,将使用TypeId和相关标头。在某些情况下,您可能希望覆盖默认行为并始终使用TypeId信息。例如,假设您有一个采用Thing1参数的@RabbitListener,但消息包含一个Thing2,它是Thing1(它是具体的)的子类。推断的类型将不正确。为了处理这种情况,请将Jackson2JsonMessageConverter上的TypePrecedence属性设置为TYPE_ID,而不是默认的INFERRED。(该属性实际上位于转换器的DefaultJackson2JavaTypeMapper上,但为了方便起见,在转换器上提供了一个setter。)如果您注入自定义类型映射器,则应在映射器上设置该属性。
Message转换时,传入的MessageProperties.getContentType()必须符合JSON(使用contentType.contains("json")进行检查)。从2.2版开始,如果没有contentType属性,或者它具有默认值application/octet-stream,则假定为application/json。要恢复到之前的行为(返回未转换的byte[]),请将转换器的assumeSupportedContentType属性设置为false。如果内容类型不受支持,则会发出WARN日志消息Could not convert incoming message with content-type [...],并按原样返回message.getBody() — 作为byte[]。因此,为了满足消费者端的Jackson2JsonMessageConverter要求,生产者必须添加contentType消息属性 — 例如,作为application/jsontext/x-json,或者使用Jackson2JsonMessageConverter,它会自动设置标头。以下列表显示了许多转换器调用:
@RabbitListener
public void thing1(Thing1 thing1) {...}

@RabbitListener
public void thing1(@Payload Thing1 thing1, @Header("amqp_consumerQueue") String queue) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.amqp.core.Message message) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<Foo> message) {...}

@RabbitListener
public void thing1(Thing1 thing1, String bar) {...}

@RabbitListener
public void thing1(Thing1 thing1, o.s.messaging.Message<?> message) {...}

在前面的列表中的前四种情况下,转换器尝试转换为Thing1类型。第五个示例无效,因为我们无法确定哪个参数应该接收消息有效负载。在第六个示例中,由于泛型类型是WildcardType,因此应用Jackson默认值。

但是,您可以创建一个自定义转换器并使用targetMethod消息属性来决定将JSON转换为哪种类型。

只有在方法级别声明@RabbitListener注解时,才能实现这种类型推断。使用类级别的@RabbitListener,转换后的类型用于选择要调用的哪个@RabbitHandler方法。为此,基础结构提供targetObject消息属性,您可以在自定义转换器中使用它来确定类型。
从1.6.11版开始,Jackson2JsonMessageConverter以及DefaultJackson2JavaTypeMapperDefaultClassMapper)提供trustedPackages选项来克服序列化漏洞。默认情况下,为了向后兼容性,Jackson2JsonMessageConverter信任所有包 — 即,它对该选项使用*

从2.4.7版开始,可以将转换器配置为在反序列化消息主体后Jackson返回null时返回Optional.empty()。这有助于@RabbitListener以两种方式接收空有效负载:

@RabbitListener(queues = "op.1")
void listen(@Payload(required = false) Thing payload) {
    handleOptional(payload); // payload might be null
}

@RabbitListener(queues = "op.2")
void listen(Optional<Thing> optional) {
    handleOptional(optional.orElse(this.emptyThing));
}

要启用此功能,请将setNullAsOptionalEmpty设置为true;当为false(默认值)时,转换器将回退到原始消息主体(byte[])。

@Bean
Jackson2JsonMessageConverter converter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setNullAsOptionalEmpty(true);
    return converter;
}

反序列化抽象类

在2.2.8版之前,如果@RabbitListener的推断类型是抽象类(包括接口),则转换器将回退到查找标头中的类型信息,如果存在,则使用该信息;如果不存在,它将尝试创建抽象类。当使用配置了自定义反序列化器来处理抽象类的自定义ObjectMapper时,但这会导致问题,但传入的消息具有无效的类型标头。

从2.2.8版开始,默认情况下保留之前的行为。如果您有这样的自定义ObjectMapper并且您希望忽略类型标头,并且始终使用推断类型进行转换,请将alwaysConvertToInferredType设置为true。这是为了向后兼容性,并避免在转换失败时(使用标准ObjectMapper)尝试转换的开销。

使用Spring Data投影接口

从2.2版开始,您可以将JSON转换为Spring Data投影接口而不是具体类型。这允许与数据的非常有选择性和低耦合的绑定,包括从JSON文档内的多个位置查找值。例如,可以将以下接口定义为消息有效负载类型:

interface SomeSample {

  @JsonPath({ "$.username", "$.user.name" })
  String getUsername();

}
@RabbitListener(queues = "projection")
public void projection(SomeSample in) {
    String username = in.getUsername();
    ...
}

访问器方法将默认用于在接收到的JSON文档中查找属性名称作为字段。@JsonPath表达式允许自定义值查找,甚至可以定义多个JSON路径表达式,以从多个位置查找值,直到表达式返回实际值。

要启用此功能,请在消息转换器上将useProjectionForInterfaces设置为true。您还必须将spring-data:spring-data-commonscom.jayway.jsonpath:json-path添加到类路径。

当用作@RabbitListener方法的参数时,接口类型将像往常一样自动传递给转换器。

使用RabbitTemplateMessage转换

如前所述,类型信息在消息标头中传递,以在从消息转换时辅助转换器。这在大多数情况下都能正常工作。但是,当使用泛型类型时,它只能转换简单的对象和已知的“容器”对象(列表、数组和映射)。从2.0版开始,Jackson2JsonMessageConverter实现了SmartMessageConverter,这允许它与采用ParameterizedTypeReference参数的新RabbitTemplate方法一起使用。这允许转换复杂的泛型类型,如下例所示:

Thing1<Thing2<Cat, Hat>> thing1 =
    rabbitTemplate.receiveAndConvert(new ParameterizedTypeReference<Thing1<Thing2<Cat, Hat>>>() { });
从2.1版开始,AbstractJsonMessageConverter类已被删除。它不再是Jackson2JsonMessageConverter的基类。它已被AbstractJackson2MessageConverter取代。

MarshallingMessageConverter

另一个选项是MarshallingMessageConverter。它委托给Spring OXM库中MarshallerUnmarshaller策略接口的实现。你可以在这里阅读更多关于该库的信息。在配置方面,最常见的是只提供构造函数参数,因为大多数Marshaller的实现也实现了Unmarshaller。以下示例显示如何配置MarshallingMessageConverter

<bean class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <property name="connectionFactory" ref="rabbitConnectionFactory"/>
    <property name="messageConverter">
        <bean class="org.springframework.amqp.support.converter.MarshallingMessageConverter">
            <constructor-arg ref="someImplemenationOfMarshallerAndUnmarshaller"/>
        </bean>
    </property>
</bean>

Jackson2XmlMessageConverter

此类在2.1版本中引入,可用于将消息转换为XML格式,反之亦然。

Jackson2XmlMessageConverterJackson2JsonMessageConverter都具有相同的基类:AbstractJackson2MessageConverter

引入AbstractJackson2MessageConverter类是为了替换已移除的类:AbstractJsonMessageConverter

Jackson2XmlMessageConverter使用com.fasterxml.jackson 2.x库。

你可以像使用Jackson2JsonMessageConverter一样使用它,只是它支持XML而不是JSON。以下示例配置了Jackson2JsonMessageConverter

<bean id="xmlConverterWithDefaultType"
        class="org.springframework.amqp.support.converter.Jackson2XmlMessageConverter">
    <property name="classMapper">
        <bean class="org.springframework.amqp.support.converter.DefaultClassMapper">
            <property name="defaultType" value="foo.PurchaseOrder"/>
        </bean>
    </property>
</bean>

参见Jackson2JsonMessageConverter了解更多信息。

从2.2版本开始,如果不存在contentType属性,或者其默认值为application/octet-stream,则假定为application/xml。要恢复到之前的行为(返回未转换的byte[]),请将转换器的assumeSupportedContentType属性设置为false

ContentTypeDelegatingMessageConverter

此类在1.4.2版本中引入,允许根据MessageProperties中的内容类型属性委托给特定的MessageConverter。默认情况下,如果不存在contentType属性,或者其值与任何已配置的转换器都不匹配,则它会委托给SimpleMessageConverter。以下示例配置了ContentTypeDelegatingMessageConverter

<bean id="contentTypeConverter" class="ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json" value-ref="jsonMessageConverter" />
            <entry key="application/xml" value-ref="xmlMessageConverter" />
        </map>
    </property>
</bean>

Java反序列化

本节介绍如何反序列化Java对象。

从不受信任的来源反序列化Java对象时,存在潜在的安全漏洞。

如果你接受来自不受信任来源的消息,其content-typeapplication/x-java-serialized-object,则应考虑配置允许反序列化的包和类。这适用于SimpleMessageConverterSerializerMessageConverter,当它们被配置为隐式或通过配置使用DefaultDeserializer时。

默认情况下,允许列表为空,这意味着不会反序列化任何类。

你可以设置一系列模式,例如thing1.**thing1.thing2.Catcom.example.**.MySafeClass

这些模式将按顺序检查,直到找到匹配项。如果没有匹配项,则会抛出SecurityException

你可以使用这些转换器的allowedListPatterns属性设置模式。或者,如果你信任所有消息发送者,可以将环境变量SPRING_AMQP_DESERIALIZATION_TRUST_ALL或系统属性spring.amqp.deserialization.trust.all设置为true

消息属性转换器

MessagePropertiesConverter策略接口用于在Rabbit客户端BasicProperties和Spring AMQP MessageProperties之间进行转换。默认实现(DefaultMessagePropertiesConverter)通常足以满足大多数目的,但如果需要,你可以实现自己的转换器。默认属性转换器将类型为LongStringBasicProperties元素转换为String实例(当大小不超过1024字节时)。更大的LongString实例不会被转换(参见下一段)。此限制可以通过构造函数参数来覆盖。

从1.6版本开始,默认情况下,DefaultMessagePropertiesConverter现在将长度超过长字符串限制(默认值:1024)的标头保留为LongString实例。你可以通过getBytes[]toString()getStream()方法访问其内容。

以前,DefaultMessagePropertiesConverter将此类标头“转换为”DataInputStream(实际上它只是引用了LongString实例的DataInputStream)。在输出时,此标头不会被转换(除了转换为字符串——例如,通过对流调用toString()得到java.io.DataInputStream@1d057a39)。

现在,大型传入LongString标头在输出时也会被正确“转换”(默认情况下)。

提供了一个新的构造函数,允许你配置转换器以与以前一样工作。以下列表显示了Javadoc注释和方法声明

/**
 * Construct an instance where LongStrings will be returned
 * unconverted or as a java.io.DataInputStream when longer than this limit.
 * Use this constructor with 'true' to restore pre-1.6 behavior.
 * @param longStringLimit the limit.
 * @param convertLongLongStrings LongString when false,
 * DataInputStream when true.
 * @since 1.6
 */
public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLongLongStrings) { ... }

从1.6版本开始,MessageProperties中添加了一个名为correlationIdString的新属性。以前,在转换RabbitMQ客户端使用的BasicProperties时,会执行不必要的byte[] <→ String转换,因为MessageProperties.correlationIdbyte[],但BasicProperties使用String。(最终,RabbitMQ客户端使用UTF-8将String转换为字节,以将其放入协议消息中)。

为了提供最大的向后兼容性,已向DefaultMessagePropertiesConverter添加了一个名为correlationIdPolicy的新属性。这需要一个DefaultMessagePropertiesConverter.CorrelationIdPolicy枚举参数。默认情况下,它设置为BYTES,这复制了之前的行为。

对于入站消息

  • STRING:仅映射correlationIdString属性

  • BYTES:仅映射correlationId属性

  • BOTH:两个属性都映射

对于出站消息

  • STRING:仅映射correlationIdString属性

  • BYTES:仅映射correlationId属性

  • BOTH:考虑两个属性,String属性优先

从1.6版本开始,入站deliveryMode属性不再映射到MessageProperties.deliveryMode。它映射到MessageProperties.receivedDeliveryMode。此外,入站userId属性不再映射到MessageProperties.userId。它映射到MessageProperties.receivedUserId。这些更改是为了避免在同一MessageProperties对象用于出站消息时意外传播这些属性。

从2.2版本开始,DefaultMessagePropertiesConverter使用getName()而不是toString()转换任何值为Class<?>类型的自定义标头;这避免了使用者应用程序不得不从toString()表示中解析类名。对于滚动升级,你可能需要更改你的使用者以理解这两种格式,直到所有生产者都升级为止。