接收消息
本文介绍了如何在 Spring 中接收 JMS 消息。
同步接收
虽然 JMS 通常与异步处理相关联,但您可以同步使用消息。重载的receive(..)
方法提供了此功能。在同步接收期间,调用线程会阻塞,直到消息可用。这可能是一个危险的操作,因为调用线程可能会无限期地阻塞。receiveTimeout
属性指定接收器在放弃等待消息之前应等待多长时间。
异步接收:消息驱动的 POJO
Spring 还通过使用@JmsListener 注解支持带注解的监听器端点,并提供了一个开放的基础架构以编程方式注册端点。这迄今为止是设置异步接收器的最便捷方式。有关更多详细信息,请参阅启用监听器端点注解。 |
以类似于 EJB 世界中的消息驱动的 Bean (MDB) 的方式,消息驱动的 POJO (MDP) 充当 JMS 消息的接收器。MDP 的唯一限制(但请参阅使用MessageListenerAdapter
)是它必须实现jakarta.jms.MessageListener
接口。请注意,如果您的 POJO 在多个线程上接收消息,则务必确保您的实现是线程安全的。
以下示例显示了 MDP 的简单实现
-
Java
-
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
实现完MessageListener
后,就可以创建消息监听器容器了。
以下示例演示了如何定义和配置 Spring 附带的消息监听器容器之一(在本例中为DefaultMessageListenerContainer
)
-
Java
-
Kotlin
-
Xml
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
有关每个实现支持的功能的完整描述,请参阅 Spring 的各种消息监听器容器(所有这些容器都实现了MessageListenerContainer)的 Java 文档。
使用SessionAwareMessageListener
接口
SessionAwareMessageListener
接口是 Spring 特定的接口,它提供与 JMS MessageListener
接口类似的契约,但还允许消息处理方法访问接收Message
的 JMS Session
。以下列表显示了SessionAwareMessageListener
接口的定义
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果您希望 MDP 能够响应任何接收到的消息(通过在onMessage(Message, Session)
方法中使用提供的Session
),可以选择让您的 MDP 实现此接口(而不是标准 JMS MessageListener
接口)。Spring 附带的所有消息监听器容器实现都支持实现MessageListener
或SessionAwareMessageListener
接口的 MDP。实现SessionAwareMessageListener
的类存在一个警告,即它们随后通过接口绑定到 Spring。是否使用它完全由您作为应用程序开发人员或架构师决定。
请注意,SessionAwareMessageListener
接口的onMessage(..)
方法会抛出JMSException
。与标准 JMS MessageListener
接口相反,当使用SessionAwareMessageListener
接口时,客户端代码负责处理任何抛出的异常。
使用MessageListenerAdapter
MessageListenerAdapter
类是 Spring 异步消息支持中的最后一个组件。简而言之,它允许您将几乎任何类公开为 MDP(尽管有一些约束)。
考虑以下接口定义
-
Java
-
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
请注意,尽管该接口既没有扩展MessageListener
也没有扩展SessionAwareMessageListener
接口,但您仍然可以通过使用MessageListenerAdapter
类将其用作 MDP。还要注意各种消息处理方法是如何根据它们可以接收和处理的各种Message
类型的內容进行强类型化的。
现在考虑以下MessageDelegate
接口的实现
-
Java
-
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特别是,请注意MessageDelegate
接口(DefaultMessageDelegate
类)的前述实现根本没有任何 JMS 依赖项。它确实是一个 POJO,我们可以通过以下配置将其变成 MDP
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
下一个示例显示了另一个 MDP,它只能处理接收 JMS TextMessage
消息。请注意,消息处理方法实际上称为receive
(MessageListenerAdapter
中的消息处理方法的名称默认为handleMessage
),但它是可配置的(如您稍后在本节中所见)。还要注意receive(..)
方法是如何被强类型化为仅接收和响应 JMS TextMessage
消息的。以下列表显示了TextMessageDelegate
接口的定义
-
Java
-
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
以下列表显示了一个实现TextMessageDelegate
接口的类
-
Java
-
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
然后,相关的MessageListenerAdapter
的配置如下所示
-
Java
-
Kotlin
-
Xml
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果messageListener
接收到的 JMS Message
的类型不是TextMessage
,则会抛出IllegalStateException
(随后被吞并)。MessageListenerAdapter
类的另一个功能是,如果处理程序方法返回非空值,则能够自动发送回响应Message
。考虑以下接口和类
-
Java
-
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
-
Java
-
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
如果您将DefaultResponsiveTextMessageDelegate
与MessageListenerAdapter
结合使用,则从'receive(..)'
方法执行返回的任何非空值都会(在默认配置中)转换为TextMessage
。然后,生成的TextMessage
会发送到原始Message
的 JMS Reply-To
属性中定义的Destination
(如果存在),或者发送到MessageListenerAdapter
上设置的默认Destination
(如果已配置)。如果找不到任何Destination
,则会抛出InvalidDestinationException
(请注意,此异常不会被吞并,而是会向上传播到调用堆栈)。
在事务中处理消息
在事务中调用消息监听器只需要重新配置监听器容器。
您可以通过监听器容器定义上的sessionTransacted
标志激活本地资源事务。然后,每个消息监听器调用都在活动 JMS 事务中操作,如果监听器执行失败,则消息接收将回滚。发送响应消息(通过SessionAwareMessageListener
)是同一本地事务的一部分,但任何其他资源操作(例如数据库访问)都是独立操作的。这通常需要在监听器实现中进行重复消息检测,以涵盖数据库处理已提交但消息处理失败提交的情况。
考虑以下 Bean 定义
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
要参与外部管理的事务,您需要配置一个事务管理器并使用支持外部管理事务的监听器容器(通常是DefaultMessageListenerContainer
)。
要配置消息监听器容器以参与 XA 事务,您需要配置一个JtaTransactionManager
(默认情况下,它委托给 Jakarta EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory
需要支持 XA 并正确注册到您的 JTA 事务协调器。(检查您的 Jakarta EE 服务器的 JNDI 资源配置。)这使得消息接收以及(例如)数据库访问成为同一事务的一部分(具有统一的提交语义,但以 XA 事务日志开销为代价)。
以下 Bean 定义创建了一个事务管理器
-
Java
-
Kotlin
-
Xml
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后我们需要将其添加到我们之前的容器配置中。容器负责其余工作。以下示例展示了如何执行此操作
-
Java
-
Kotlin
-
Xml
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>