接收消息
本文档介绍了如何在 Spring 中使用 JMS 接收消息。
同步接收
虽然 JMS 通常与异步处理相关联,但您可以同步地消费消息。重载的 receive(..)
方法提供了此功能。在同步接收期间,调用线程会阻塞,直到有消息可用。这可能是一项危险的操作,因为调用线程可能会无限期地阻塞。receiveTimeout
属性指定接收器在放弃等待消息之前应等待多长时间。
异步接收:消息驱动的 POJO
Spring 还支持通过使用 @JmsListener 注解来使用带注释的监听器端点,并提供了一个开放的基础设施来以编程方式注册端点。到目前为止,这是设置异步接收器的最便捷方式。有关更多详细信息,请参阅 启用监听器端点注解。
|
与 EJB 世界中的消息驱动 Bean (MDB) 类似,消息驱动 POJO (MDP) 充当 JMS 消息的接收器。MDP 的唯一限制(但请参阅 使用 MessageListenerAdapter
)是它必须实现 jakarta.jms.MessageListener
接口。请注意,如果您的 POJO 在多个线程上接收消息,则必须确保您的实现是线程安全的。
以下示例展示了 MDP 的简单实现
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
实现 MessageListener
后,就可以创建消息监听器容器了。
以下示例展示了如何定义和配置与 Spring 捆绑在一起的消息监听器容器之一(在本例中为 DefaultMessageListenerContainer
)
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
有关每个实现支持的功能的完整说明,请参阅 Spring javadoc 中的各种消息监听器容器(所有这些容器都实现了 MessageListenerContainer)。
使用 SessionAwareMessageListener
接口
SessionAwareMessageListener
接口是 Spring 特定的接口,它提供与 JMS MessageListener
接口类似的契约,但也使消息处理方法能够访问接收 Message
的 JMS Session
。以下清单展示了 SessionAwareMessageListener
接口的定义
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果您希望 MDP 能够响应任何接收到的消息(通过使用 onMessage(Message, Session)
方法中提供的 Session
),可以选择让 MDP 实现此接口(而不是标准 JMS MessageListener
接口)。与 Spring 捆绑在一起的所有消息监听器容器实现都支持实现 MessageListener
或 SessionAwareMessageListener
接口的 MDP。实现 SessionAwareMessageListener
的类有一个缺点,那就是它们随后会通过接口绑定到 Spring。是否使用它完全由您作为应用程序开发人员或架构师决定。
请注意,SessionAwareMessageListener
接口的 onMessage(..)
方法会抛出 JMSException
。与标准 JMS MessageListener
接口不同,使用 SessionAwareMessageListener
接口时,客户端代码负责处理任何抛出的异常。
使用 MessageListenerAdapter
MessageListenerAdapter
类是 Spring 异步消息支持中的最后一个组件。简而言之,它允许您将几乎任何类公开为 MDP(尽管有一些限制)。
考虑以下接口定义
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
请注意,虽然该接口既不扩展 MessageListener
接口也不扩展 SessionAwareMessageListener
接口,但您仍然可以使用 MessageListenerAdapter
类将其用作 MDP。还要注意各种消息处理方法是如何根据它们可以接收和处理的各种 Message
类型的內容进行强类型化的。
现在考虑以下 MessageDelegate
接口的实现
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
特别要注意,前面的 MessageDelegate
接口实现(DefaultMessageDelegate
类)完全没有 JMS 依赖关系。它确实是一个 POJO,我们可以通过以下配置将其转换为 MDP
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例展示了另一个 MDP,它只能处理接收 JMS TextMessage
消息。请注意,消息处理方法实际上被称为 receive
(MessageListenerAdapter
中消息处理方法的名称默认为 handleMessage
),但它是可配置的(正如您将在本节后面看到的那样)。还要注意 receive(..)
方法是如何被强类型化为仅接收和响应 JMS TextMessage
消息的。以下清单展示了 TextMessageDelegate
接口的定义
public interface TextMessageDelegate {
void receive(TextMessage message);
}
以下清单展示了一个实现 TextMessageDelegate
接口的类
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// implementation elided for clarity...
}
然后,相应的 MessageListenerAdapter
的配置如下
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果 messageListener
接收到的 JMS Message
的类型不是 TextMessage
,则会抛出 IllegalStateException
(并随后被吞掉)。MessageListenerAdapter
类的另一个功能是能够在处理程序方法返回非空值时自动发送响应 Message
。考虑以下接口和类
public interface ResponsiveTextMessageDelegate {
// notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}
如果您将 DefaultResponsiveTextMessageDelegate
与 MessageListenerAdapter
结合使用,则从 'receive(..)'
方法执行返回的任何非空值(在默认配置中)都会被转换为 TextMessage
。然后,生成的 TextMessage
会被发送到原始 Message
的 JMS Reply-To
属性中定义的 Destination
(如果存在)或 MessageListenerAdapter
上设置的默认 Destination
(如果已配置)。如果找不到 Destination
,则会抛出 InvalidDestinationException
(请注意,此异常不会被吞掉,而是会向上传播到调用堆栈)。
在事务中处理消息
在事务中调用消息监听器只需要重新配置监听器容器。
您可以通过监听器容器定义中的sessionTransacted
标志激活本地资源事务。然后,每个消息监听器调用都在一个活动的 JMS 事务中运行,如果监听器执行失败,则回滚消息接收。发送响应消息(通过SessionAwareMessageListener
)是同一本地事务的一部分,但任何其他资源操作(例如数据库访问)都是独立运行的。这通常需要在监听器实现中进行重复消息检测,以涵盖数据库处理已提交但消息处理未能提交的情况。
考虑以下 bean 定义
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的事务,您需要配置一个事务管理器并使用支持外部管理事务的监听器容器(通常是DefaultMessageListenerContainer
)。
要配置消息监听器容器以参与 XA 事务,您需要配置一个JtaTransactionManager
(默认情况下,它委托给 Jakarta EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory
需要是 XA 兼容的,并且已正确注册到您的 JTA 事务协调器。(检查您的 Jakarta EE 服务器的 JNDI 资源配置。)这使得消息接收以及(例如)数据库访问成为同一事务的一部分(具有统一的提交语义,以牺牲 XA 事务日志开销为代价)。
以下 bean 定义创建了一个事务管理器
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后我们需要将其添加到我们之前的容器配置中。容器会处理剩下的工作。以下示例展示了如何做到这一点
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/> (1)
</bean>
1 | 我们的事务管理器。 |