AMQP 抽象

Spring AMQP 由两个模块组成(在发行版中每个模块都由一个 JAR 表示):spring-amqpspring-rabbit。'spring-amqp' 模块包含 org.springframework.amqp.core 包。在该包中,您可以找到表示 AMQP 核心“模型”的类。我们的目标是提供不依赖于任何特定 AMQP 代理实现或客户端库的通用抽象。最终用户代码可以在不同供应商的实现之间更加便携,因为它可以仅针对抽象层进行开发。这些抽象随后由特定于代理的模块(例如 'spring-rabbit')实现。目前只有一种 RabbitMQ 实现。但是,除了 RabbitMQ 之外,这些抽象还在 .NET 中使用 Apache Qpid 进行了验证。由于 AMQP 在协议级别运行,原则上,您可以将 RabbitMQ 客户端与任何支持相同协议版本的代理一起使用,但我们目前没有测试任何其他代理。

本概述假设您已经熟悉 AMQP 规范的基础知识。如果不是,请查看 其他资源 中列出的资源。

Message

0-9-1 AMQP 规范没有定义 Message 类或接口。相反,在执行诸如 basicPublish() 之类的操作时,内容将作为字节数组参数传递,其他属性将作为单独的参数传递。Spring AMQP 将 Message 类定义为更通用的 AMQP 域模型表示的一部分。Message 类的目的是将主体和属性封装在一个实例中,以便 API 可以反过来更简单。以下示例显示了 Message 类定义

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        return this.body;
    }

    public MessageProperties getMessageProperties() {
        return this.messageProperties;
    }
}

MessageProperties 接口定义了几个常见的属性,例如 'messageId'、'timestamp'、'contentType' 等等。您还可以通过调用 setHeader(String key, Object value) 方法使用用户定义的 'headers' 扩展这些属性。

从版本 1.5.71.6.111.7.42.0.0 开始,如果消息体是序列化后的 Serializable java 对象,则在执行 toString() 操作(例如在日志消息中)时,它不再被反序列化(默认情况下)。这是为了防止不安全的反序列化。默认情况下,只有 java.utiljava.lang 类会被反序列化。要恢复到之前的行为,您可以通过调用 Message.addAllowedListPatterns(…​) 添加允许的类/包模式。支持简单的 通配符,例如 com.something., *.MyClass。无法反序列化的消息体在日志消息中用 byte[<size>] 表示。

交换机

Exchange 接口表示 AMQP 交换机,消息生产者将消息发送到交换机。代理的每个虚拟主机中的每个交换机都有一个唯一的名称以及其他一些属性。以下示例显示了 Exchange 接口

public interface Exchange {

    String getName();

    String getExchangeType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

}

如您所见,Exchange 还有一个由 ExchangeTypes 中定义的常量表示的 '类型'。基本类型是:directtopicfanoutheaders。在核心包中,您可以找到针对每种类型实现的 Exchange 接口。这些 Exchange 类型在处理与队列的绑定方面,其行为各不相同。例如,Direct 交换机允许队列通过固定的路由键(通常是队列的名称)进行绑定。Topic 交换机支持使用路由模式的绑定,这些模式可能包含 '*' 和 '#' 通配符,分别表示 '正好一个' 和 '零个或多个'。Fanout 交换机将发布到与其绑定的所有队列,而不考虑任何路由键。有关这些和其他交换机类型的更多信息,请参见 其他资源

AMQP 规范还要求任何代理提供一个没有名称的“默认”直接交换。所有声明的队列都绑定到该默认的 Exchange,其名称作为路由键。您可以在 AmqpTemplate 中了解有关 Spring AMQP 中默认交换使用的更多信息。

队列

Queue 类表示消息消费者接收消息的组件。与各种 Exchange 类一样,我们的实现旨在成为这种核心 AMQP 类型的抽象表示。以下清单显示了 Queue

public class Queue  {

    private final String name;

    private volatile boolean durable;

    private volatile boolean exclusive;

    private volatile boolean autoDelete;

    private volatile Map<String, Object> arguments;

    /**
     * The queue is durable, non-exclusive and non auto-delete.
     *
     * @param name the name of the queue.
     */
    public Queue(String name) {
        this(name, true, false, false);
    }

    // Getters and Setters omitted for brevity

}

请注意,构造函数采用队列名称。根据实现的不同,管理模板可能提供用于生成唯一命名队列的方法。此类队列可用作“回复到”地址或在其他临时情况下使用。出于这个原因,自动生成的队列的“exclusive”和“autoDelete”属性都将设置为“true”。

有关使用命名空间支持(包括队列参数)声明队列的信息,请参阅 配置代理 中关于队列的部分。

绑定

鉴于生产者发送到交换机,而消费者从队列接收,连接队列到交换机的绑定对于通过消息连接这些生产者和消费者至关重要。在 Spring AMQP 中,我们定义了一个 Binding 类来表示这些连接。本节回顾了将队列绑定到交换机的基本选项。

您可以将队列绑定到 DirectExchange,并使用固定路由键,如下例所示

new Binding(someQueue, someDirectExchange, "foo.bar");

您可以将队列绑定到 TopicExchange,并使用路由模式,如下例所示

new Binding(someQueue, someTopicExchange, "foo.*");

您可以将队列绑定到 FanoutExchange,并使用无路由键,如下例所示

new Binding(someQueue, someFanoutExchange);

我们还提供了一个 BindingBuilder 来促进“流畅 API”风格,如下例所示

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
为了清楚起见,前面的示例显示了 BindingBuilder 类,但这种风格在使用“bind()”方法的静态导入时效果很好。

Binding 类实例本身只保存关于连接的数据。换句话说,它不是一个“活动”组件。但是,正如您将在后面的 配置代理 中看到的那样,AmqpAdmin 类可以使用 Binding 实例在代理上实际触发绑定操作。此外,正如您在同一部分中看到的那样,您可以使用 Spring 的 @Bean 注解在 @Configuration 类中定义 Binding 实例。还有一个方便的基类,它进一步简化了生成 AMQP 相关 bean 定义的方法,并识别队列、交换机和绑定,以便在应用程序启动时在 AMQP 代理上声明它们。

AmqpTemplate 也定义在核心包中。作为参与实际 AMQP 消息传递的主要组件之一,它将在其自己的部分中详细讨论(参见 AmqpTemplate)。