发送消息
发送消息时,可以使用以下任何方法
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
我们可以从前面列表中的最后一种方法开始讨论,因为它实际上是最明确的。它允许在运行时提供 AMQP 交换名称(以及路由键)。最后一个参数是负责实际创建消息实例的回调。使用此方法发送消息的示例可能如下所示:以下示例展示了如何使用 send
方法发送消息
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
如果计划使用该模板实例将消息发送到同一个交换机(大部分或全部时间),则可以在模板本身设置 exchange
属性。在这种情况下,可以使用前面列表中的第二种方法。以下示例在功能上等同于前面的示例
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果模板中同时设置了exchange
和routingKey
属性,则可以使用仅接受Message
的方法。以下示例展示了如何操作
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
更好地理解exchange和routing key属性的方法是,显式方法参数始终会覆盖模板的默认值。事实上,即使您没有在模板上显式设置这些属性,也始终存在默认值。在这两种情况下,默认值都是一个空String
,但这实际上是一个合理的默认值。就routing key而言,它并不总是必要的(例如,对于Fanout
交换机)。此外,队列可以绑定到具有空String
的交换机。这两种情况都是依赖于模板的routing key属性的默认空String
值的合法场景。就交换机名称而言,空String
通常被使用,因为AMQP规范将“默认交换机”定义为没有名称。由于所有队列都自动绑定到该默认交换机(这是一个直接交换机),使用它们的名称作为绑定值,前面的列表中的第二个方法可以用于通过默认交换机向任何队列进行简单的点对点消息传递。您可以通过在运行时提供方法参数来提供队列名称作为routingKey
。以下示例展示了如何操作
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
或者,您可以创建一个主要或专门用于发布到单个队列的模板。以下示例展示了如何操作
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
消息构建器 API
从 1.3 版本开始,MessageBuilder
和 MessagePropertiesBuilder
提供了消息构建器 API。这些方法提供了一种方便的“流畅”方式来创建消息或消息属性。以下示例展示了流畅 API 的实际应用
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
可以设置在 MessageProperties
上定义的每个属性。其他方法包括setHeader(String key, String value)
、removeHeader(String key)
、removeHeaders()
和copyProperties(MessageProperties properties)
。每个属性设置方法都有一个set*IfAbsent()
变体。在存在默认初始值的情况下,方法名为set*IfAbsentOrDefault()
。
提供了五个静态方法来创建初始消息构建器
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
1 | 由构建器创建的消息的正文是对参数的直接引用。 |
2 | 构建器创建的消息主体是一个新数组,包含参数中字节的副本。 |
3 | 构建器创建的消息主体是一个新数组,包含参数中字节范围的副本。有关更多详细信息,请参阅 Arrays.copyOfRange() 。 |
4 | 构建器创建的消息主体是对参数主体的直接引用。参数的属性被复制到一个新的 MessageProperties 对象中。 |
5 | 构建器创建的消息主体是一个新数组,包含参数主体副本。参数的属性被复制到一个新的 MessageProperties 对象中。 |
提供了三个静态方法来创建 MessagePropertiesBuilder
实例。
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 | 一个新的消息属性对象被初始化为默认值。 |
2 | 构建器被初始化为提供的属性对象,并且 build() 将返回该对象。 |
3 | 参数的属性被复制到一个新的 MessageProperties 对象中。 |
使用 RabbitTemplate
实现的 AmqpTemplate
,每个 send()
方法都有一个重载版本,它接受一个额外的 CorrelationData
对象。当启用发布者确认时,该对象将在 AmqpTemplate
中描述的回调中返回。这使发送者能够将确认(ack
或 nack
)与发送的消息相关联。
从版本 1.6.7 开始,引入了 CorrelationAwareMessagePostProcessor
接口,允许在消息转换后修改相关数据。以下示例展示了如何使用它。
Message postProcessMessage(Message message, Correlation correlation);
在版本 2.0 中,此接口已弃用。该方法已移至 MessagePostProcessor
,并具有一个默认实现,该实现委托给 postProcessMessage(Message message)
。
同样从版本 1.6.7 开始,提供了一个名为 CorrelationDataPostProcessor
的新回调接口。它在所有 MessagePostProcessor
实例(在 send()
方法中提供以及在 setBeforePublishPostProcessors()
中提供)之后被调用。实现可以更新或替换在 send()
方法中提供的相关数据(如果有)。Message
和原始 CorrelationData
(如果有)作为参数提供。以下示例展示了如何使用 postProcess
方法。
CorrelationData postProcess(Message message, CorrelationData correlationData);
发布者返回
当模板的 mandatory
属性为 true
时,返回的消息由 AmqpTemplate
中描述的回调提供。
从 1.4 版本开始,RabbitTemplate
支持 SpEL mandatoryExpression
属性,该属性针对每个请求消息作为根评估对象进行评估,解析为 boolean
值。表达式中可以使用 Bean 引用,例如 @myBean.isMandatory(#root)
。
发布者返回值也可以在 RabbitTemplate
的发送和接收操作中内部使用。有关更多信息,请参见 回复超时。
批处理
1.4.2 版本引入了 BatchingRabbitTemplate
。这是 RabbitTemplate
的子类,它具有重写的 send
方法,该方法根据 BatchingStrategy
对消息进行批处理。只有在批处理完成后,消息才会发送到 RabbitMQ。以下清单显示了 BatchingStrategy
接口定义
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
批处理数据保存在内存中。在系统故障的情况下,未发送的消息可能会丢失。 |
提供了一个 SimpleBatchingStrategy
。它支持将消息发送到单个交换机或路由键。它具有以下属性
-
batchSize
:批处理中消息的数量,在发送之前。 -
bufferLimit
:批处理消息的最大大小。如果超过此限制,它会优先于batchSize
,并导致部分批处理被发送。 -
timeout
:当没有新的活动将消息添加到批处理时,部分批处理将发送的时间。
SimpleBatchingStrategy
通过在每个嵌入式消息之前添加一个四字节的二进制长度来格式化批处理。这通过将 springBatchFormat
消息属性设置为 lengthHeader4
来传达给接收系统。
默认情况下,监听器容器会自动对批处理消息进行解批处理(通过使用 springBatchFormat 消息头)。拒绝批处理中的任何消息会导致整个批处理被拒绝。
|
但是,有关更多信息,请参见 @RabbitListener with Batching。