配置 Broker

AMQP规范描述了如何使用协议在Broker上配置队列、交换器和绑定。这些操作(从0.8规范及更高版本均可移植)存在于org.springframework.amqp.core包中的AmqpAdmin接口中。该类的RabbitMQ实现是位于org.springframework.amqp.rabbit.core包中的RabbitAdmin

AmqpAdmin接口基于使用Spring AMQP领域抽象,如下所示:

public interface AmqpAdmin {

    // Exchange Operations

    void declareExchange(Exchange exchange);

    void deleteExchange(String exchangeName);

    // Queue Operations

    Queue declareQueue();

    String declareQueue(Queue queue);

    boolean deleteQueue(String queueName);

    void deleteQueue(String queueName, boolean unused, boolean empty);

    void purgeQueue(String queueName, boolean noWait);

    // Binding Operations

    void declareBinding(Binding binding);

    void removeBinding(Binding binding);

    Properties getQueueProperties(String queueName);

    QueueInformation getQueueInfo(String queueName);

}

另请参阅作用域操作

getQueueProperties()方法返回有关队列的一些有限信息(消息计数和消费者计数)。返回属性的键在RabbitAdmin中以常量形式提供(QUEUE_NAMEQUEUE_MESSAGE_COUNTQUEUE_CONSUMER_COUNT)。getQueueInfo()返回一个方便的QueueInformation数据对象。

无参数的declareQueue()方法在Broker上定义一个自动生成名称的队列。此自动生成队列的附加属性是exclusive=trueautoDelete=truedurable=false

declareQueue(Queue queue)方法接受一个Queue对象并返回已声明队列的名称。如果提供的Queuename属性为空String,则Broker将以生成的名称声明队列。该名称将返回给调用者。该名称也添加到QueueactualName属性中。您只能通过直接调用RabbitAdmin以编程方式使用此功能。当在应用程序上下文中声明性地定义队列时,如果使用Admin的自动声明功能,您可以将名称属性设置为""(空字符串)。然后Broker将创建名称。从版本2.1开始,监听器容器可以使用这种类型的队列。有关更多信息,请参阅容器和Broker命名队列

这与AnonymousQueue形成对比,在AnonymousQueue中,框架生成一个唯一的(UUID)名称,并将durable设置为false,将exclusiveautoDelete设置为true。一个<rabbit:queue/>,如果其name属性为空(或缺失),则始终会创建一个AnonymousQueue

请参阅AnonymousQueue,以了解为什么AnonymousQueue优于Broker生成的队列名称,以及如何控制名称的格式。从版本2.1开始,匿名队列默认声明时带有参数Queue.X_QUEUE_LEADER_LOCATOR,并设置为client-local。这确保了队列在应用程序连接到的节点上声明。声明式队列必须具有固定名称,因为它们可能在上下文中的其他地方被引用——例如在以下示例中所示的监听器中:

<rabbit:listener-container>
    <rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>

此接口的RabbitMQ实现是RabbitAdmin,当使用Spring XML配置时,它类似于以下示例:

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>

CachingConnectionFactory缓存模式为CHANNEL(默认)时,RabbitAdmin实现会自动延迟声明在同一ApplicationContext中声明的队列、交换器和绑定。一旦与Broker建立Connection,这些组件就会被声明。有一些命名空间功能使得这非常方便——例如,在Stocks示例应用程序中,我们有以下内容:

<rabbit:queue id="tradeQueue"/>

<rabbit:queue id="marketDataQueue"/>

<fanout-exchange name="broadcast.responses"
                 xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="tradeQueue"/>
    </bindings>
</fanout-exchange>

<topic-exchange name="app.stock.marketdata"
                xmlns="http://www.springframework.org/schema/rabbit">
    <bindings>
        <binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
    </bindings>
</topic-exchange>

在前面的示例中,我们使用匿名队列(实际上,内部只是框架生成的带有名称的队列,而不是由Broker生成的)并通过ID引用它们。我们也可以声明具有显式名称的队列,这些名称也用作其在上下文中的bean定义的标识符。以下示例配置了一个具有显式名称的队列:

<rabbit:queue name="stocks.trade.queue"/>
您可以同时提供idname属性。这允许您通过与队列名称无关的ID来引用队列(例如,在绑定中)。它还允许使用标准Spring功能(例如属性占位符和SpEL表达式用于队列名称)。当您使用名称作为bean标识符时,这些功能不可用。

队列可以配置额外的参数——例如,x-message-ttl。当您使用命名空间支持时,它们以参数名称/参数值对的Map形式提供,这些由<rabbit:queue-arguments>元素定义。以下示例展示了如何实现:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

默认情况下,参数被假定为字符串。对于其他类型的参数,您必须提供类型。以下示例展示了如何指定类型:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments value-type="java.lang.Long">
        <entry key="x-message-ttl" value="100"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当提供混合类型的参数时,您必须为每个条目元素提供类型。以下示例展示了如何实现:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl">
            <value type="java.lang.Long">100</value>
        </entry>
        <entry key="x-dead-letter-exchange" value="myDLX"/>
        <entry key="x-dead-letter-routing-key" value="dlqRK"/>
    </rabbit:queue-arguments>
</rabbit:queue>

从Spring Framework 3.2及更高版本开始,这可以更简洁地声明,如下所示:

<rabbit:queue name="withArguments">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
        <entry key="x-ha-policy" value="all"/>
    </rabbit:queue-arguments>
</rabbit:queue>

当使用Java配置时,Queue.X_QUEUE_LEADER_LOCATOR参数通过Queue类上的setLeaderLocator()方法作为一流属性得到支持。从版本2.1开始,匿名队列默认声明时此属性设置为client-local。这确保了队列在应用程序连接到的节点上声明。

RabbitMQ Broker不允许声明具有不匹配参数的队列。例如,如果一个queue已经存在且没有time to live参数,而您尝试使用(例如)key="x-message-ttl" value="100"来声明它,则会抛出异常。

默认情况下,当发生任何异常时,RabbitAdmin会立即停止处理所有声明。这可能导致下游问题,例如监听器容器因另一个队列(在出错的队列之后定义)未声明而未能初始化。

可以通过将RabbitAdmin实例上的ignore-declaration-exceptions属性设置为true来修改此行为。此选项指示RabbitAdmin记录异常并继续声明其他元素。当使用Java配置RabbitAdmin时,此属性名为ignoreDeclarationExceptions。这是一个全局设置,适用于所有元素。队列、交换器和绑定具有一个类似的属性,仅适用于这些元素。

在版本1.6之前,此属性仅在通道上发生IOException时生效,例如当当前属性和所需属性不匹配时。现在,此属性对任何异常都生效,包括TimeoutException等。

此外,任何声明异常都会导致发布DeclarationExceptionEvent,这是一个ApplicationEvent,可以由上下文中的任何ApplicationListener消费。该事件包含对admin、正在声明的元素和Throwable的引用。

Header交换器

从版本1.3开始,您可以配置HeadersExchange以匹配多个header。您还可以指定是否必须匹配任何或所有header。以下示例展示了如何实现:

<rabbit:headers-exchange name="headers-test">
    <rabbit:bindings>
        <rabbit:binding queue="bucket">
            <rabbit:binding-arguments>
                <entry key="foo" value="bar"/>
                <entry key="baz" value="qux"/>
                <entry key="x-match" value="all"/>
            </rabbit:binding-arguments>
        </rabbit:binding>
    </rabbit:bindings>
</rabbit:headers-exchange>

从版本1.6开始,您可以为Exchanges配置一个internal标志(默认为false),并且这样的Exchange通过RabbitAdmin(如果应用程序上下文存在)在Broker上正确配置。如果交换器的internal标志为true,RabbitMQ将不允许客户端使用该交换器。这对于死信交换器或交换器到交换器的绑定很有用,您不希望发布者直接使用该交换器。

要了解如何使用Java配置AMQP基础设施,请查看Stock示例应用程序,其中有一个@ConfigurationAbstractStockRabbitConfiguration,该类又包含RabbitClientConfigurationRabbitServerConfiguration子类。以下列表显示了AbstractStockRabbitConfiguration的代码:

@Configuration
public abstract class AbstractStockAppRabbitConfiguration {

    @Bean
    public CachingConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
            new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        configureRabbitTemplate(template);
        return template;
    }

    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public TopicExchange marketDataExchange() {
        return new TopicExchange("app.stock.marketdata");
    }

    // additional code omitted for brevity

}

在Stock应用程序中,服务器使用以下@Configuration类配置:

@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration  {

    @Bean
    public Queue stockRequestQueue() {
        return new Queue("app.stock.request");
    }
}

这是@Configuration类整个继承链的末端。最终结果是TopicExchangeQueue在应用程序启动时被声明到Broker。服务器配置中没有TopicExchange到队列的绑定,因为这是在客户端应用程序中完成的。然而,股票请求队列会自动绑定到AMQP默认交换器。此行为由规范定义。

客户端的@Configuration类更有趣一些。它的声明如下:

@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {

    @Value("${stocks.quote.pattern}")
    private String marketDataRoutingKey;

    @Bean
    public Queue marketDataQueue() {
        return amqpAdmin().declareQueue();
    }

    /**
     * Binds to the market data exchange.
     * Interested in any stock quotes
     * that match its routing key.
     */
    @Bean
    public Binding marketDataBinding() {
        return BindingBuilder.bind(
                marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
    }

    // additional code omitted for brevity

}

客户端通过AmqpAdmin上的declareQueue()方法声明另一个队列。它将该队列绑定到市场数据交换器,路由模式在属性文件中外部化。

队列和交换器的构建器API

版本1.6引入了一个方便的流式API,用于在使用Java配置时配置QueueExchange对象。以下示例展示了如何使用它:

@Bean
public Queue queue() {
    return QueueBuilder.nonDurable("foo")
        .autoDelete()
        .exclusive()
        .withArgument("foo", "bar")
        .build();
}

@Bean
public Exchange exchange() {
  return ExchangeBuilder.directExchange("foo")
      .autoDelete()
      .internal()
      .withArgument("foo", "bar")
      .build();
}

从版本2.0开始,ExchangeBuilder现在默认创建持久交换器,以与各个AbstractExchange类上的简单构造函数保持一致。要使用构建器创建非持久交换器,请在调用.build()之前使用.durable(false)。不带参数的durable()方法不再提供。

版本2.2引入了流式API来添加“知名”交换器和队列参数…

@Bean
public Queue allArgs1() {
    return QueueBuilder.nonDurable("all.args.1")
            .ttl(1000)
            .expires(200_000)
            .maxLength(42)
            .maxLengthBytes(10_000)
            .overflow(Overflow.rejectPublish)
            .deadLetterExchange("dlx")
            .deadLetterRoutingKey("dlrk")
            .maxPriority(4)
            .lazy()
            .leaderLocator(LeaderLocator.minLeaders)
            .singleActiveConsumer()
            .build();
}

@Bean
public DirectExchange ex() {
    return ExchangeBuilder.directExchange("ex.with.alternate")
            .durable(true)
            .alternate("alternate")
            .build();
}

声明交换器、队列和绑定的集合

您可以将Declarable对象(QueueExchangeBinding)的集合包装在Declarables对象中。RabbitAdmin会在应用程序上下文中检测此类bean(以及离散的Declarable bean),并在建立连接(最初和连接失败后)时在Broker上声明包含的对象。以下示例展示了如何实现:

@Configuration
public static class Config {

    @Bean
    public CachingConnectionFactory cf() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public RabbitAdmin admin(ConnectionFactory cf) {
        return new RabbitAdmin(cf);
    }

    @Bean
    public DirectExchange e1() {
        return new DirectExchange("e1", false, true);
    }

    @Bean
    public Queue q1() {
        return new Queue("q1", false, false, true);
    }

    @Bean
    public Binding b1() {
        return BindingBuilder.bind(q1()).to(e1()).with("k1");
    }

    @Bean
    public Declarables es() {
        return new Declarables(
                new DirectExchange("e2", false, true),
                new DirectExchange("e3", false, true));
    }

    @Bean
    public Declarables qs() {
        return new Declarables(
                new Queue("q2", false, false, true),
                new Queue("q3", false, false, true));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public Declarables prototypes() {
        return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
    }

    @Bean
    public Declarables bs() {
        return new Declarables(
                new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
                new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
    }

    @Bean
    public Declarables ds() {
        return new Declarables(
                new DirectExchange("e4", false, true),
                new Queue("q4", false, false, true),
                new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
    }

}
在2.1版本之前,您可以通过定义Collection<Declarable>类型的bean来声明多个Declarable实例。在某些情况下,这可能导致不良副作用,因为admin必须遍历所有Collection<?> bean。

版本2.2为Declarables添加了getDeclarablesByType方法;这可以用作便利,例如在声明监听器容器bean时。

public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
        Declarables mixedDeclarables, MessageListener listener) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
    container.setMessageListener(listener);
    return container;
}

条件声明

默认情况下,所有队列、交换器和绑定都由应用程序上下文中的所有RabbitAdmin实例声明(假设它们具有auto-startup="true")。

从版本2.1.9开始,RabbitAdmin具有一个新属性explicitDeclarationsOnly(默认值为false);当设置为true时,admin将仅声明明确配置由该admin声明的bean。

从1.2版本开始,您可以有条件地声明这些元素。当应用程序连接到多个Broker并且需要指定特定元素应该在哪个Broker上声明时,这特别有用。

代表这些元素的类实现了Declarable,它有两个方法:shouldDeclare()getDeclaringAdmins()RabbitAdmin使用这些方法来确定特定实例是否应该实际处理其Connection上的声明。

这些属性在命名空间中以属性形式提供,如以下示例所示:

<rabbit:admin id="admin1" connection-factory="CF1" />

<rabbit:admin id="admin2" connection-factory="CF2" />

<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />

<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />

<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />

<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />

<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />

<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
    <rabbit:bindings>
        <rabbit:binding key="foo" queue="bar"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,auto-declare属性为true,如果未提供declared-by(或为空),则所有RabbitAdmin实例都声明该对象(只要admin的auto-startup属性为true(默认值),并且admin的explicit-declarations-only属性为false)。

同样,您可以使用基于Java的@Configuration来实现相同的效果。在以下示例中,组件由admin1声明,但不由admin2声明:

@Bean
public RabbitAdmin admin1() {
    return new RabbitAdmin(cf1());
}

@Bean
public RabbitAdmin admin2() {
    return new RabbitAdmin(cf2());
}

@Bean
public Queue queue() {
    Queue queue = new Queue("foo");
    queue.setAdminsThatShouldDeclare(admin1());
    return queue;
}

@Bean
public Exchange exchange() {
    DirectExchange exchange = new DirectExchange("bar");
    exchange.setAdminsThatShouldDeclare(admin1());
    return exchange;
}

@Bean
public Binding binding() {
    Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
    binding.setAdminsThatShouldDeclare(admin1());
    return binding;
}

关于idname属性的注意事项

<rabbit:queue/><rabbit:exchange/>元素上的name属性反映了实体在Broker中的名称。对于队列,如果省略name,则会创建一个匿名队列(请参阅AnonymousQueue)。

在2.0版本之前,name也被注册为bean名称别名(类似于<bean/>元素上的name)。

这导致了两个问题:

  • 它阻止了声明具有相同名称的队列和交换器。

  • 如果别名包含SpEL表达式(#{…​}),则无法解析。

从版本2.0开始,如果您使用idname属性声明其中一个元素,则该名称不再声明为bean名称别名。如果您希望声明具有相同name的队列和交换器,则必须提供一个id

如果元素只有一个name属性,则没有变化。bean仍然可以通过name引用——例如,在绑定声明中。但是,如果名称包含SpEL,您仍然无法引用它——您必须提供一个id以供引用。

AnonymousQueue

通常,当您需要一个唯一命名的、排他的、自动删除的队列时,我们建议您使用AnonymousQueue而不是Broker定义的队列名称(使用""作为Queue名称会导致Broker生成队列名称)。

这是因为:

  1. 队列实际上是在与Broker建立连接时声明的。这比bean创建和连接在一起晚得多。使用队列的bean需要知道它的名称。实际上,在应用程序启动时,Broker甚至可能没有运行。

  2. 如果与Broker的连接由于某种原因丢失,admin将使用相同的名称重新声明AnonymousQueue。如果使用Broker声明的队列,队列名称将会改变。

您可以控制AnonymousQueue实例使用的队列名称的格式。

默认情况下,队列名称以spring.gen-为前缀,后跟UUID的base64表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g

您可以在构造函数参数中提供AnonymousQueue.NamingStrategy实现。以下示例展示了如何实现:

@Bean
public Queue anon1() {
    return new AnonymousQueue();
}

@Bean
public Queue anon2() {
    return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}

@Bean
public Queue anon3() {
    return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}

第一个bean生成以spring.gen-为前缀的队列名称,后跟UUID的base64表示——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g。第二个bean生成以something-为前缀的队列名称,后跟UUID的base64表示。第三个bean仅使用UUID(无base64转换)生成名称——例如,f20c818a-006b-4416-bf91-643590fedb0e

base64编码使用RFC 4648中的“URL和文件名安全字母表”。尾随的填充字符(=)将被删除。

您可以提供自己的命名策略,从而在队列名称中包含其他信息(例如应用程序名称或客户端主机)。

您可以在使用XML配置时指定命名策略。naming-strategy属性存在于<rabbit:queue>元素上,用于实现AnonymousQueue.NamingStrategy的bean引用。以下示例展示了如何以各种方式指定命名策略:

<rabbit:queue id="uuidAnon" />

<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />

<rabbit:queue id="customAnon" naming-strategy="customNamer" />

<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />

<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
    <constructor-arg value="custom.gen-" />
</bean>

第一个示例创建诸如spring.gen-MRBv9sqISkuCiPfOYfpo4g的名称。第二个示例创建带有UUID的字符串表示的名称。第三个示例创建诸如custom.gen-MRBv9sqISkuCiPfOYfpo4g的名称。

您也可以提供自己的命名策略bean。

从版本2.1开始,匿名队列默认声明时带有参数Queue.X_QUEUE_LEADER_LOCATOR,并设置为client-local。这确保了队列在应用程序连接到的节点上声明。您可以通过在构造实例后调用queue.setLeaderLocator(null)来恢复到之前的行为。

恢复自动删除声明

通常,RabbitAdmin(es)只恢复在应用程序上下文中声明为bean的队列/交换器/绑定;如果任何此类声明是自动删除的,则如果连接丢失,它们将被Broker删除。当重新建立连接时,admin将重新声明实体。通常,通过调用admin.declareQueue(…​)admin.declareExchange(…​)admin.declareBinding(…​)创建的实体将不会被恢复。

从版本2.4开始,admin有一个新属性redeclareManualDeclarations;当设置为true时,admin除了恢复应用程序上下文中的bean之外,还会恢复这些实体。

如果调用了deleteQueue(…​)deleteExchange(…​)removeBinding(…​),则不会执行单个声明的恢复。当队列和交换器被删除时,相关的绑定将从可恢复实体中移除。

最后,调用resetAllManualDeclarations()将阻止恢复任何先前声明的实体。

© . This site is unofficial and not affiliated with VMware.