配置代理

AMQP 规范描述了如何使用该协议在代理上配置队列、交换机和绑定。这些操作(从 0.8 规范及更高版本移植)存在于 `org.springframework.amqp.core` 包中的 `AmqpAdmin` 接口中。该类的 RabbitMQ 实现是 `org.springframework.amqp.rabbit.core` 包中的 `RabbitAdmin`。

`AmqpAdmin` 接口基于使用 Spring AMQP 域抽象,如下所示

public interface AmqpAdmin {

    // Exchange Operations

    void declareExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    // Queue Operations

    Queue declareQueue();

    String declareQueue(Queue queue);

    void deleteQueue(String queueName);

    void deleteQueue(String queueName, boolean unused, boolean empty);

    void purgeQueue(String queueName, boolean noWait);

    // Binding Operations

    void declareBinding(Binding binding);

    void removeBinding(Binding binding);

    Properties getQueueProperties(String queueName);

}

另请参见 作用域操作

`getQueueProperties()` 方法返回有关队列的一些有限信息(消息计数和消费者计数)。返回的属性的键作为常量在 `RabbitTemplate` 中可用(`QUEUE_NAME`、`QUEUE_MESSAGE_COUNT` 和 `QUEUE_CONSUMER_COUNT`)。RabbitMQ REST API 在 `QueueInfo` 对象中提供了更多信息。

无参数的 `declareQueue()` 方法在代理上定义一个队列,该队列的名称是自动生成的。此自动生成队列的附加属性为 `exclusive=true`、`autoDelete=true` 和 `durable=false`。

`declareQueue(Queue queue)` 方法接受一个 `Queue` 对象并返回已声明队列的名称。如果提供的 `Queue` 的 `name` 属性为空 `String`,则代理会使用生成的名称声明队列。该名称将返回给调用者。该名称还将添加到 `Queue` 的 `actualName` 属性中。您只能通过直接调用 `RabbitAdmin` 来以编程方式使用此功能。当在应用程序上下文中以声明方式定义队列时,通过管理员使用自动声明,您可以将名称属性设置为 `""`(空字符串)。然后代理创建名称。从版本 2.1 开始,监听器容器可以使用此类型的队列。有关更多信息,请参见 容器和代理命名队列

这与 `AnonymousQueue` 形成对比,在 `AnonymousQueue` 中,框架会生成一个唯一的(`UUID`)名称并将 `durable` 设置为 `false`,并将 `exclusive`、`autoDelete` 设置为 `true`。具有空(或缺失)`name` 属性的 `<rabbit:queue/>` 始终创建一个 `AnonymousQueue`。

请参见 `AnonymousQueue` 以了解为什么 `AnonymousQueue` 比代理生成的队列名称更可取,以及如何控制名称的格式。从版本 2.1 开始,匿名队列的声明使用参数 `Queue.X_QUEUE_LEADER_LOCATOR` 设置为 `client-local`(默认情况下)。这确保队列在应用程序连接到的节点上声明。声明式队列必须具有固定名称,因为它们可能在上下文中其他地方被引用,例如以下示例中所示的监听器

<rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

此接口的 RabbitMQ 实现是 `RabbitAdmin`,当使用 Spring XML 配置时,类似于以下示例

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

CachingConnectionFactory 的缓存模式为 CHANNEL(默认值)时,RabbitAdmin 实现会自动延迟声明在同一个 ApplicationContext 中声明的队列、交换机和绑定。这些组件将在打开与代理的 Connection 时立即声明。一些命名空间功能使这非常方便——例如,在 Stocks 示例应用程序中,我们有以下内容

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
                 xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="tradeQueue"/>
    </bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
                xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
    </bindings>
</topic-exchange>

在前面的示例中,我们使用匿名队列(实际上,在内部,只是由框架而不是代理生成的名称的队列),并通过 ID 引用它们。我们也可以声明具有显式名称的队列,这些名称也用作其在上下文中的 bean 定义的标识符。以下示例配置了一个具有显式名称的队列

<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供 idname 属性。这使您可以通过与队列名称无关的 ID 来引用队列(例如,在绑定中)。它还允许使用标准的 Spring 功能(例如,队列名称的属性占位符和 SpEL 表达式)。当您使用名称作为 bean 标识符时,这些功能不可用。

队列可以使用附加参数进行配置——例如,x-message-ttl。当您使用命名空间支持时,它们以参数名称/参数值对的 Map 形式提供,这些对是使用 <rabbit:queue-arguments> 元素定义的。以下示例展示了如何执行此操作

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

默认情况下,假设参数是字符串。对于其他类型的参数,您必须提供类型。以下示例展示了如何指定类型

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
        <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当提供混合类型的参数时,您必须为每个条目元素提供类型。以下示例展示了如何执行此操作

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">100</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

使用 Spring Framework 3.2 及更高版本,可以更简洁地声明,如下所示

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当您使用 Java 配置时,Queue.X_QUEUE_LEADER_LOCATOR 参数作为 Queue 类上的 setLeaderLocator() 方法的一级属性得到支持。从 2.1 版本开始,匿名队列的声明默认情况下将此属性设置为 client-local。这确保队列是在应用程序连接到的节点上声明的。

RabbitMQ 代理不允许声明具有不匹配参数的队列。例如,如果一个 queue 已经存在,但没有 time to live 参数,而您尝试声明它(例如)key="x-message-ttl" value="100",则会抛出异常。

默认情况下,RabbitAdmin 在发生任何异常时立即停止处理所有声明。这可能会导致下游问题,例如监听器容器无法初始化,因为另一个队列(在错误队列之后定义)没有声明。

可以通过将 ignore-declaration-exceptions 属性设置为 RabbitAdmin 实例上的 true 来修改此行为。此选项指示 RabbitAdmin 记录异常并继续声明其他元素。使用 Java 配置 RabbitAdmin 时,此属性称为 ignoreDeclarationExceptions。这是一个全局设置,适用于所有元素。队列、交换机和绑定具有类似的属性,仅适用于这些元素。

在 1.6 版本之前,此属性仅在通道上发生 IOException 时生效,例如当当前属性和所需属性不匹配时。现在,此属性对任何异常都有效,包括 TimeoutException 和其他异常。

此外,任何声明异常都会导致发布 DeclarationExceptionEvent,它是一个 ApplicationEvent,可以被上下文中的任何 ApplicationListener 使用。该事件包含对管理员、正在声明的元素和 Throwable 的引用。

Headers 交换机

从 1.3 版本开始,您可以配置 HeadersExchange 以匹配多个头。您还可以指定必须匹配任何头还是所有头。以下示例显示了如何执行此操作

<rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
        <rabbit:binding queue="bucket">
            <rabbit:binding-arguments>
                <entry key="foo" value="bar"/>
                <entry key="baz" value="qux"/>
                <entry key="x-match" value="all"/>
            </rabbit:binding-arguments>
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:headers-exchange>

从 1.6 版本开始,您可以使用 internal 标志(默认为 false)配置 Exchanges,并且这样的 Exchange 通过 RabbitAdmin(如果应用程序上下文中存在)在代理上正确配置。如果交换机的 internal 标志为 true,RabbitMQ 不允许客户端使用该交换机。这对于死信交换机或交换机到交换机绑定很有用,在这种情况下,您不希望发布者直接使用交换机。

要了解如何使用 Java 配置 AMQP 基础设施,请查看 Stock 示例应用程序,其中包含 @ConfigurationAbstractStockRabbitConfiguration,该类又包含 RabbitClientConfigurationRabbitServerConfiguration 子类。以下清单显示了 AbstractStockRabbitConfiguration 的代码。

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public TopicExchange marketDataExchange() {
        return new TopicExchange("app.stock.marketdata");
    }

    // additional code omitted for brevity

}

在 Stock 应用程序中,服务器是使用以下 @Configuration 类配置的

@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {

    @Bean
    public Queue stockRequestQueue() {
        return new Queue("app.stock.request");
    }
}

这是 @Configuration 类整个继承链的结束。最终结果是在应用程序启动时向代理声明 TopicExchangeQueue。服务器配置中没有将 TopicExchange 绑定到队列,因为这是在客户端应用程序中完成的。但是,股票请求队列会自动绑定到 AMQP 默认交换机。此行为由规范定义。

客户端 @Configuration 类更有趣。它的声明如下

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    /**
     * Binds to the market data exchange.
     * Interested in any stock quotes
     * that match its routing key.
     */
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
                marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    // additional code omitted for brevity

}

客户端通过 AmqpAdmin 上的 declareQueue() 方法声明另一个队列。它将该队列绑定到市场数据交换机,并使用在属性文件中外部化的路由模式。

队列和交换机的构建器 API

版本 1.6 引入了一个方便的流畅 API,用于在使用 Java 配置时配置 QueueExchange 对象。以下示例展示了如何使用它

@Bean
public Queue queue() {
    return QueueBuilder.nonDurable("foo")
        .autoDelete()
        .exclusive()
        .withArgument("foo", "bar")
        .build();
}

@Bean
public Exchange exchange() {
  return ExchangeBuilder.directExchange("foo")
      .autoDelete()
      .internal()
      .withArgument("foo", "bar")
      .build();
}

从版本 2.0 开始,ExchangeBuilder 现在默认创建持久交换机,以与单个 AbstractExchange 类上的简单构造函数保持一致。要使用构建器创建非持久交换机,请在调用 .build() 之前使用 .durable(false)。不再提供不带参数的 durable() 方法。

版本 2.2 引入了流畅的 API 来添加“众所周知”的交换机和队列参数……

@Bean
public Queue allArgs1() {
    return QueueBuilder.nonDurable("all.args.1")
            .ttl(1000)
            .expires(200_000)
            .maxLength(42)
            .maxLengthBytes(10_000)
            .overflow(Overflow.rejectPublish)
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("dlrk")
            .maxPriority(4)
            .lazy()
            .leaderLocator(LeaderLocator.minLeaders)
            .singleActiveConsumer()
            .build();
}

@Bean
public DirectExchange ex() {
    return ExchangeBuilder.directExchange("ex.with.alternate")
            .durable(true)
            .alternate("alternate")
            .build();
}

声明交换机、队列和绑定的集合

您可以将 Declarable 对象(QueueExchangeBinding)的集合包装在 Declarables 对象中。RabbitAdmin 会在应用程序上下文中检测到此类 bean(以及离散的 Declarable bean),并在每次建立连接时(最初以及连接失败后)在代理上声明所包含的对象。以下示例展示了如何执行此操作

@Configuration
public static class Config {

    @Bean
    public CachingConnectionFactory cf() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    @Bean
    public DirectExchange e1() {
        return new DirectExchange("e1", false, true);
    }

    @Bean
    public Queue q1() {
        return new Queue("q1", false, false, true);
    }

    @Bean
    public Binding b1() {
        return BindingBuilder.bind(q1()).to(e1()).with("k1");
    }

    @Bean
    public Declarables es() {
        return new Declarables(
                new DirectExchange("e2", false, true),
                new DirectExchange("e3", false, true));
    }

    @Bean
    public Declarables qs() {
        return new Declarables(
                new Queue("q2", false, false, true),
                new Queue("q3", false, false, true));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Declarables prototypes() {
        return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
    }

    @Bean
    public Declarables bs() {
        return new Declarables(
                new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
                new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
    }

    @Bean
    public Declarables ds() {
        return new Declarables(
                new DirectExchange("e4", false, true),
                new Queue("q4", false, false, true),
                new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
    }

}
在 2.1 之前的版本中,您可以通过定义类型为 Collection<Declarable> 的 bean 来声明多个 Declarable 实例。在某些情况下,这会导致不良副作用,因为管理员必须遍历所有 Collection<?> bean。

版本 2.2 添加了 getDeclarablesByType 方法到 Declarables;这可以用作便利,例如,在声明侦听器容器 bean 时。

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Declarables mixedDeclarables, MessageListener listener) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
    container.setMessageListener(listener);
    return container;
}

条件声明

默认情况下,所有队列、交换机和绑定都由应用程序上下文中的所有 RabbitAdmin 实例声明(假设它们具有 auto-startup="true")。

从 2.1.9 版本开始,RabbitAdmin 有一个新的属性 explicitDeclarationsOnly(默认值为 false);当将其设置为 true 时,管理员将只声明明确配置为由该管理员声明的 bean。

从 1.2 版本开始,您可以有条件地声明这些元素。当应用程序连接到多个代理并且需要指定应在哪些代理上声明特定元素时,这特别有用。

表示这些元素的类实现 Declarable,它有两个方法:shouldDeclare()getDeclaringAdmins()RabbitAdmin 使用这些方法来确定特定实例是否应该在其 Connection 上实际处理声明。

这些属性在命名空间中可用作属性,如下面的示例所示

<rabbit:admin id="admin1" connection-factory="CF1" />

<rabbit:admin id="admin2" connection-factory="CF2" />

<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />

<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />

<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />

<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />

<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
        <rabbit:binding key="foo" queue="bar"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,auto-declare 属性为 true,如果未提供 declared-by(或为空),则所有 RabbitAdmin 实例都会声明该对象(只要管理员的 auto-startup 属性为 true(默认值)并且管理员的 explicit-declarations-only 属性为 false)。

类似地,您可以使用基于 Java 的 @Configuration 来实现相同的效果。在以下示例中,组件由 admin1 声明,但不由 admin2 声明

@Bean
public RabbitAdmin admin1() {
    return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
    return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
    Queue queue = new Queue("foo");
    queue.setAdminsThatShouldDeclare(admin1());
    return queue;
}

@Bean
public Exchange exchange() {
    DirectExchange exchange = new DirectExchange("bar");
    exchange.setAdminsThatShouldDeclare(admin1());
    return exchange;
}

@Bean
public Binding binding() {
    Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    binding.setAdminsThatShouldDeclare(admin1());
    return binding;
}

关于 idname 属性的说明

<rabbit:queue/><rabbit:exchange/> 元素上的 name 属性反映了代理中实体的名称。对于队列,如果省略 name,则会创建一个匿名队列(参见 AnonymousQueue)。

在 2.0 之前的版本中,name 也被注册为 bean 名称别名(类似于 <bean/> 元素上的 name)。

这导致了两个问题

  • 它阻止了具有相同名称的队列和交换机的声明。

  • 如果别名包含 SpEL 表达式(#{…​}),则不会解析别名。

从 2.0 版本开始,如果您使用 idname 属性声明其中一个元素,则 name 将不再被声明为 bean 名称别名。如果您希望声明具有相同 name 的队列和交换机,则必须提供 id

如果元素只有 name 属性,则不会有任何变化。仍然可以通过 name 引用 bean,例如在绑定声明中。但是,如果名称包含 SpEL,则仍然无法引用它,您必须提供 id 用于引用目的。

AnonymousQueue

通常,当您需要一个唯一命名、排他性的、自动删除的队列时,我们建议您使用 AnonymousQueue 而不是代理定义的队列名称(使用 "" 作为 Queue 名称会导致代理生成队列名称)。

这是因为

  1. 队列实际上是在建立与代理的连接时声明的。这比创建和连接 bean 要晚得多。使用队列的 bean 需要知道它的名称。实际上,代理可能在应用程序启动时甚至没有运行。

  2. 如果由于某种原因与代理的连接丢失,管理员会使用相同的名称重新声明AnonymousQueue。如果我们使用代理声明的队列,队列名称将发生变化。

您可以控制AnonymousQueue实例使用的队列名称的格式。

默认情况下,队列名称以spring.gen-为前缀,后跟UUID的 base64 表示形式,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g

您可以在构造函数参数中提供AnonymousQueue.NamingStrategy实现。以下示例展示了如何做到这一点

@Bean
public Queue anon1() {
    return new AnonymousQueue();
}

@Bean
public Queue anon2() {
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
    return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一个 bean 生成一个以spring.gen-为前缀的队列名称,后跟UUID的 base64 表示形式,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二个 bean 生成一个以something-为前缀的队列名称,后跟UUID的 base64 表示形式。第三个 bean 生成一个仅使用 UUID(不进行 base64 转换)的名称,例如:f20c818a-006b-4416-bf91-643590fedb0e

base64 编码使用 RFC 4648 中的“URL 和文件名安全字母”。尾随填充字符(=)将被删除。

您可以提供自己的命名策略,您可以在其中将其他信息(例如应用程序名称或客户端主机)包含在队列名称中。

您可以在使用 XML 配置时指定命名策略。naming-strategy属性存在于<rabbit:queue>元素上,用于实现AnonymousQueue.NamingStrategy的 bean 引用。以下示例展示了如何在各种方式中指定命名策略

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
</bean>

第一个示例创建诸如spring.gen-MRBv9sqISkuCiPfOYfpo4g之类的名称。第二个示例创建具有 UUID 字符串表示形式的名称。第三个示例创建诸如custom.gen-MRBv9sqISkuCiPfOYfpo4g之类的名称。

您也可以提供自己的命名策略 bean。

从 2.1 版本开始,匿名队列在声明时默认情况下将参数Queue.X_QUEUE_LEADER_LOCATOR设置为client-local。这确保队列在应用程序连接到的节点上声明。您可以通过在构造实例后调用queue.setLeaderLocator(null)来恢复到之前的行为。

恢复自动删除声明

通常,RabbitAdmin (s) 仅恢复在应用程序上下文中声明为 bean 的队列/交换机/绑定;如果任何此类声明是自动删除的,则在连接丢失时,代理将删除它们。当连接重新建立时,管理员将重新声明这些实体。通常,通过调用 admin.declareQueue(…​)admin.declareExchange(…​)admin.declareBinding(…​) 创建的实体不会被恢复。

从版本 2.4 开始,管理员有一个新的属性 redeclareManualDeclarations;当设置为 true 时,管理员除了应用程序上下文中的 bean 之外,还会恢复这些实体。

如果调用 deleteQueue(…​)deleteExchange(…​)removeBinding(…​),则不会执行单个声明的恢复。当队列和交换机被删除时,关联的绑定将从可恢复实体中删除。

最后,调用 resetAllManualDeclarations() 将阻止恢复任何先前声明的实体。