配置代理

AMQP 规范描述了如何使用该协议配置代理上的队列、交换机和绑定。这些操作(从 0.8 版及更高版本开始可移植)存在于 org.springframework.amqp.core 包中的 AmqpAdmin 接口中。该类的 RabbitMQ 实现是 org.springframework.amqp.rabbit.core 包中的 RabbitAdmin

AmqpAdmin 接口基于使用 Spring AMQP 领域抽象,如下所示

public interface AmqpAdmin {

    // Exchange Operations

    void declareExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    // Queue Operations

    Queue declareQueue();

    String declareQueue(Queue queue);

    void deleteQueue(String queueName);

    void deleteQueue(String queueName, boolean unused, boolean empty);

    void purgeQueue(String queueName, boolean noWait);

    // Binding Operations

    void declareBinding(Binding binding);

    void removeBinding(Binding binding);

    Properties getQueueProperties(String queueName);

}

另请参阅 作用域操作

getQueueProperties() 方法返回有关队列的一些有限信息(消息数和消费者数)。返回的属性的键作为 RabbitTemplate 中的常量可用(QUEUE_NAMEQUEUE_MESSAGE_COUNTQUEUE_CONSUMER_COUNT)。RabbitMQ REST APIQueueInfo 对象中提供了更多信息。

无参数的 declareQueue() 方法在代理上定义一个队列,其名称会自动生成。此自动生成队列的其他属性为 exclusive=trueautoDelete=truedurable=false

declareQueue(Queue queue) 方法接收一个 Queue 对象并返回已声明队列的名称。如果提供的 Queuename 属性为空 String,则代理会使用生成的名称声明队列。该名称将返回给调用方。该名称还会添加到 QueueactualName 属性中。只有通过直接调用 RabbitAdmin 才能以编程方式使用此功能。当在应用程序上下文中声明性地定义队列时,通过管理员使用自动声明,可以将 name 属性设置为 ""(空字符串)。然后代理创建名称。从 2.1 版开始,监听器容器可以使用此类型的队列。有关更多信息,请参阅 容器和代理命名队列

这与 AnonymousQueue 形成对比,在 AnonymousQueue 中,框架会生成一个唯一的(UUID)名称,并将 durable 设置为 false,并将 exclusiveautoDelete 设置为 true。具有空(或缺少)name 属性的 <rabbit:queue/> 始终创建 AnonymousQueue

请参阅 AnonymousQueue 以了解为什么 AnonymousQueue 比代理生成的队列名称更受青睐,以及如何控制名称的格式。从 2.1 版开始,匿名队列在声明时默认情况下将参数 Queue.X_QUEUE_LEADER_LOCATOR 设置为 client-local。这确保队列在应用程序连接到的节点上声明。声明性队列必须具有固定名称,因为它们可能在上下文的其他地方被引用,例如以下示例中所示的监听器

<rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

此接口的 RabbitMQ 实现是 RabbitAdmin,当使用 Spring XML 配置时,类似于以下示例

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

CachingConnectionFactory的缓存模式为CHANNEL(默认值)时,RabbitAdmin实现会自动延迟声明在同一ApplicationContext中声明的队列、交换机和绑定。这些组件会在打开到代理的Connection时立即声明。一些命名空间特性使这非常方便——例如,在Stocks示例应用程序中,我们有以下内容

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
                 xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="tradeQueue"/>
    </bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
                xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
    </bindings>
</topic-exchange>

在前面的示例中,我们使用匿名队列(实际上,在内部,只是框架生成的队列名称,而不是代理生成的)并通过ID引用它们。我们也可以声明具有显式名称的队列,这些名称也充当上下文中其bean定义的标识符。以下示例配置了一个具有显式名称的队列

<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供idname属性。这允许您通过与队列名称无关的ID来引用队列(例如,在绑定中)。它还允许使用标准的Spring特性(例如,队列名称的属性占位符和SpEL表达式)。当您使用名称作为bean标识符时,这些特性不可用。

队列可以配置其他参数——例如,x-message-ttl。当您使用命名空间支持时,它们以参数名/参数值对的Map形式提供,这些对通过使用<rabbit:queue-arguments>元素定义。以下示例显示了如何执行此操作

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

默认情况下,假定参数为字符串。对于其他类型的参数,您必须提供类型。以下示例显示了如何指定类型

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
        <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当提供混合类型的参数时,必须为每个条目元素提供类型。以下示例显示了如何执行此操作

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">100</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

使用Spring Framework 3.2及更高版本,可以更简洁地声明如下

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当您使用Java配置时,Queue.X_QUEUE_LEADER_LOCATOR参数通过Queue类上的setLeaderLocator()方法作为一等属性支持。从版本2.1开始,默认情况下,匿名队列在声明时将此属性设置为client-local。这确保队列在应用程序连接到的节点上声明。

RabbitMQ代理不允许声明参数不匹配的队列。例如,如果一个queue已经存在,并且没有time to live参数,而您尝试声明它(例如)key="x-message-ttl" value="100",则会抛出异常。

默认情况下,RabbitAdmin在发生任何异常时立即停止处理所有声明。这可能会导致下游问题,例如侦听器容器无法初始化,因为另一个(在错误队列之后定义的)队列未声明。

可以通过将RabbitAdmin实例上的ignore-declaration-exceptions属性设置为true来修改此行为。此选项指示RabbitAdmin记录异常并继续声明其他元素。当使用Java配置RabbitAdmin时,此属性称为ignoreDeclarationExceptions。这是一个全局设置,适用于所有元素。队列、交换机和绑定具有类似的属性,仅适用于这些元素。

在1.6版本之前,此属性仅在通道上发生IOException时才会生效,例如,当当前属性和所需属性不匹配时。现在,此属性在任何异常(包括TimeoutException和其他异常)上都会生效。

此外,任何声明异常都会导致发布DeclarationExceptionEvent,这是一个ApplicationEvent,上下文中的任何ApplicationListener都可以使用它。该事件包含对admin、正在声明的元素和Throwable的引用。

Headers交换机

从版本1.3开始,您可以配置HeadersExchange以匹配多个头。您还可以指定是否必须匹配任何或所有头。以下示例显示了如何执行此操作

<rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
        <rabbit:binding queue="bucket">
            <rabbit:binding-arguments>
                <entry key="foo" value="bar"/>
                <entry key="baz" value="qux"/>
                <entry key="x-match" value="all"/>
            </rabbit:binding-arguments>
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:headers-exchange>

从版本1.6开始,您可以使用internal标志(默认为false)配置Exchanges,并且这样的Exchange通过RabbitAdmin在代理上正确配置(如果应用程序上下文中存在)。如果交换机的internal标志为true,则RabbitMQ不允许客户端使用该交换机。这对于死信交换机或交换机到交换机的绑定很有用,在这些情况下,您不希望发布者直接使用该交换机。

要了解如何使用Java配置AMQP基础结构,请查看Stock示例应用程序,其中有一个@ConfigurationAbstractStockRabbitConfiguration,该类又具有RabbitClientConfigurationRabbitServerConfiguration子类。以下清单显示了AbstractStockRabbitConfiguration的代码

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public TopicExchange marketDataExchange() {
        return new TopicExchange("app.stock.marketdata");
    }

    // additional code omitted for brevity

}

在Stock应用程序中,服务器使用以下@Configuration类进行配置

@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {

    @Bean
    public Queue stockRequestQueue() {
        return new Queue("app.stock.request");
    }
}

这是@Configuration类整个继承链的末尾。最终结果是在应用程序启动时将TopicExchangeQueue声明到代理。服务器配置中没有将TopicExchange绑定到队列,因为这是在客户端应用程序中完成的。但是,股票请求队列会自动绑定到AMQP默认交换机。此行为由规范定义。

客户端@Configuration类更有趣一些。其声明如下

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    /**
     * Binds to the market data exchange.
     * Interested in any stock quotes
     * that match its routing key.
     */
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
                marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    // additional code omitted for brevity

}

客户端通过AmqpAdmin上的declareQueue()方法声明另一个队列。它将该队列绑定到市场数据交换机,并使用在属性文件中外部化的路由模式。

队列和交换机的构建器API

版本1.6引入了方便的流畅API,用于在使用Java配置时配置QueueExchange对象。以下示例显示了如何使用它

@Bean
public Queue queue() {
    return QueueBuilder.nonDurable("foo")
        .autoDelete()
        .exclusive()
        .withArgument("foo", "bar")
        .build();
}

@Bean
public Exchange exchange() {
  return ExchangeBuilder.directExchange("foo")
      .autoDelete()
      .internal()
      .withArgument("foo", "bar")
      .build();
}

从版本2.0开始,ExchangeBuilder现在默认创建持久交换机,以与各个AbstractExchange类上的简单构造函数保持一致。要使用构建器创建非持久交换机,请在调用.build()之前使用.durable(false)。不再提供不带参数的durable()方法。

版本2.2引入了流畅的API来添加“众所周知”的交换机和队列参数……

@Bean
public Queue allArgs1() {
    return QueueBuilder.nonDurable("all.args.1")
            .ttl(1000)
            .expires(200_000)
            .maxLength(42)
            .maxLengthBytes(10_000)
            .overflow(Overflow.rejectPublish)
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("dlrk")
            .maxPriority(4)
            .lazy()
            .leaderLocator(LeaderLocator.minLeaders)
            .singleActiveConsumer()
            .build();
}

@Bean
public DirectExchange ex() {
    return ExchangeBuilder.directExchange("ex.with.alternate")
            .durable(true)
            .alternate("alternate")
            .build();
}

声明交换机、队列和绑定的集合

您可以将Declarable对象(QueueExchangeBinding)的集合包装在Declarables对象中。RabbitAdmin在应用程序上下文中检测到此类bean(以及离散的Declarable bean),并在每次建立连接(最初和连接失败后)时在代理上声明包含的对象。以下示例显示了如何执行此操作

@Configuration
public static class Config {

    @Bean
    public CachingConnectionFactory cf() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    @Bean
    public DirectExchange e1() {
        return new DirectExchange("e1", false, true);
    }

    @Bean
    public Queue q1() {
        return new Queue("q1", false, false, true);
    }

    @Bean
    public Binding b1() {
        return BindingBuilder.bind(q1()).to(e1()).with("k1");
    }

    @Bean
    public Declarables es() {
        return new Declarables(
                new DirectExchange("e2", false, true),
                new DirectExchange("e3", false, true));
    }

    @Bean
    public Declarables qs() {
        return new Declarables(
                new Queue("q2", false, false, true),
                new Queue("q3", false, false, true));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Declarables prototypes() {
        return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
    }

    @Bean
    public Declarables bs() {
        return new Declarables(
                new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
                new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
    }

    @Bean
    public Declarables ds() {
        return new Declarables(
                new DirectExchange("e4", false, true),
                new Queue("q4", false, false, true),
                new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
    }

}
在2.1之前的版本中,您可以通过定义类型为Collection<Declarable>的bean来声明多个Declarable实例。在某些情况下,这可能会导致不良的副作用,因为管理员必须遍历所有Collection<?> bean。

版本2.2向Declarables添加了getDeclarablesByType方法;这可以用作便利,例如,在声明侦听器容器bean时。

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Declarables mixedDeclarables, MessageListener listener) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
    container.setMessageListener(listener);
    return container;
}

条件声明

默认情况下,所有队列、交换机和绑定都由应用程序上下文中的所有RabbitAdmin实例(假设它们具有auto-startup="true")声明。

从版本2.1.9开始,RabbitAdmin有一个新的属性explicitDeclarationsOnly(默认为false);当将其设置为true时,管理员将仅声明显式配置为由该管理员声明的bean。

从1.2版本开始,您可以有条件地声明这些元素。当应用程序连接到多个代理并需要指定应在哪些代理上声明特定元素时,这尤其有用。

表示这些元素的类实现了Declarable,它有两个方法:shouldDeclare()getDeclaringAdmins()RabbitAdmin使用这些方法来确定是否应在其实际的Connection上处理特定实例的声明。

这些属性在命名空间中作为属性可用,如下例所示

<rabbit:admin id="admin1" connection-factory="CF1" />

<rabbit:admin id="admin2" connection-factory="CF2" />

<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />

<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />

<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />

<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />

<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
        <rabbit:binding key="foo" queue="bar"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,auto-declare属性为true,如果未提供declared-by(或为空),则所有RabbitAdmin实例都会声明该对象(只要admin的auto-startup属性为true(默认值),并且admin的explicit-declarations-only属性为false)。

类似地,您可以使用基于Java的@Configuration来实现相同的效果。在以下示例中,组件由admin1声明,但不由admin2声明

@Bean
public RabbitAdmin admin1() {
    return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
    return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
    Queue queue = new Queue("foo");
    queue.setAdminsThatShouldDeclare(admin1());
    return queue;
}

@Bean
public Exchange exchange() {
    DirectExchange exchange = new DirectExchange("bar");
    exchange.setAdminsThatShouldDeclare(admin1());
    return exchange;
}

@Bean
public Binding binding() {
    Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    binding.setAdminsThatShouldDeclare(admin1());
    return binding;
}

关于idname属性的说明

<rabbit:queue/><rabbit:exchange/>元素上的name属性反映了代理中实体的名称。对于队列,如果省略name,则会创建一个匿名队列(请参阅AnonymousQueue)。

在2.0之前的版本中,name也注册为bean名称别名(类似于<bean/>元素上的name)。

这导致了两个问题

  • 它阻止了声明具有相同名称的队列和交换机。

  • 如果别名包含SpEL表达式(#{…​}),则不会解析该别名。

从版本2.0开始,如果您使用idname属性声明这些元素之一,则不再将名称声明为bean名称别名。如果您希望声明具有相同name的队列和交换机,则必须提供id

如果元素仅具有name属性,则没有变化。仍然可以通过name引用bean——例如,在绑定声明中。但是,如果名称包含SpEL,则您仍然无法引用它——您必须提供id以供引用。

AnonymousQueue

通常,当您需要一个唯一命名、排他的、自动删除的队列时,我们建议您使用AnonymousQueue而不是代理定义的队列名称(使用""作为Queue名称会导致代理生成队列名称)。

这是因为

  1. 队列实际上是在建立到代理的连接时声明的。这早于bean创建和连接的时间。使用该队列的bean需要知道其名称。实际上,代理在应用程序启动时甚至可能尚未运行。

  2. 如果由于某种原因与代理的连接丢失,则admin会使用相同的名称重新声明AnonymousQueue。如果我们使用代理声明的队列,则队列名称会更改。

您可以控制AnonymousQueue实例使用的队列名称的格式。

默认情况下,队列名称以spring.gen-开头,后跟UUID的Base64表示形式,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g

您可以在构造函数参数中提供一个AnonymousQueue.NamingStrategy实现。以下示例展示了如何操作

@Bean
public Queue anon1() {
    return new AnonymousQueue();
}

@Bean
public Queue anon2() {
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
    return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一个 Bean 生成一个以spring.gen-开头,后跟UUID的Base64表示形式的队列名称,例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二个 Bean 生成一个以something-开头,后跟UUID的Base64表示形式的队列名称。第三个 Bean 使用仅 UUID(无 Base64 转换)生成名称,例如:f20c818a-006b-4416-bf91-643590fedb0e

Base64 编码使用 RFC 4648 中的“URL 和文件名安全字母表”。尾随填充字符(=)将被移除。

您可以提供自己的命名策略,从而可以在队列名称中包含其他信息(例如应用程序名称或客户端主机)。

当您使用 XML 配置时,可以指定命名策略。naming-strategy 属性存在于<rabbit:queue>元素上,用于实现AnonymousQueue.NamingStrategy的 Bean 引用。以下示例展示了如何以各种方式指定命名策略

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
</bean>

第一个示例创建诸如spring.gen-MRBv9sqISkuCiPfOYfpo4g之类的名称。第二个示例使用 UUID 的字符串表示形式创建名称。第三个示例创建诸如custom.gen-MRBv9sqISkuCiPfOYfpo4g之类的名称。

您还可以提供自己的命名策略 Bean。

从 2.1 版开始,匿名队列在声明时默认将参数Queue.X_QUEUE_LEADER_LOCATOR设置为client-local。这确保队列在应用程序连接到的节点上声明。您可以通过在构建实例后调用queue.setLeaderLocator(null)来恢复到之前的行为。

恢复自动删除声明

通常,RabbitAdmin(s) 仅恢复在应用程序上下文中声明为 Bean 的队列/交换机/绑定;如果任何此类声明是自动删除的,则如果连接丢失,它们将被代理删除。当连接重新建立时,管理员将重新声明这些实体。通常,通过调用admin.declareQueue(…​)admin.declareExchange(…​)admin.declareBinding(…​)创建的实体将不会被恢复。

从 2.4 版开始,管理员具有一个新的属性redeclareManualDeclarations;当设置为true时,管理员除了应用程序上下文中的 Bean 之外,还会恢复这些实体。

如果调用deleteQueue(…​)deleteExchange(…​)removeBinding(…​),则不会执行单个声明的恢复。当队列和交换机被删除时,关联的绑定将从可恢复实体中删除。

最后,调用resetAllManualDeclarations()将阻止任何先前声明的实体的恢复。