AMQP

高级消息队列协议 (AMQP) 是一种平台无关的、线级协议,用于面向消息的中间件。Spring AMQP 项目将 Spring 核心概念应用于基于 AMQP 的消息解决方案的开发。Spring Boot 提供了多种便利功能,可通过 RabbitMQ 使用 AMQP,包括 spring-boot-starter

RabbitMQ 支持

RabbitMQ 是一款基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。

RabbitMQ 配置由 `spring.rabbitmq.*` 中的外部配置属性控制。例如,你可以在 `application.properties` 中声明以下部分

  • 属性

  • YAML

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,你可以使用 `addresses` 属性配置相同的连接

  • 属性

  • YAML

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
以这种方式指定地址时,将忽略 `host` 和 `port` 属性。如果地址使用 `amqps` 协议,则会自动启用 SSL 支持。

请参阅 RabbitProperties 了解有关更多受支持的基于属性的配置选项。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的较低级别详细信息,请定义一个 ConnectionFactoryCustomizer bean。

如果上下文中存在 ConnectionNameStrategy bean,它将自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。

要对 RabbitTemplate 进行应用程序范围的附加自定义,请使用 RabbitTemplateCustomizer bean。

请参阅 了解 AMQP(RabbitMQ 使用的协议) 了解更多详细信息。

发送消息

Spring 的 AmqpTemplateAmqpAdmin 是自动配置的,你可以将它们直接自动装配到自己的 bean 中,如下例所示

  • Java

  • Kotlin

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate 可以以类似的方式注入。如果定义了 MessageConverter bean,它会自动与自动配置的 AmqpTemplate 关联。

如有必要,任何定义为 bean 的 org.springframework.amqp.core.Queue 都会自动用于在 RabbitMQ 实例上声明相应的队列。

要重试操作,可以在 AmqpTemplate 上启用重试(例如,在代理连接丢失的情况下)

  • 属性

  • YAML

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

默认情况下禁用重试。你还可以通过声明 RabbitRetryTemplateCustomizer bean 以编程方式自定义 RetryTemplate

如果您需要创建更多 RabbitTemplate 实例,或者您想覆盖默认值,Spring Boot 提供了一个 RabbitTemplateConfigurer bean,您可以使用它来使用与自动配置中使用的工厂相同的设置初始化 RabbitTemplate

向流发送消息

要向特定流发送消息,请指定流的名称,如下例所示

  • 属性

  • YAML

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果定义了 MessageConverterStreamMessageConverterProducerCustomizer bean,它会自动关联到自动配置的 RabbitStreamTemplate

如果您需要创建更多 RabbitStreamTemplate 实例,或者您想覆盖默认值,Spring Boot 提供了一个 RabbitStreamTemplateConfigurer bean,您可以使用它来使用与自动配置中使用的工厂相同的设置初始化 RabbitStreamTemplate

接收消息

当 Rabbit 基础设施存在时,可以使用 @RabbitListener 注解任何 bean 来创建侦听器端点。如果没有定义 RabbitListenerContainerFactory,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory,您可以使用 spring.rabbitmq.listener.type 属性切换到直接容器。如果定义了 MessageConverterMessageRecoverer bean,它会自动关联到默认工厂。

以下示例组件在 someQueue 队列上创建了一个侦听器端点

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
有关更多详细信息,请参见 @EnableRabbitJavadoc

如果您需要创建更多 RabbitListenerContainerFactory 实例,或者您想覆盖默认值,Spring Boot 提供了一个 SimpleRabbitListenerContainerFactoryConfigurer 和一个 DirectRabbitListenerContainerFactoryConfigurer,您可以使用它们来使用与自动配置中使用的工厂相同的设置初始化 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory

您选择哪种容器类型并不重要。这两个 bean 由自动配置公开。

例如,以下配置类公开另一个使用特定 MessageConverter 的工厂

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory? {
		return ...
	}

}

然后,您可以在任何 @RabbitListener 注解的方法中使用工厂,如下所示

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

您可以启用重试来处理侦听器抛出异常的情况。默认情况下,使用 RejectAndDontRequeueRecoverer,但您可以定义自己的 MessageRecoverer。当重试用尽时,消息会被拒绝,并且如果代理配置为这样做,则会被丢弃或路由到死信交换。默认情况下,禁用重试。您还可以通过声明 RabbitRetryTemplateCustomizer bean 以编程方式自定义 RetryTemplate

默认情况下,如果禁用重试且侦听器抛出异常,则会无限期地重试传递。您可以通过两种方式修改此行为:将 defaultRequeueRejected 属性设置为 false,以便尝试零次重新传递,或抛出 AmqpRejectAndDontRequeueException 以指示应拒绝该消息。当启用重试且达到最大传递尝试次数时,会使用后一种机制。