AmqpTemplate
与 Spring 框架和相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个在其中发挥核心作用的“模板”。定义主要操作的接口称为AmqpTemplate
。这些操作涵盖了发送和接收消息的一般行为。换句话说,它们不是任何实现所独有的——因此名称中的“AMQP”。另一方面,该接口的实现与 AMQP 协议的实现相关联。与 JMS 不同,JMS 本身是一个接口级 API,AMQP 是一个线级协议。该协议的实现提供了自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。目前,只有一个实现:RabbitTemplate
。在接下来的示例中,我们经常使用AmqpTemplate
。但是,当您查看配置示例或任何模板实例化或设置器调用的代码片段时,您可以看到实现类型(例如,RabbitTemplate
)。
另请参阅异步 Rabbit 模板。
添加重试功能
从版本 1.3 开始,您现在可以配置RabbitTemplate
以使用RetryTemplate
来帮助处理代理连接问题。有关完整信息,请参阅spring-retry 项目。以下只是一个示例,它使用指数退避策略和默认的SimpleRetryPolicy
,该策略在向调用者抛出异常之前尝试三次。
以下示例使用 XML 命名空间
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
以下示例在 Java 中使用@Configuration
注解
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
从版本 1.4 开始,除了retryTemplate
属性之外,RabbitTemplate
还支持recoveryCallback
选项。它用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback)
的第二个参数。
RecoveryCallback 有点受限,因为重试上下文只包含lastThrowable 字段。对于更复杂的用例,您应该使用外部RetryTemplate ,以便您可以通过上下文的属性将其他信息传递给RecoveryCallback 。以下示例演示了如何执行此操作
|
retryTemplate.execute(
new RetryCallback<Object, Exception>() {
@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}, new RecoveryCallback<Object>() {
@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
在这种情况下,您**不**应该将RetryTemplate
注入RabbitTemplate
。
发布是异步的——如何检测成功和失败
发布消息是一种异步机制,默认情况下,无法路由的消息会被 RabbitMQ 丢弃。为了成功发布,您可以接收异步确认,如相关发布者确认和返回中所述。考虑两种失败场景
-
发布到交换机,但没有匹配的目标队列。
-
发布到不存在的交换机。
第一种情况由发布者返回涵盖,如相关发布者确认和返回中所述。
对于第二种情况,消息会被丢弃,不会生成任何返回。底层通道会关闭并抛出异常。默认情况下,此异常会被记录,但您可以向CachingConnectionFactory
注册一个ChannelListener
来获取此类事件的通知。以下示例展示了如何添加一个ConnectionListener
this.connectionFactory.addConnectionListener(new ConnectionListener() {
@Override
public void onCreate(Connection connection) {
}
@Override
public void onShutDown(ShutdownSignalException signal) {
...
}
});
您可以检查信号的reason
属性以确定发生的问题。
为了在发送线程上检测异常,您可以在RabbitTemplate
上setChannelTransacted(true)
,异常将在txCommit()
上被检测到。但是,**事务会严重影响性能**,因此在仅针对此用例启用事务之前,请仔细考虑。
相关发布者确认和返回
RabbitTemplate
对AmqpTemplate
的实现支持发布者确认和返回。
对于返回的消息,模板的mandatory
属性必须设置为true
,或者对于特定消息,mandatory-expression
必须评估为true
。此功能需要一个CachingConnectionFactory
,其publisherReturns
属性设置为true
(参见发布者确认和返回)。返回消息通过注册一个RabbitTemplate.ReturnsCallback
发送到客户端,方法是调用setReturnsCallback(ReturnsCallback callback)
。回调必须实现以下方法
void returnedMessage(ReturnedMessage returned);
ReturnedMessage
具有以下属性
-
message
- 返回的消息本身 -
replyCode
- 指示返回原因的代码 -
replyText
- 返回的文本原因 - 例如NO_ROUTE
-
exchange
- 发送消息的交换机 -
routingKey
- 使用的路由键
每个RabbitTemplate
只支持一个ReturnsCallback
。另请参见回复超时。
对于发布者确认(也称为发布者确认),模板需要一个CachingConnectionFactory
,其publisherConfirm
属性设置为ConfirmType.CORRELATED
。确认通过注册一个RabbitTemplate.ConfirmCallback
发送到客户端,方法是调用setConfirmCallback(ConfirmCallback callback)
。回调必须实现此方法
void confirm(CorrelationData correlationData, boolean ack, String cause);
当客户端发送原始消息时,会提供一个名为 CorrelationData
的对象。ack
为真表示确认 (ack
),为假表示拒绝 (nack
)。对于 nack
实例,如果在生成 nack
时可用,cause
可能包含 nack
的原因。例如,当向不存在的交换机发送消息时,代理会关闭通道。关闭的原因包含在 cause
中。cause
在 1.4 版本中添加。
RabbitTemplate
仅支持一个 ConfirmCallback
。
当 Rabbit 模板发送操作完成时,通道会关闭。这会阻止在连接工厂缓存已满时接收确认或返回(当缓存中有空间时,通道不会物理关闭,返回和确认会正常进行)。当缓存已满时,框架会将关闭延迟最多五秒钟,以便为接收确认和返回留出时间。当使用确认时,通道会在收到最后一个确认时关闭。当仅使用返回时,通道会保持打开状态五秒钟。我们通常建议将连接工厂的 channelCacheSize 设置为足够大的值,以便将发布消息的通道返回到缓存中,而不是关闭它。您可以使用 RabbitMQ 管理插件监控通道使用情况。如果您看到通道快速打开和关闭,您应该考虑增加缓存大小以减少服务器的开销。
|
在 2.1 版本之前,启用发布者确认的通道会在收到确认之前返回到缓存中。其他进程可能会检出通道并执行导致通道关闭的操作,例如将消息发布到不存在的交换机。这会导致确认丢失。2.1 及更高版本不再在确认未完成时将通道返回到缓存中。RabbitTemplate 在每次操作后对通道执行逻辑 close() 。通常,这意味着一次只有一个确认在通道上处于未完成状态。
|
从 2.2 版本开始,回调将在连接工厂的 executor 线程之一上调用。这样做是为了避免在回调中执行 Rabbit 操作时可能发生的死锁。在之前的版本中,回调是在 amqp-client 连接 I/O 线程上直接调用的;如果执行一些 RPC 操作(例如打开新通道),这会导致死锁,因为 I/O 线程会阻塞等待结果,但结果需要由 I/O 线程本身处理。在这些版本中,有必要在回调中将工作(例如发送消息)传递给另一个线程。由于框架现在将回调调用传递给执行器,因此不再需要这样做。
|
只要返回回调在 60 秒或更短时间内执行,就可以保证在确认之前收到返回的消息。确认将在返回回调退出或 60 秒后(以先到者为准)进行调度。 |
CorrelationData
对象有一个 CompletableFuture
,您可以使用它来获取结果,而不是在模板上使用 ConfirmCallback
。以下示例展示了如何配置 CorrelationData
实例
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由于它是一个 CompletableFuture<Confirm>
,您可以选择在准备就绪时 get()
结果,或者使用 whenComplete()
进行异步回调。Confirm
对象是一个简单的 bean,具有 2 个属性:ack
和 reason
(对于 nack
实例)。对于由代理生成的 nack
实例,不会填充原因。它会填充由框架生成的 nack
实例(例如,在 ack
实例未完成时关闭连接)。
此外,当同时启用确认和返回时,如果 CorrelationData
的 return
属性无法路由到任何队列,则会填充返回的消息。保证在使用 ack
设置未来之前设置返回的消息属性。CorrelationData.getReturn()
返回一个 ReturnMessage
,它具有以下属性
-
message(返回的消息)
-
replyCode
-
replyText
-
exchange
-
routingKey
另请参阅 范围操作,了解用于等待发布者确认的更简单机制。
范围操作
通常,在使用模板时,会从缓存中检出(或创建)一个Channel
,用于操作,然后将其返回到缓存中以供重用。在多线程环境中,无法保证下一个操作使用相同的通道。但是,有时您可能希望对通道的使用有更多控制,并确保多个操作都在同一个通道上执行。
从 2.0 版本开始,提供了一种名为invoke
的新方法,它带有OperationsCallback
。在回调范围内和提供的RabbitOperations
参数上执行的任何操作都使用相同的专用Channel
,该通道将在结束时关闭(不会返回到缓存)。如果通道是PublisherCallbackChannel
,则在收到所有确认后将其返回到缓存(参见相关发布者确认和返回)。
@FunctionalInterface
public interface OperationsCallback<T> {
T doInRabbit(RabbitOperations operations);
}
您可能需要此功能的一个示例是,如果您希望在底层Channel
上使用waitForConfirms()
方法。此方法以前没有通过 Spring API 公开,因为该通道通常是缓存和共享的,如前所述。RabbitTemplate
现在提供waitForConfirms(long timeout)
和waitForConfirmsOrDie(long timeout)
,它们委托给OperationsCallback
范围内使用的专用通道。出于显而易见的原因,这些方法不能在该范围之外使用。
请注意,在其他地方提供了允许您将确认与请求关联的更高级别的抽象(参见相关发布者确认和返回)。如果您只想等到代理确认了传递,可以使用以下示例中所示的技术
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
如果您希望RabbitAdmin
操作在OperationsCallback
范围内在同一个通道上调用,则该管理员必须使用与invoke
操作相同的RabbitTemplate
构建。
如果模板操作已经在现有事务范围内执行,则上述讨论是无稽之谈,例如,当在事务监听器容器线程上运行并在事务模板上执行操作时。在这种情况下,操作将在该通道上执行,并在线程返回到容器时提交。在这种情况下,无需使用invoke 。
|
当以这种方式使用确认时,用于将确认与请求关联的大部分基础设施实际上并不需要(除非也启用了返回)。从 2.2 版本开始,连接工厂支持一个名为publisherConfirmType
的新属性。当将其设置为ConfirmType.SIMPLE
时,将避免使用基础设施,并且确认处理可以更高效。
此外,RabbitTemplate
在发送的消息 MessageProperties
中设置了 publisherSequenceNumber
属性。如果您希望检查(或记录或以其他方式使用)特定确认,您可以使用重载的 invoke
方法,如下例所示
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
这些 ConfirmCallback 对象(用于 ack 和 nack 实例)是 Rabbit 客户端回调,而不是模板回调。
|
以下示例记录了 ack
和 nack
实例
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
作用域操作绑定到线程。有关多线程环境中严格排序的讨论,请参见 多线程环境中的严格消息排序。 |
多线程环境中的严格消息排序
在 作用域操作 中的讨论仅适用于在同一线程上执行操作时。
考虑以下情况
-
thread-1
将消息发送到队列,并将工作移交给thread-2
-
thread-2
将消息发送到同一个队列
由于 RabbitMQ 的异步性质以及缓存通道的使用,无法确定将使用相同的通道,因此消息在队列中到达的顺序无法保证。(在大多数情况下,它们会按顺序到达,但乱序交付的概率不为零)。为了解决这种情况,您可以使用大小为 1
的有界通道缓存(以及 channelCheckoutTimeout
)来确保消息始终发布在同一个通道上,并且顺序将得到保证。为此,如果您对连接工厂有其他用途,例如消费者,您应该为模板使用专用连接工厂,或者配置模板以使用主连接工厂中嵌入的发布者连接工厂(请参见 使用单独的连接)。
这最好用一个简单的 Spring Boot 应用程序来说明
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}
void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}
}
即使发布是在两个不同的线程上执行的,它们都将使用相同的通道,因为缓存限制为单个通道。
从 2.3.7 版本开始,ThreadChannelConnectionFactory
支持使用 prepareContextSwitch
和 switchContext
方法将线程的通道传输到另一个线程。第一个方法返回一个上下文,该上下文传递给调用第二个方法的第二个线程。一个线程可以绑定非事务性通道或事务性通道(或两者之一);您不能单独传输它们,除非您使用两个连接工厂。以下是一个示例
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}
@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}
@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}
@Bean
Queue queue() {
return new Queue("queue");
}
@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}
}
@Component
class Service {
private static final Logger LOG = LoggerFactory.getLogger(Service.class);
private final RabbitTemplate template;
private final TaskExecutor exec;
private final ThreadChannelConnectionFactory connFactory;
Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {
this.template = template;
this.exec = exec;
this.connFactory = tccf;
}
void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}
void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}
}
一旦调用了 prepareSwitchContext ,如果当前线程执行了更多操作,它们将在新的通道上执行。当不再需要线程绑定的通道时,关闭它很重要。
|
消息集成
从 1.4 版本开始,RabbitMessagingTemplate
(基于 RabbitTemplate
构建)提供了与 Spring 框架消息抽象的集成,即 org.springframework.messaging.Message
。这使您可以使用 spring-messaging
Message<?>
抽象来发送和接收消息。这种抽象被其他 Spring 项目使用,例如 Spring Integration 和 Spring 的 STOMP 支持。涉及两个消息转换器:一个用于在 spring-messaging Message<?>
和 Spring AMQP 的 Message
抽象之间进行转换,另一个用于在 Spring AMQP 的 Message
抽象和底层 RabbitMQ 客户端库所需的格式之间进行转换。默认情况下,消息有效负载由提供的 RabbitTemplate
实例的消息转换器进行转换。或者,您可以注入一个自定义的 MessagingMessageConverter
以及其他一些有效负载转换器,如下例所示
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
验证的用户 ID
从 1.6 版本开始,模板现在支持 user-id-expression
(在使用 Java 配置时为 userIdExpression
)。如果发送消息,则在评估此表达式后设置用户 ID 属性(如果尚未设置)。评估的根对象是待发送的消息。
以下示例展示了如何使用 user-id-expression
属性
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一个示例是一个文字表达式。第二个示例从应用程序上下文中的连接工厂 bean 获取 username
属性。
使用单独的连接
从 2.0.2 版本开始,您可以将 usePublisherConnection
属性设置为 true
,以便在可能的情况下使用与监听器容器使用的连接不同的连接。这样做是为了避免在生产者因任何原因被阻塞时消费者也被阻塞。连接工厂为此目的维护一个第二个内部连接工厂;默认情况下,它与主工厂类型相同,但如果您希望使用不同的工厂类型进行发布,则可以显式设置。如果 Rabbit 模板在由监听器容器启动的事务中运行,则无论此设置如何,都将使用容器的通道。
一般来说,你不应该在将此属性设置为true 的模板中使用RabbitAdmin 。请使用带有连接工厂的RabbitAdmin 构造函数。如果你使用带有模板的另一个构造函数,请确保模板的属性为false 。这是因为,通常,管理员用于为监听器容器声明队列。使用将属性设置为true 的模板将意味着独占队列(例如AnonymousQueue )将在与监听器容器使用的连接不同的连接上声明。在这种情况下,队列无法被容器使用。
|