使用 Spring JMS

本节介绍如何使用 Spring 的 JMS 组件。

使用JmsTemplate

JmsTemplate 类是 JMS 核心包中的核心类。它简化了 JMS 的使用,因为它在发送或同步接收消息时处理资源的创建和释放。

使用JmsTemplate的代码只需要实现回调接口,这些接口为它们提供了明确定义的高级契约。MessageCreator回调接口在给定JmsTemplate中调用代码提供的Session时创建消息。为了允许更复杂地使用 JMS API,SessionCallback提供 JMS 会话,而ProducerCallback公开SessionMessageProducer对。

JMS API 公开了两种类型的发送方法,一种方法将传递模式、优先级和生存时间作为服务质量 (QOS) 参数,另一种方法不带 QOS 参数并使用默认值。由于JmsTemplate有很多发送方法,因此设置 QOS 参数已作为 Bean 属性公开,以避免发送方法数量的重复。同样,同步接收调用的超时值是通过使用setReceiveTimeout属性设置的。

某些 JMS 提供程序允许通过配置ConnectionFactory以管理方式设置默认 QOS 值。这会导致对MessageProducer实例的send方法(send(Destination destination, Message message))的调用使用与 JMS 规范中指定的不同的 QOS 默认值。为了提供一致的 QOS 值管理,JmsTemplate必须通过将布尔属性isExplicitQosEnabled设置为true来专门启用使用其自身的 QOS 值。

为方便起见,JmsTemplate还公开了一个基本的请求-回复操作,该操作允许发送消息并等待在作为操作一部分创建的临时队列上回复。

一旦配置,JmsTemplate类的实例是线程安全的。这很重要,因为它意味着您可以配置单个JmsTemplate实例,然后安全地将此共享引用注入到多个协作者中。明确地说,JmsTemplate是有状态的,因为它维护对ConnectionFactory的引用,但此状态不是会话状态。

从 Spring Framework 4.1 开始,JmsMessagingTemplate构建在JmsTemplate之上,并提供与消息传递抽象(即org.springframework.messaging.Message)的集成。这使您可以以通用方式创建要发送的消息。

连接

JmsTemplate需要引用ConnectionFactoryConnectionFactory是 JMS 规范的一部分,是使用 JMS 的入口点。客户端应用程序将其用作工厂来创建与 JMS 提供程序的连接,并封装各种配置参数,其中许多参数是特定于供应商的,例如 SSL 配置选项。

在 EJB 中使用 JMS 时,供应商提供 JMS 接口的实现,以便它们可以参与声明式事务管理并执行连接和会话的池化。为了使用此实现,Jakarta EE 容器通常要求您在 EJB 或 servlet 部署描述符中将 JMS 连接工厂声明为resource-ref。为了确保在 EJB 中使用JmsTemplate时使用这些功能,客户端应用程序应确保它引用ConnectionFactory的托管实现。

缓存消息资源

标准 API 涉及创建许多中间对象。要发送消息,将执行以下“API”遍历

ConnectionFactory->Connection->Session->MessageProducer->send

ConnectionFactorySend操作之间,将创建和销毁三个中间对象。为了优化资源使用并提高性能,Spring 提供了两种ConnectionFactory的实现。

使用SingleConnectionFactory

Spring 提供了ConnectionFactory接口的实现SingleConnectionFactory,它在所有createConnection()调用上返回相同的Connection,并忽略对close()的调用。这对于测试和独立环境非常有用,以便可以在跨越任意数量事务的多个JmsTemplate调用中使用相同的连接。SingleConnectionFactory引用标准ConnectionFactory,该工厂通常来自 JNDI。

使用CachingConnectionFactory

CachingConnectionFactory 扩展了 SingleConnectionFactory 的功能,并增加了对 SessionMessageProducerMessageConsumer 实例的缓存。初始缓存大小设置为 1。您可以使用 sessionCacheSize 属性来增加缓存的会话数量。请注意,实际缓存的会话数量会超过此数值,因为会话是基于其确认模式进行缓存的,因此当 sessionCacheSize 设置为 1 时,最多可以有四个缓存的会话实例(每个确认模式一个)。MessageProducerMessageConsumer 实例缓存在其所属的会话中,并在缓存时也考虑了生产者和消费者的唯一属性。MessageProducer 基于其目标进行缓存。MessageConsumer 基于一个键进行缓存,该键由目标、选择器、noLocal 传递标志和持久订阅名称(如果创建持久消费者)组成。

临时队列和主题 (TemporaryQueue/TemporaryTopic) 的 MessageProducer 和 MessageConsumer 将永远不会被缓存。不幸的是,WebLogic JMS 恰好在其常规目标实现上实现了临时队列/主题接口,错误地表明其没有任何目标可以缓存。请在 WebLogic 上使用不同的连接池/缓存,或为 WebLogic 目的自定义 CachingConnectionFactory

目标管理

目标,作为 ConnectionFactory 实例,是您可以存储和检索在 JNDI 中的 JMS 管理对象。在配置 Spring 应用程序上下文时,您可以使用 JNDI JndiObjectFactoryBean 工厂类或 <jee:jndi-lookup> 对对象对 JMS 目标的引用执行依赖注入。但是,如果应用程序中存在大量目标,或者存在 JMS 提供程序特有的高级目标管理功能,则此策略通常很繁琐。此类高级目标管理的示例包括创建动态目标或支持目标的分层命名空间。JmsTemplate 将目标名称的解析委托给实现 DestinationResolver 接口的 JMS 目标对象。DynamicDestinationResolverJmsTemplate 使用的默认实现,它可以解析动态目标。还提供了一个 JndiDestinationResolver 作为 JNDI 中包含的目标的服务定位器,并可以选择回退到 DynamicDestinationResolver 中包含的行为。

通常,JMS 应用程序中使用的目标仅在运行时才知道,因此无法在应用程序部署时进行管理创建。这是因为交互式系统组件之间存在共享的应用程序逻辑,这些组件根据众所周知的命名约定在运行时创建目标。即使动态目标的创建不是 JMS 规范的一部分,大多数供应商也提供了此功能。动态目标使用用户定义的名称创建,这使它们与临时目标区分开来,并且通常未在 JNDI 中注册。用于创建动态目标的 API 因提供商而异,因为与目标关联的属性是特定于供应商的。但是,供应商有时会做出一个简单的实现选择,即忽略 JMS 规范中的警告,并使用 TopicSessioncreateTopic(String topicName) 方法或 QueueSessioncreateQueue(String queueName) 方法来创建一个具有默认目标属性的新目标。根据供应商的实现,DynamicDestinationResolver 还可以创建物理目标,而不仅仅是解析目标。

布尔属性 pubSubDomain 用于配置 JmsTemplate,使其了解正在使用的 JMS 域。默认情况下,此属性的值为 false,表示要使用点对点域 Queues。此属性(由 JmsTemplate 使用)决定通过 DestinationResolver 接口的实现进行动态目标解析的行为。

您还可以通过 defaultDestination 属性为 JmsTemplate 配置默认目标。默认目标用于不引用特定目标的发送和接收操作。

消息监听器容器

在 EJB 世界中,JMS 消息最常见的用途之一是驱动消息驱动 Bean (MDB)。Spring 提供了一种解决方案,可以创建消息驱动 POJO (MDP),而无需将用户绑定到 EJB 容器。(有关 Spring 的 MDP 支持的详细介绍,请参见 异步接收:消息驱动 POJO)。从 Spring Framework 4.1 开始,可以使用 @JmsListener 注解端点方法——有关详细信息,请参见 注解驱动的监听器端点

消息监听器容器用于从 JMS 消息队列接收消息并驱动注入到其中的 MessageListener。监听器容器负责消息接收的所有线程以及分派到监听器进行处理。消息监听器容器是 MDP 和消息提供程序之间的中介,负责注册接收消息、参与事务、资源获取和释放、异常转换等等。这使您可以编写与接收消息(并可能对其进行响应)相关的(可能很复杂)业务逻辑,并将样板 JMS 基础设施问题委托给框架。

Spring 打包了两个标准的 JMS 消息监听器容器,每个容器都具有其专门的功能集。

使用 SimpleMessageListenerContainer

此消息监听器容器是两种标准类型中比较简单的一种。它在启动时创建固定数量的 JMS 会话和消费者,使用标准 JMS MessageConsumer.setMessageListener() 方法注册监听器,并将其留给 JMS 提供程序执行监听器回调。此变体不允许动态适应运行时需求或参与外部管理的事务。在兼容性方面,它非常接近于独立 JMS 规范的精神,但通常与 Jakarta EE 的 JMS 限制不兼容。

虽然 SimpleMessageListenerContainer 不允许参与外部管理的事务,但它支持本地 JMS 事务。要启用此功能,您可以将 sessionTransacted 标志切换为 true,或者在 XML 命名空间中将 acknowledge 属性设置为 transacted。从您的侦听器抛出的异常会导致回滚,并且消息将被重新传递。或者,考虑使用 CLIENT_ACKNOWLEDGE 模式,该模式在发生异常时也提供重新传递,但不使用事务性 Session 实例,因此不包括任何其他 Session 操作(例如发送响应消息)在事务协议中。
默认的 AUTO_ACKNOWLEDGE 模式不提供适当的可靠性保证。当侦听器执行失败(因为提供程序在侦听器调用后自动确认每条消息,没有要传播到提供程序的异常)或当侦听器容器关闭时(您可以通过设置 acceptMessagesWhileStopping 标志来配置此设置),消息可能会丢失。如果需要可靠性(例如,对于可靠的队列处理和持久主题订阅),请务必使用事务性会话。

使用 DefaultMessageListenerContainer

此消息监听器容器在大多数情况下使用。与 SimpleMessageListenerContainer 相比,此容器变体允许动态适应运行时需求,并且能够参与外部管理的事务。当使用 JtaTransactionManager 配置时,每条接收到的消息都将与 XA 事务一起注册。因此,处理可以利用 XA 事务语义。此监听器容器在对 JMS 提供程序的低要求、高级功能(例如参与外部管理的事务)和与 Jakarta EE 环境的兼容性之间取得了良好的平衡。

您可以自定义容器的缓存级别。请注意,当没有启用缓存时,将为每个消息接收创建一个新的连接和一个新的会话。将此与高负载的非持久订阅相结合可能会导致消息丢失。在这种情况下,请务必使用适当的缓存级别。

此容器在代理关闭时也具有可恢复功能。默认情况下,简单的 BackOff 实现每五秒钟重试一次。您可以指定自定义 BackOff 实现以获得更细粒度的恢复选项。有关示例,请参见 ExponentialBackOff

与其同类(SimpleMessageListenerContainer)一样,DefaultMessageListenerContainer 支持本地 JMS 事务并允许自定义确认模式。如果您的场景允许,强烈建议使用此方法而不是外部管理的事务——也就是说,如果您可以在 JVM 死亡的情况下忍受偶尔的重复消息。您的业务逻辑中的自定义重复消息检测步骤可以涵盖这种情况——例如,以业务实体存在检查或协议表检查的形式。任何此类安排都比替代方案效率高得多:使用 XA 事务(通过使用 JtaTransactionManager 配置您的 DefaultMessageListenerContainer)包装整个处理过程,以涵盖 JMS 消息的接收以及消息侦听器中业务逻辑的执行(包括数据库操作等)。
默认的 AUTO_ACKNOWLEDGE 模式不提供适当的可靠性保证。当侦听器执行失败(因为提供程序在侦听器调用后自动确认每条消息,没有要传播到提供程序的异常)或当侦听器容器关闭时(您可以通过设置 acceptMessagesWhileStopping 标志来配置此设置),消息可能会丢失。如果需要可靠性(例如,对于可靠的队列处理和持久主题订阅),请务必使用事务性会话。

事务管理

Spring 提供了一个 JmsTransactionManager,用于管理单个 JMS ConnectionFactory 的事务。这允许 JMS 应用程序利用 Spring 的托管事务功能,如 数据访问章节的事务管理部分 中所述。JmsTransactionManager 执行本地资源事务,将指定的 ConnectionFactory 中的 JMS Connection/Session 对绑定到线程。JmsTemplate 自动检测此类事务性资源并相应地对其进行操作。

在 Jakarta EE 环境中,ConnectionFactory 池化 Connection 和 Session 实例,因此这些资源可以在事务之间有效重用。在独立环境中,使用 Spring 的 SingleConnectionFactory 将导致共享 JMS Connection,每个事务都有其自己的独立 Session。或者,可以考虑使用特定于提供程序的池适配器,例如 ActiveMQ 的 PooledConnectionFactory 类。

您还可以将 JmsTemplateJtaTransactionManager 和支持 XA 的 JMS ConnectionFactory 一起使用来执行分布式事务。请注意,这需要使用 JTA 事务管理器以及正确配置 XA 的 ConnectionFactory。(请查看您的 Jakarta EE 服务器或 JMS 提供程序的文档。)

在使用JMS API从Connection创建Session时,跨托管和非托管事务环境重用代码可能会令人困惑。这是因为JMS API只有一个创建Session的工厂方法,并且它需要事务和确认模式的值。在托管环境中,设置这些值是环境事务基础设施的责任,因此这些值会被供应商对JMS Connection的包装器忽略。当在非托管环境中使用JmsTemplate时,您可以通过使用属性sessionTransactedsessionAcknowledgeMode来指定这些值。当您将PlatformTransactionManagerJmsTemplate一起使用时,模板始终会获得一个事务性JMS Session