接收消息

本文档描述了如何在 Spring 中使用 JMS 接收消息。

同步接收

虽然 JMS 通常与异步处理相关联,但你也可以同步消费消息。JmsTemplateJmsClient 上的 receive(..) 方法提供了此功能。在同步接收期间,调用线程会阻塞,直到有消息可用。这可能是一个危险的操作,因为调用线程可能会无限期地阻塞。receiveTimeout 属性指定接收器在放弃等待消息之前应该等待多长时间。

异步接收:消息驱动 POJO

Spring 还通过使用 @JmsListener 注解支持注解监听器端点,并提供了开放的基础设施以编程方式注册端点。这是目前为止设置异步接收器最方便的方式。有关更多详细信息,请参阅启用监听器端点注解

与 EJB 世界中的消息驱动 Bean (MDB) 类似,消息驱动 POJO (MDP) 充当 JMS 消息的接收器。对 MDP 的一个限制(但请参阅使用 MessageListenerAdapter)是它必须实现 jakarta.jms.MessageListener 接口。请注意,如果你的 POJO 在多个线程上接收消息,那么确保你的实现是线程安全的非常重要。

以下示例展示了一个简单的 MDP 实现:

  • Java

  • Kotlin

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}
class ExampleListener : MessageListener {

	override fun onMessage(message: Message) {
		if (message is TextMessage) {
			try {
				println(message.text)
			} catch (ex: JMSException) {
				throw RuntimeException(ex)
			}
		} else {
			throw IllegalArgumentException("Message must be of type TextMessage")
		}
	}
}

一旦你实现了你的 MessageListener,就该创建消息监听器容器了。

以下示例展示了如何定义和配置 Spring 附带的一个消息监听器容器(在本例中为 DefaultMessageListenerContainer):

  • Java

  • Kotlin

  • Xml

@Bean
ExampleListener messageListener() {
	return new ExampleListener();
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

请参阅各种消息监听器容器的 Spring javadoc(所有这些容器都实现了 MessageListenerContainer),以获取每个实现支持的功能的完整描述。

使用 SessionAwareMessageListener 接口

SessionAwareMessageListener 接口是 Spring 特有的接口,它提供了与 JMS MessageListener 接口类似的约定,但同时允许消息处理方法访问接收 Message 的 JMS Session。以下清单显示了 SessionAwareMessageListener 接口的定义:

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

如果你希望你的 MDP 能够响应任何收到的消息(通过使用 onMessage(Message, Session) 方法中提供的 Session),你可以选择让你的 MDP 实现此接口(优先于标准的 JMS MessageListener 接口)。Spring 附带的所有消息监听器容器实现都支持实现 MessageListenerSessionAwareMessageListener 接口的 MDP。实现 SessionAwareMessageListener 的类有一个警告,即它们通过接口与 Spring 绑定。是否使用它的选择完全取决于你作为应用程序开发人员或架构师。

请注意,SessionAwareMessageListener 接口的 onMessage(..) 方法抛出 JMSException。与标准 JMS MessageListener 接口不同,在使用 SessionAwareMessageListener 接口时,客户端代码有责任处理任何抛出的异常。

使用 MessageListenerAdapter

MessageListenerAdapter 类是 Spring 异步消息支持的最后一个组件。简而言之,它允许你将几乎任何类公开为 MDP(尽管有一些限制)。

考虑以下接口定义:

  • Java

  • Kotlin

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}
interface MessageDelegate {
	fun handleMessage(message: String)
	fun handleMessage(message: Map<*, *>)
	fun handleMessage(message: ByteArray)
	fun handleMessage(message: Serializable)
}

请注意,尽管该接口既不扩展 MessageListener 接口也不扩展 SessionAwareMessageListener 接口,但你仍然可以使用 MessageListenerAdapter 类将其用作 MDP。另请注意,各种消息处理方法如何根据它们可以接收和处理的各种 Message 类型的内容进行强类型化。

现在考虑 MessageDelegate 接口的以下实现:

  • Java

  • Kotlin

public class DefaultMessageDelegate implements MessageDelegate {

	@Override
	public void handleMessage(String message) {
		// ...
	}

	@Override
	public void handleMessage(Map message) {
		// ...
	}

	@Override
	public void handleMessage(byte[] message) {
		// ...
	}

	@Override
	public void handleMessage(Serializable message) {
		// ...
	}
}
class DefaultMessageDelegate : MessageDelegate {

	override fun handleMessage(message: String) {
		// ...
	}

	override fun handleMessage(message: Map<*, *>) {
		// ...
	}

	override fun handleMessage(message: ByteArray) {
		// ...
	}

	override fun handleMessage(message: Serializable) {
		// ...
	}
}

特别注意,MessageDelegate 接口的上述实现 (DefaultMessageDelegate 类) 根本没有任何 JMS 依赖。它确实是一个 POJO,我们可以通过以下配置将其转换为 MDP:

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
	return new MessageListenerAdapter(messageDelegate);
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
	return MessageListenerAdapter(messageDelegate)
}

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

下一个示例展示了另一个 MDP,它只能处理接收 JMS TextMessage 消息。请注意,消息处理方法实际上叫做 receiveMessageListenerAdapter 中消息处理方法的默认名称是 handleMessage),但它是可配置的(你可以在本节后面看到)。另请注意,receive(..) 方法是强类型化的,只能接收和响应 JMS TextMessage 消息。以下清单显示了 TextMessageDelegate 接口的定义:

  • Java

  • Kotlin

public interface TextMessageDelegate {

	void receive(TextMessage message);
}
interface TextMessageDelegate {
	fun receive(message: TextMessage)
}

以下清单显示了一个实现 TextMessageDelegate 接口的类:

  • Java

  • Kotlin

public class DefaultTextMessageDelegate implements TextMessageDelegate {

	@Override
	public void receive(TextMessage message) {
		// ...
	}
}
class DefaultTextMessageDelegate : TextMessageDelegate {

	override fun receive(message: TextMessage) {
		// ...
	}
}

伴随的 MessageListenerAdapter 的配置如下:

  • Java

  • Kotlin

  • Xml

@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
	MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
	messageListener.setDefaultListenerMethod("receive");
	// We don't want automatic message context extraction
	messageListener.setMessageConverter(null);
	return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
	setDefaultListenerMethod("receive")
	// We don't want automatic message context extraction
	setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

请注意,如果 messageListener 接收到的 JMS Message 类型不是 TextMessage,则会抛出(并随后吞噬)IllegalStateExceptionMessageListenerAdapter 类的另一个功能是,如果处理程序方法返回非空值,则能够自动发送响应 Message。考虑以下接口和类:

  • Java

  • Kotlin

public interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	fun receive(message: TextMessage): String
}
  • Java

  • Kotlin

public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {

	@Override
	public String receive(TextMessage message) {
		return "message";
	}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {

	override fun receive(message: TextMessage): String {
		return "message"
	}
}

如果您将 DefaultResponsiveTextMessageDelegateMessageListenerAdapter 结合使用,从 'receive(..)' 方法执行返回的任何非空值(在默认配置中)都会转换为 TextMessage。生成的 TextMessage 将被发送到原始 Message 的 JMS Reply-To 属性中定义的 Destination(如果存在),或者发送到 MessageListenerAdapter 上设置的默认 Destination(如果已配置)。如果未找到 Destination,则会抛出 InvalidDestinationException(请注意,此异常不会被吞噬并会沿着调用堆栈传播)。

在事务中处理消息

在事务中调用消息监听器只需要重新配置监听器容器即可。

您可以通过监听器容器定义上的 sessionTransacted 标志激活本地资源事务。然后,每个消息监听器调用都在一个活动的 JMS 事务中操作,如果监听器执行失败,消息接收将被回滚。发送响应消息(通过 SessionAwareMessageListener)是同一本地事务的一部分,但任何其他资源操作(例如数据库访问)都独立运行。这通常需要在监听器实现中进行重复消息检测,以涵盖数据库处理已提交但消息处理未能提交的情况。

考虑以下 bean 定义:

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		isSessionTransacted = true
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

要参与外部管理的事务,你需要配置一个事务管理器,并使用支持外部管理事务的监听器容器(通常是 DefaultMessageListenerContainer)。

要配置消息监听器容器以参与 XA 事务,你需要配置一个 JtaTransactionManager(它默认委托给 Jakarta EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory 需要支持 XA 并正确注册到你的 JTA 事务协调器。(检查你的 Jakarta EE 服务器的 JNDI 资源配置。)这使得消息接收以及(例如)数据库访问可以成为同一事务的一部分(具有统一的提交语义,但以 XA 事务日志开销为代价)。

以下 bean 定义创建了一个事务管理器:

  • Java

  • Kotlin

  • Xml

@Bean
JtaTransactionManager transactionManager()  {
	return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后,我们需要将其添加到我们之前的容器配置中。容器会处理其余的事情。以下示例展示了如何操作:

  • Java

  • Kotlin

  • Xml

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
				 transactionManager: JtaTransactionManager) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		setTransactionManager(transactionManager)
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/>
</bean>
© . This site is unofficial and not affiliated with VMware.