示例应用程序
Spring AMQP 示例 项目包含两个示例应用程序。第一个是一个简单的“Hello World”示例,演示了同步和异步消息接收。它为理解基本组件提供了极好的起点。第二个示例基于股票交易用例,以演示现实世界应用程序中常见的交互类型。在本章中,我们将快速浏览每个示例,以便您可以专注于最重要的组件。这两个示例都是基于 Maven 的,因此您应该能够将它们直接导入到任何支持 Maven 的 IDE(例如 SpringSource Tool Suite)中。
“Hello World”示例
“Hello World”示例演示了同步和异步消息接收。您可以将 `spring-rabbit-helloworld` 示例导入到 IDE 中,然后按照下面的讨论进行操作。
同步示例
在 `src/main/java` 目录中,导航到 `org.springframework.amqp.helloworld` 包。打开 `HelloWorldConfiguration` 类,注意它在类级别包含 `@Configuration` 注解,并在方法级别包含一些 `@Bean` 注解。这是 Spring 基于 Java 的配置的一个示例。您可以此处了解更多信息。
下面的列表显示了连接工厂是如何创建的
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
该配置还包含一个 `RabbitAdmin` 实例,它默认情况下查找类型为交换机、队列或绑定的任何 bean,然后在代理上声明它们。实际上,在 `HelloWorldConfiguration` 中生成的 `helloWorldQueue` bean就是一个例子,因为它是一个 `Queue` 实例。
下面的列表显示了 `helloWorldQueue` bean 的定义
@Bean
public Queue helloWorldQueue() {
return new Queue(this.helloWorldQueueName);
}
回顾 `rabbitTemplate` bean 配置,您可以看到它的 `queue` 属性(用于接收消息)和 `routingKey` 属性(用于发送消息)都设置为 `helloWorldQueue` 的名称。
现在我们已经探索了配置,我们可以看看实际使用这些组件的代码。首先,从同一个包中打开 `Producer` 类。它包含一个 `main()` 方法,其中创建了 Spring `ApplicationContext`。
下面的列表显示了 `main` 方法
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("Hello World");
System.out.println("Sent: Hello World");
}
在前面的示例中,检索 `AmqpTemplate` bean 并将其用于发送 `Message`。由于客户端代码应尽可能依赖接口,因此类型为 `AmqpTemplate` 而不是 `RabbitTemplate`。即使在 `HelloWorldConfiguration` 中创建的 bean 是 `RabbitTemplate` 的实例,依赖接口也意味着此代码更易于移植(您可以独立于代码更改配置)。由于调用了 `convertAndSend()` 方法,因此模板委托给其 `MessageConverter` 实例。在这种情况下,它使用默认的 `SimpleMessageConverter`,但是可以向 `HelloWorldConfiguration` 中定义的 `rabbitTemplate` bean 提供不同的实现。
现在打开 `Consumer` 类。它实际上共享相同的配置基类,这意味着它共享 `rabbitTemplate` bean。这就是为什么我们将该模板配置为同时具有 `routingKey`(用于发送)和 `queue`(用于接收)的原因。`AmqpTemplate` 中的描述,您可以改为将 'routingKey' 参数传递给 send 方法,将 'queue' 参数传递给 receive 方法。`Consumer` 代码基本上是 Producer 的镜像,调用 `receiveAndConvert()` 而不是 `convertAndSend()`。
下面的列表显示了 `Consumer` 的 main 方法
public static void main(String[] args) {
ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate amqpTemplate = context.getBean(AmqpTemplate.class);
System.out.println("Received: " + amqpTemplate.receiveAndConvert());
}
如果您运行 `Producer`,然后运行 `Consumer`,您应该在控制台输出中看到 `Received: Hello World`。
异步示例
同步示例 演示了同步 Hello World 示例。本节介绍了一个稍微高级但功能更强大的选项。通过一些修改,Hello World 示例可以提供异步接收的示例,也称为消息驱动的 POJO。事实上,有一个子包提供了 exactly that: `org.springframework.amqp.samples.helloworld.async`。
同样,我们从发送端开始。打开 `ProducerConfiguration` 类,注意它创建了一个 `connectionFactory` 和一个 `rabbitTemplate` bean。这次,由于配置专门用于消息发送端,我们甚至不需要任何队列定义,并且 `RabbitTemplate` 只有 'routingKey' 属性设置。回想一下,消息是发送到交换机而不是直接发送到队列。AMQP 默认交换机是没有名称的直接交换机。所有队列都使用它们的名称作为路由键绑定到该默认交换机。这就是为什么我们只需要在这里提供路由键。
下面的列表显示了 `rabbitTemplate` 的定义
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.helloWorldQueueName);
return template;
}
由于本示例演示了异步消息接收,因此生产端被设计为连续发送消息(如果它像同步版本一样是每执行一次消息模型,那么它实际上是一个消息驱动型消费者这一点就不会那么明显)。负责连续发送消息的组件定义为ProducerConfiguration
中的内部类。它配置为每三秒钟运行一次。
下面的清单显示了该组件
static class ScheduledProducer {
@Autowired
private volatile RabbitTemplate rabbitTemplate;
private final AtomicInteger counter = new AtomicInteger();
@Scheduled(fixedRate = 3000)
public void sendMessage() {
rabbitTemplate.convertAndSend("Hello World " + counter.incrementAndGet());
}
}
您无需理解所有细节,因为真正的重点应该放在接收端(我们将在后面介绍)。但是,如果您还不熟悉 Spring 任务调度支持,可以此处了解更多信息。简而言之,ProducerConfiguration
中的postProcessor
bean 将任务注册到调度程序。
现在我们可以转向接收端。为了强调消息驱动的 POJO 行为,我们从对消息做出反应的组件开始。该类名为HelloWorldHandler
,如下面的清单所示
public class HelloWorldHandler {
public void handleMessage(String text) {
System.out.println("Received: " + text);
}
}
该类是一个 POJO。它不扩展任何基类,不实现任何接口,甚至不包含任何导入。它通过 Spring AMQP MessageListenerAdapter
被“适配”到MessageListener
接口。然后,您可以配置适配器在SimpleMessageListenerContainer
上。对于本示例,容器在ConsumerConfiguration
类中创建。您可以在那里看到包装在适配器中的 POJO。
下面的清单显示了listenerContainer
的定义方式
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueName(this.helloWorldQueueName);
container.setMessageListener(new MessageListenerAdapter(new HelloWorldHandler()));
return container;
}
SimpleMessageListenerContainer
是一个 Spring 生命周期组件,默认情况下会自动启动。如果您查看Consumer
类,您会发现它的main()
方法只包含一行引导程序来创建ApplicationContext
。Producer 的main()
方法也是一个单行引导程序,因为其方法使用@Scheduled
注解的组件也会自动启动。您可以按任意顺序启动Producer
和Consumer
,您应该会看到每三秒钟发送和接收消息。
股票交易
股票交易示例演示了比Hello World 示例更高级的消息传递场景。但是,配置非常相似,只是稍微复杂一些。由于我们已经详细介绍了 Hello World 配置,因此这里我们将重点介绍使本示例与众不同的内容。有一个服务器将市场数据(股票报价)推送到主题交换机。然后,客户端可以通过使用路由模式(例如,app.stock.quotes.nasdaq.*
)绑定队列来订阅市场数据馈送。此演示的另一个主要功能是客户端启动的请求-回复“股票交易”交互,并由服务器处理。这涉及一个私有的replyTo
队列,该队列由客户端在其订单请求消息中发送。
服务器的核心配置位于org.springframework.amqp.rabbit.stocks.config.server
包中的RabbitServerConfiguration
类中。它扩展了AbstractStockAppRabbitConfiguration
。这就是定义服务器和客户端共有的资源的地方,包括市场数据主题交换机(其名称为“app.stock.marketdata”)和服务器公开用于股票交易的队列(其名称为“app.stock.request”)。在该公共配置文件中,您还会看到RabbitTemplate
上配置了一个Jackson2JsonMessageConverter
。
服务器特定的配置包括两部分。首先,它在RabbitTemplate
上配置市场数据交换机,以便它不需要在每次调用以发送Message
时都提供该交换机名称。它在基配置类中定义的抽象回调方法中执行此操作。下面的清单显示了该方法
public void configureRabbitTemplate(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setExchange(MARKET_DATA_EXCHANGE_NAME);
}
其次,声明股票请求队列。在这种情况下,它不需要任何显式绑定,因为它使用自己的名称作为路由密钥绑定到默认的无名交换机。如前所述,AMQP 规范定义了这种行为。下面的清单显示了stockRequestQueue
bean 的定义
@Bean
public Queue stockRequestQueue() {
return new Queue(STOCK_REQUEST_QUEUE_NAME);
}
现在您已经了解了服务器的 AMQP 资源配置,请导航到src/test/java
目录下的org.springframework.amqp.rabbit.stocks
包。在那里,您可以看到提供main()
方法的实际Server
类。它基于server-bootstrap.xml
配置文件创建ApplicationContext
。在那里,您可以看到发布虚拟市场数据的计划任务。该配置依赖于 Spring 的task
命名空间支持。引导程序配置文件还导入了一些其他文件。最有趣的是server-messaging.xml
,它直接位于src/main/resources
下。在那里,您可以看到负责处理股票交易请求的messageListenerContainer
bean。最后,请查看在server-handlers.xml
(也位于“src/main/resources”中)中定义的serverHandler
bean。该 bean 是ServerHandler
类的实例,是消息驱动的 POJO 的一个很好的示例,它也可以发送回复消息。请注意,它本身并没有与框架或任何 AMQP 概念耦合。它接受一个TradeRequest
并返回一个TradeResponse
。下面的清单显示了handleMessage
方法的定义
public TradeResponse handleMessage(TradeRequest tradeRequest) { ...
}
现在我们已经了解了服务器最重要的配置和代码,我们可以转向客户端。最好的起点可能是org.springframework.amqp.rabbit.stocks.config.client
包中的RabbitClientConfiguration
。请注意,它声明了两个队列,而没有提供显式名称。下面的清单显示了这两个队列的 bean 定义
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
@Bean
public Queue traderJoeQueue() {
return amqpAdmin().declareQueue();
}
这些是私有队列,并且会自动生成唯一的名称。生成的第一个队列用于客户端绑定到服务器公开的市场数据交换机。回想一下,在 AMQP 中,消费者与队列交互,而生产者与交换机交互。队列与交换机的“绑定”会告诉代理从给定的交换机向队列传递(或路由)消息。由于市场数据交换机是主题交换机,因此可以使用路由模式来表达绑定。RabbitClientConfiguration
使用Binding
对象执行此操作,并且该对象使用BindingBuilder
流畅 API 生成。下面的清单显示了Binding
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
请注意,实际值已在属性文件(src/main/resources
下的client.properties
)中外部化,并且我们使用 Spring 的@Value
注解来注入该值。这通常是一个好主意。否则,该值将硬编码在类中,并且无法在不重新编译的情况下进行修改。在这种情况下,在更改用于绑定的路由模式时运行多个版本的客户端要容易得多。我们现在可以尝试一下。
首先运行org.springframework.amqp.rabbit.stocks.Server
,然后运行org.springframework.amqp.rabbit.stocks.Client
。您应该会看到NASDAQ
股票的虚拟报价,因为client.properties
中与“stocks.quote.pattern”键关联的当前值为“app.stock.quotes.nasdaq.”。现在,在保持现有的Server
和Client
运行的同时,将该属性值更改为“app.stock.quotes.nyse.”,并启动第二个Client
实例。您应该会看到第一个客户端仍然接收 NASDAQ 报价,而第二个客户端接收 NYSE 报价。您可以更改模式以获取所有股票,甚至单个股票代码。
我们探讨的最后一个功能是从客户端的角度来看请求-回复交互。回想一下,我们已经看到了接受TradeRequest
对象并返回TradeResponse
对象的ServerHandler
。客户端上的相应代码是org.springframework.amqp.rabbit.stocks.gateway
包中的RabbitStockServiceGateway
。它委托给RabbitTemplate
以发送消息。下面的清单显示了send
方法
public void send(TradeRequest tradeRequest) {
getRabbitTemplate().convertAndSend(tradeRequest, new MessagePostProcessor() {
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(new Address(defaultReplyToQueue));
try {
message.getMessageProperties().setCorrelationId(
UUID.randomUUID().toString().getBytes("UTF-8"));
}
catch (UnsupportedEncodingException e) {
throw new AmqpException(e);
}
return message;
}
});
}
请注意,在发送消息之前,它会设置replyTo
地址。它提供了由traderJoeQueue
bean 定义(如前所述)生成的队列。下面的清单显示了StockServiceGateway
类本身的@Bean
定义
@Bean
public StockServiceGateway stockServiceGateway() {
RabbitStockServiceGateway gateway = new RabbitStockServiceGateway();
gateway.setRabbitTemplate(rabbitTemplate());
gateway.setDefaultReplyToQueue(traderJoeQueue());
return gateway;
}
如果您不再运行服务器和客户端,请立即启动它们。尝试使用“100 TCKR”的格式发送请求。在模拟请求“处理”的短暂人工延迟之后,您应该会看到客户端上出现确认消息。