AmqpTemplate

与 Spring Framework 和相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个“模板”,它起着核心作用。定义主要操作的接口称为 AmqpTemplate。这些操作涵盖了发送和接收消息的通用行为。换句话说,它们不是任何特定实现所独有的——因此名称中的“AMQP”。另一方面,该接口的实现与 AMQP 协议的实现相关联。与 JMS 本身是接口级 API 不同,AMQP 是线级协议。该协议的实现提供自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。目前,只有一种实现:RabbitTemplate。在以下示例中,我们经常使用 AmqpTemplate。但是,当您查看配置示例或任何包含模板实例化或设置器调用的代码片段时,您可以看到实现类型(例如,RabbitTemplate)。

如前所述,AmqpTemplate 接口定义了发送和接收消息的所有基本操作。我们将在 发送消息接收消息 中分别探讨消息发送和接收。

另请参见 异步 Rabbit 模板

添加重试功能

从 1.3 版本开始,您现在可以配置 RabbitTemplate 以使用 RetryTemplate 来帮助处理代理连接问题。有关完整信息,请参阅 spring-retry 项目。以下只是一个使用指数退避策略和默认 SimpleRetryPolicy 的示例,该策略在抛出异常给调用方之前尝试三次。

以下示例使用 XML 命名空间

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>

以下示例在 Java 中使用 @Configuration 注解

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

从 1.4 版本开始,除了 retryTemplate 属性外,RabbitTemplate 上还支持 recoveryCallback 选项。它用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) 的第二个参数。

RecoveryCallback 有点局限,因为重试上下文仅包含 lastThrowable 字段。对于更复杂的用例,您应该使用外部 RetryTemplate,以便您可以通过上下文的属性将其他信息传达给 RecoveryCallback。以下示例演示了如何做到这一点
retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

在这种情况下,您将**不会**将 RetryTemplate 注入 RabbitTemplate

发布是异步的——如何检测成功和失败

发布消息是一种异步机制,默认情况下,RabbitMQ 会丢弃无法路由的消息。对于成功的发布,您可以接收异步确认,如 相关发布者确认和返回 中所述。考虑两种故障场景

  • 发布到交换机,但没有匹配的目标队列。

  • 发布到不存在的交换机。

第一种情况由发布者返回涵盖,如 相关发布者确认和返回 中所述。

对于第二种情况,消息会被丢弃,并且不会生成任何返回。底层通道将关闭并出现异常。默认情况下,此异常会被记录,但您可以向 CachingConnectionFactory 注册 ChannelListener 以获取此类事件的通知。以下示例演示了如何添加 ConnectionListener

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

您可以检查信号的reason属性以确定发生的问题。

要在发送线程上检测异常,您可以在RabbitTemplate上设置setChannelTransacted(true),并在txCommit()上检测异常。但是,**事务会严重影响性能**,因此在仅为此用例启用事务之前,请仔细考虑。

相关发布者确认和返回

RabbitTemplate实现的AmqpTemplate支持发布者确认和返回。

对于返回的消息,模板的mandatory属性必须设置为true,或者对于特定消息,mandatory-expression必须计算结果为true。此功能需要一个CachingConnectionFactory,其publisherReturns属性设置为true(请参阅发布者确认和返回)。客户端通过调用setReturnsCallback(ReturnsCallback callback)注册RabbitTemplate.ReturnsCallback来接收返回的消息。回调必须实现以下方法

void returnedMessage(ReturnedMessage returned);

ReturnedMessage具有以下属性

  • message - 返回的消息本身

  • replyCode - 指示返回原因的代码

  • replyText - 返回的文本原因 - 例如NO_ROUTE

  • exchange - 发送消息的交换机

  • routingKey - 使用的路由键

每个RabbitTemplate仅支持一个ReturnsCallback。另请参阅回复超时

对于发布者确认(也称为发布者确认),模板需要一个CachingConnectionFactory,其publisherConfirm属性设置为ConfirmType.CORRELATED。客户端通过调用setConfirmCallback(ConfirmCallback callback)注册RabbitTemplate.ConfirmCallback来接收确认。回调必须实现此方法

void confirm(CorrelationData correlationData, boolean ack, String cause);

CorrelationData是客户端在发送原始消息时提供的对象。ack对于ack为真,对于nack为假。对于nack实例,如果在生成nack时可用,则原因可能包含nack的原因。一个示例是将消息发送到不存在的交换机。在这种情况下,代理会关闭通道。关闭的原因包含在cause中。cause是在版本1.4中添加的。

每个RabbitTemplate仅支持一个ConfirmCallback

当Rabbit模板发送操作完成时,通道将关闭。当连接工厂缓存已满时,这将阻止接收确认或返回(当缓存中有空间时,通道不会物理关闭,并且返回和确认将正常进行)。当缓存已满时,框架最多延迟五秒钟关闭,以便为接收确认和返回留出时间。使用确认时,在收到最后一个确认后关闭通道。仅使用返回时,通道将保持打开状态五秒钟。我们通常建议将连接工厂的channelCacheSize设置为足够大的值,以便发布消息的通道返回到缓存而不是关闭。您可以使用RabbitMQ管理插件监控通道使用情况。如果您看到通道快速打开和关闭,则应考虑增加缓存大小以减少服务器上的开销。
在版本2.1之前,启用发布者确认的通道在收到确认之前会返回到缓存。某些其他进程可能会检出通道并执行导致通道关闭的操作,例如将消息发布到不存在的交换机。这可能导致确认丢失。版本2.1及更高版本不再在确认未完成时将通道返回到缓存。RabbitTemplate在每次操作后对通道执行逻辑close()。通常,这意味着一次只有一个确认在通道上未完成。
从版本2.2开始,回调在连接工厂的executor线程之一上调用。这是为了避免如果您在回调中执行Rabbit操作,则可能发生死锁。在以前的版本中,回调直接在amqp-client连接I/O线程上调用;如果您执行某些RPC操作(例如打开新通道),则会导致死锁,因为I/O线程阻塞等待结果,但结果需要由I/O线程本身处理。对于这些版本,有必要在回调中将工作(例如发送消息)传递给另一个线程。由于框架现在将回调调用传递给执行程序,因此这不再需要。
只要返回回调在60秒或更短时间内执行,接收确认之前的返回消息的保证仍然得到维护。确认计划在返回回调退出或60秒后传递,以先发生者为准。

CorrelationData对象有一个CompletableFuture,您可以使用它来获取结果,而不是在模板上使用ConfirmCallback。以下示例显示了如何配置CorrelationData实例

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...

由于它是CompletableFuture<Confirm>,因此您可以在准备就绪时获取结果,或者使用whenComplete()进行异步回调。Confirm对象是一个简单的bean,具有两个属性:ackreason(对于nack实例)。对于代理生成的nack实例,不会填充原因。它填充了由框架生成的nack实例(例如,在ack实例未完成时关闭连接)。

此外,当启用确认和返回时,如果无法将CorrelationData return属性路由到任何队列,则会填充它。保证在使用ack设置未来之前设置返回的消息属性。CorrelationData.getReturn()返回一个ReturnMessage,其属性为

  • message(返回的消息)

  • replyCode

  • replyText

  • exchange

  • routingKey

另请参阅范围操作,了解用于等待发布者确认的更简单机制。

范围操作

通常,使用模板时,会从缓存中检出(或创建)一个Channel,用于操作,并将其返回到缓存以供重用。在多线程环境中,无法保证下一个操作使用相同的通道。但是,有时您可能希望对通道的使用有更多控制权,并确保许多操作都在同一通道上执行。

从版本2.0开始,提供了一种名为invoke的新方法,并带有一个OperationsCallback。在回调范围内和提供的RabbitOperations参数上执行的任何操作都使用相同的专用Channel,该通道将在结束时关闭(不会返回到缓存)。如果通道是PublisherCallbackChannel,则在收到所有确认后将其返回到缓存(请参阅相关发布者确认和返回)。

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

您可能需要此功能的一个示例是,如果您希望在底层Channel上使用waitForConfirms()方法。此方法以前未由Spring API公开,因为通常会缓存和共享通道,如前所述。RabbitTemplate现在提供waitForConfirms(long timeout)waitForConfirmsOrDie(long timeout),它们委托给在OperationsCallback范围内使用的专用通道。出于显而易见的原因,这些方法不能在该范围之外使用。

请注意,其他地方提供了允许您将确认与请求关联的更高级别的抽象(请参阅相关发布者确认和返回)。如果您只想等到代理确认传递,则可以使用以下示例中显示的技术

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

如果您希望RabbitAdmin操作在OperationsCallback范围内在同一通道上调用,则必须使用用于invoke操作的相同RabbitTemplate构造管理员。

如果模板操作已经在现有事务范围内执行,则前面的讨论就没有意义了,例如,在事务监听器容器线程上运行并在事务模板上执行操作时。在这种情况下,操作将在该通道上执行,并在线程返回到容器时提交。在这种情况下,无需使用invoke

以这种方式使用确认时,实际上不需要为将确认与请求关联而设置的大部分基础结构(除非也启用了返回)。从版本2.2开始,连接工厂支持一个名为publisherConfirmType的新属性。当将其设置为ConfirmType.SIMPLE时,将避免基础结构,并且确认处理可以更有效率。

此外,RabbitTemplate在发送的消息MessageProperties中设置publisherSequenceNumber属性。如果您希望检查(或记录或以其他方式使用)特定确认,则可以使用重载的invoke方法,如下例所示

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
这些ConfirmCallback对象(用于acknack实例)是Rabbit客户端回调,而不是模板回调。

以下示例记录acknack实例

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));
范围操作绑定到线程。有关多线程环境中严格排序的讨论,请参阅多线程环境中的严格消息排序

多线程环境中的严格消息排序

范围操作中的讨论仅适用于在同一线程上执行操作的情况。

考虑以下情况

  • thread-1将消息发送到队列并将工作传递给thread-2

  • thread-2将消息发送到同一队列

由于RabbitMQ的异步特性以及缓存通道的使用,无法确定是否将使用相同的通道,因此消息到达队列的顺序无法保证。(在大多数情况下,它们将按顺序到达,但无序传递的概率不为零)。要解决此用例,您可以使用大小为1的有界通道缓存(以及channelCheckoutTimeout)来确保消息始终发布在同一通道上,并且顺序将得到保证。为此,如果您将连接工厂用于其他用途(例如消费者),则应为模板使用专用的连接工厂,或者将模板配置为使用嵌入在主连接工厂中的发布者连接工厂(请参阅使用单独的连接)。

这最好用一个简单的Spring Boot应用程序来说明

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}

	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}

}

即使发布是在两个不同的线程上执行的,它们也将使用相同的通道,因为缓存限制为单个通道。

从版本2.3.7开始,ThreadChannelConnectionFactory支持使用prepareContextSwitchswitchContext方法将线程的通道传输到另一个线程。第一个方法返回一个上下文,该上下文传递给调用第二个方法的第二个线程。一个线程可以将其非事务性通道或事务性通道(或两者之一)绑定到它;除非您使用两个连接工厂,否则您不能单独传输它们。示例如下

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	private final ThreadChannelConnectionFactory connFactory;

	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {

		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}

	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}

}
调用prepareSwitchContext后,如果当前线程执行更多操作,则将在新通道上执行这些操作。当不再需要线程绑定通道时,关闭它非常重要。

消息传递集成

从 1.4 版本开始,RabbitMessagingTemplate(构建在 RabbitTemplate 之上)提供了与 Spring 框架消息抽象的集成——即 org.springframework.messaging.Message。这使您可以使用 spring-messagingMessage<?> 抽象来发送和接收消息。此抽象被其他 Spring 项目使用,例如 Spring Integration 和 Spring 的 STOMP 支持。涉及两个消息转换器:一个用于在 spring-messaging 的 Message<?> 和 Spring AMQP 的 Message 抽象之间进行转换,另一个用于在 Spring AMQP 的 Message 抽象和底层 RabbitMQ 客户端库所需的格式之间进行转换。默认情况下,消息有效负载由提供的 RabbitTemplate 实例的消息转换器转换。或者,您可以注入一个自定义的 MessagingMessageConverter 和其他一些有效负载转换器,如下例所示。

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);

已验证的用户 ID

从 1.6 版本开始,模板现在支持 user-id-expression(使用 Java 配置时为 userIdExpression)。如果发送消息,则在评估此表达式后设置用户 ID 属性(如果尚未设置)。评估的根对象是要发送的消息。

以下示例演示了如何使用 user-id-expression 属性。

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

第一个示例是一个文字表达式。第二个从应用程序上下文中的连接工厂 Bean 获取 username 属性。

使用单独的连接

从 2.0.2 版本开始,您可以将 usePublisherConnection 属性设置为 true,以便在可能的情况下使用与侦听器容器使用的连接不同的连接。这样做是为了避免在生产者因任何原因阻塞时消费者被阻塞。连接工厂为此目的维护第二个内部连接工厂;默认情况下,它与主工厂类型相同,但如果您希望使用不同的工厂类型进行发布,则可以显式设置。如果 Rabbit 模板在侦听器容器启动的事务中运行,则无论此设置如何,都会使用容器的通道。

通常,您不应将 RabbitAdmin 与设置为 true 的模板一起使用。使用带连接工厂的 RabbitAdmin 构造函数。如果您使用带模板的其他构造函数,请确保模板的属性为 false。这是因为,通常,管理员用于为侦听器容器声明队列。使用将属性设置为 true 的模板意味着独占队列(例如 AnonymousQueue)将在与侦听器容器使用的连接不同的连接上声明。在这种情况下,队列无法被容器使用。