发送消息

发送消息时,可以使用以下任何方法

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

我们可以从上一列表中的最后一种方法开始讨论,因为它实际上是最明确的。它允许在运行时提供 AMQP 交换名称(以及路由键)。最后一个参数是负责实际创建消息实例的回调。使用此方法发送消息的示例可能如下所示:以下示例显示了如何使用 `send` 方法发送消息

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    new Message("12.34".getBytes(), someProperties));

如果计划使用该模板实例向同一个交换机发送大部分或全部时间的消息,则可以在模板本身上设置 `exchange` 属性。在这种情况下,可以使用上一个列表中的第二种方法。以下示例在功能上等同于前面的示例

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

如果在模板上同时设置了 `exchange` 和 `routingKey` 属性,则可以使用仅接受 `Message` 的方法。以下示例显示了如何操作

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

对交换机和路由键属性更好的理解是,显式的方法参数始终会覆盖模板的默认值。事实上,即使您没有在模板上显式设置这些属性,也始终存在默认值。在这两种情况下,默认值都是空字符串,但这实际上是一个合理的默认值。就路由键而言,它首先并不总是必要的(例如,对于 `Fanout` 交换机)。此外,队列可以使用空字符串绑定到交换机。这两种情况都是依赖于模板的路由键属性的默认空字符串值的合法场景。就交换机名称而言,空字符串通常使用,因为 AMQP 规范将“默认交换机”定义为没有名称。由于所有队列都自动绑定到该默认交换机(这是一个直接交换机),使用它们的名称作为绑定值,因此上一个列表中的第二种方法可用于通过默认交换机向任何队列进行简单的点对点消息传递。您可以通过在运行时提供方法参数来提供队列名称作为 `routingKey`。以下示例显示了如何操作

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

或者,您可以创建一个模板,主要或专门用于向单个队列发布消息。以下示例显示了如何操作

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

消息构建器 API

从 1.3 版本开始,MessageBuilderMessagePropertiesBuilder 提供了消息构建器 API。这些方法提供了一种方便的“流畅”方式来创建消息或消息属性。以下示例显示了流畅的 API 在实际应用中的情况

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

可以设置 MessageProperties 上定义的每个属性。其他方法包括 `setHeader(String key, String value)`、`removeHeader(String key)`、`removeHeaders()` 和 `copyProperties(MessageProperties properties)`。每个属性设置方法都有一个 `set*IfAbsent()` 变体。在存在默认初始值的情况下,方法名为 `set*IfAbsentOrDefault()`。

提供了五个静态方法来创建初始消息构建器

public static MessageBuilder withBody(byte[] body) (1)

public static MessageBuilder withClonedBody(byte[] body) (2)

public static MessageBuilder withBody(byte[] body, int from, int to) (3)

public static MessageBuilder fromMessage(Message message) (4)

public static MessageBuilder fromClonedMessage(Message message) (5)
1 构建器创建的消息的主体是对参数的直接引用。
2 构建器创建的消息主体是一个新数组,其中包含参数中字节的副本。
3 构建器创建的消息主体是一个新数组,其中包含参数中指定范围内的字节。更多详情,请参见 Arrays.copyOfRange()
4 构建器创建的消息主体是对参数主体的直接引用。参数的属性被复制到一个新的MessageProperties对象。
5 构建器创建的消息主体是一个新数组,其中包含参数主体副本。参数的属性被复制到一个新的MessageProperties对象。

提供了三个静态方法来创建MessagePropertiesBuilder实例。

public static MessagePropertiesBuilder newInstance() (1)

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 一个新的消息属性对象使用默认值进行初始化。
2 构建器使用提供的属性对象进行初始化,并且build()将返回该对象。
3 参数的属性被复制到一个新的MessageProperties对象。

对于AmqpTemplateRabbitTemplate实现,每个send()方法都有一个重载版本,它接受一个额外的CorrelationData对象。启用发布者确认时,此对象将在AmqpTemplate中描述的回调中返回。这允许发送方将确认(acknack)与已发送的消息关联起来。

从1.6.7版本开始,引入了CorrelationAwareMessagePostProcessor接口,允许在消息转换后修改关联数据。以下示例显示了如何使用它。

Message postProcessMessage(Message message, Correlation correlation);

在2.0版本中,此接口已弃用。该方法已移至MessagePostProcessor,并具有一个默认实现,该实现委托给postProcessMessage(Message message)

同样从1.6.7版本开始,提供了一个名为CorrelationDataPostProcessor的新回调接口。在所有MessagePostProcessor实例(在send()方法中提供以及在setBeforePublishPostProcessors()中提供的实例)之后调用此接口。实现可以更新或替换send()方法中提供的关联数据(如有)。Message和原始CorrelationData(如有)作为参数提供。以下示例显示了如何使用postProcess方法。

CorrelationData postProcess(Message message, CorrelationData correlationData);

发布者返回

当模板的mandatory属性为true时,返回的消息由AmqpTemplate中描述的回调提供。

从1.4版本开始,RabbitTemplate支持SpEL mandatoryExpression属性,该属性针对每个请求消息作为根评估对象进行评估,解析为boolean值。bean引用(例如@myBean.isMandatory(#root))可用于表达式。

发布者返回也可以在RabbitTemplate的发送和接收操作中内部使用。有关更多信息,请参见回复超时

批量处理

1.4.2版本引入了BatchingRabbitTemplate。这是RabbitTemplate的一个子类,它具有重载的send方法,该方法根据BatchingStrategy对消息进行批量处理。只有在批处理完成时,消息才会发送到RabbitMQ。以下清单显示了BatchingStrategy接口定义。

public interface BatchingStrategy {

    MessageBatch addToBatch(String exchange, String routingKey, Message message);

    Date nextRelease();

    Collection<MessageBatch> releaseBatches();

}
批量数据保存在内存中。如果系统发生故障,未发送的消息可能会丢失。

提供了一个SimpleBatchingStrategy。它支持将消息发送到单个交换机或路由键。它具有以下属性:

  • batchSize:发送批处理之前批处理中的消息数。

  • bufferLimit:批量消息的最大大小。如果超过此值,则会抢占batchSize,并导致发送部分批处理。

  • timeout:在没有添加消息到批处理的新活动之后,发送部分批处理的时间。

SimpleBatchingStrategy通过在每个嵌入式消息前面添加四个字节的二进制长度来格式化批处理。通过将springBatchFormat消息属性设置为lengthHeader4来将其传达给接收系统。

默认情况下,监听器容器会自动取消批量消息的批量处理(通过使用springBatchFormat消息头)。拒绝批处理中的任何消息都会导致拒绝整个批处理。

但是,有关更多信息,请参见带有批量处理的@RabbitListener