MQTT 支持

Spring Integration 提供入站和出站通道适配器来支持消息队列遥测传输 (MQTT) 协议。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.3.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.3.0"

当前实现使用 Eclipse Paho MQTT 客户端 库。

XML 配置和本章的大部分内容都与 MQTT v3.1 协议支持和相应的 Paho 客户端有关。有关相应协议支持,请参阅 MQTT v5 支持 段落。

两种适配器的配置都是通过 DefaultMqttPahoClientFactory 实现的。有关配置选项的更多信息,请参阅 Paho 文档。

我们建议配置一个 MqttConnectOptions 对象并将其注入到工厂中,而不是在工厂本身设置(已弃用)选项。

入站(消息驱动)通道适配器

入站通道适配器由 MqttPahoMessageDrivenChannelAdapter 实现。为了方便起见,您可以使用命名空间来配置它。一个最小的配置可能如下所示

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

以下列表显示了可用的属性

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://127.0.0.1:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 客户端 ID。
2 代理 URL。
3 此适配器接收消息的主题的逗号分隔列表。
4 一个用逗号分隔的 QoS 值列表。它可以是一个应用于所有主题的单个值,也可以是每个主题的值(在这种情况下,列表的长度必须相同)。
5 一个 MqttMessageConverter(可选)。默认情况下,默认的 DefaultPahoMessageConverter 会生成一个带有 String 负载的消息,并包含以下头信息
  • mqtt_topic:接收消息的主题

  • mqtt_duplicate:如果消息是重复的,则为 true

  • mqtt_qos:服务质量。您可以通过将 DefaultPahoMessageConverter 声明为 <bean/> 并将 payloadAsBytes 属性设置为 true 来配置它返回负载中的原始 byte[]

6 客户端工厂。
7 send() 超时时间。它仅在通道可能阻塞时(例如,当前已满的 QueueChannel)才适用。
8 错误通道。如果提供,下游异常将以 ErrorMessage 的形式发送到此通道。负载是一个 MessagingException,它包含失败的消息和原因。
9 恢复间隔。它控制适配器在失败后尝试重新连接的间隔。默认值为 10000ms(十秒)。
10 确认模式;对于手动确认,将其设置为 true。
从 4.1 版本开始,您可以省略 URL。相反,您可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。这样做可以实现例如连接到高可用性 (HA) 集群。

从 4.2.2 版本开始,当适配器成功订阅主题时,会发布 MqttSubscribedEvent。当连接或订阅失败时,会发布 MqttConnectionFailedEvent 事件。这些事件可以由实现 ApplicationListener 的 bean 接收。

此外,一个名为 recoveryInterval 的新属性控制适配器在失败后尝试重新连接的间隔。默认值为 10000ms(十秒)。

在 4.2.3 版本之前,客户端在适配器停止时总是取消订阅。这是不正确的,因为如果客户端 QOS 大于 0,我们需要保持订阅活动,以便在适配器停止时到达的消息在下次启动时被传递。这也要求在客户端工厂上将 cleanSession 属性设置为 false。默认值为 true

从 4.2.3 版本开始,如果 `cleanSession` 属性为 `false`,适配器默认不会取消订阅。

可以通过在工厂上设置 `consumerCloseAction` 属性来覆盖此行为。它可以取以下值:`UNSUBSCRIBE_ALWAYS`、`UNSUBSCRIBE_NEVER` 和 `UNSUBSCRIBE_CLEAN`。最后一个值(默认值)仅在 `cleanSession` 属性为 `true` 时取消订阅。

要恢复到 4.2.3 版本之前的行为,请使用 `UNSUBSCRIBE_ALWAYS`。

从 5.0 版本开始,`topic`、`qos` 和 `retained` 属性被映射到 `.RECEIVED_…​` 头部(`MqttHeaders.RECEIVED_TOPIC`、`MqttHeaders.RECEIVED_QOS` 和 `MqttHeaders.RECEIVED_RETAINED`),以避免无意中传播到默认使用 `MqttHeaders.TOPIC`、`MqttHeaders.QOS` 和 `MqttHeaders.RETAINED` 头部的出站消息。

运行时添加和删除主题

从 4.1 版本开始,您可以以编程方式更改适配器订阅的主题。Spring Integration 提供了 `addTopic()` 和 `removeTopic()` 方法。添加主题时,您可以选择指定 `QoS`(默认值为 1)。您也可以通过向 `<control-bus/>` 发送带有适当有效负载的消息来修改主题,例如:`“myMqttAdapter.addTopic('foo', 1)”`。

停止和启动适配器不会影响主题列表(它不会恢复到配置中的原始设置)。这些更改不会在应用程序上下文的生命周期之外保留。新的应用程序上下文将恢复到配置的设置。

在适配器停止(或断开与代理的连接)时更改主题将在下次建立连接时生效。

手动确认

从 5.3 版本开始,您可以将 `manualAcks` 属性设置为 true。通常用于异步确认交付。当设置为 `true` 时,头部(`IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK`)将添加到消息中,其值为 `SimpleAcknowledgment`。您必须调用 `acknowledge()` 方法来完成交付。有关更多信息,请参阅 `IMqttClient` 的 `setManualAcks()` 和 `messageArrivedComplete()` 的 Javadoc 文档。为了方便起见,提供了一个头部访问器

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

从版本5.2.11开始,当消息转换器抛出异常或从MqttMessage转换返回null时,MqttPahoMessageDrivenChannelAdapter会将一个ErrorMessage发送到errorChannel(如果提供)。否则,将此转换错误重新抛出到 MQTT 客户端回调中。

使用 Java 配置进行配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置配置入站适配器。

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了一个使用 Java DSL 配置入站适配器的示例。

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://127.0.0.1:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

出站通道适配器

出站通道适配器由MqttPahoMessageHandler实现,该适配器被包装在一个ConsumerEndpoint中。为了方便起见,您可以使用命名空间对其进行配置。

从版本 4.1 开始,适配器支持异步发送操作,避免阻塞直到确认交付。您可以发出应用程序事件以使应用程序能够在需要时确认交付。

以下列表显示了出站通道适配器可用的属性。

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://127.0.0.1:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 客户端 ID。
2 代理 URL。
3 一个MqttMessageConverter(可选)。默认的DefaultPahoMessageConverter识别以下标头。
  • mqtt_topic:消息将发送到的主题。

  • mqtt_retained:如果消息要保留,则为true

  • mqtt_qos:服务质量。

4 客户端工厂。
5 默认的服务质量。如果未找到mqtt_qos标头或qos-expression返回null,则使用它。如果您提供自定义converter,则不会使用它。
6 用于评估以确定 qos 的表达式。默认值为headers[mqtt_qos]
7 保留标志的默认值。如果未找到mqtt_retained标头,则使用它。如果提供了自定义converter,则不会使用它。
8 用于评估以确定保留布尔值的表达式。默认值为headers[mqtt_retained]
9 消息发送到的默认主题(如果未找到mqtt_topic标头,则使用)。
10 用于评估以确定目标主题的表达式。默认值为headers['mqtt_topic']
11 当为true时,调用者不会阻塞。相反,它会在发送消息时等待确认交付。默认值为false(发送会阻塞直到确认交付)。
12 asyncasync-events 都为 true 时,会发出一个 MqttMessageSentEvent 事件(参见 事件)。它包含消息、主题、客户端库生成的 messageIdclientIdclientInstance(每次客户端连接时都会递增)。当客户端库确认消息传递时,会发出一个 MqttMessageDeliveredEvent 事件。它包含 messageIdclientIdclientInstance,使消息传递能够与 send() 相关联。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,MqttMessageDeliveredEvent 可能在 MqttMessageSentEvent 之前收到。默认值为 false
从 4.1 版本开始,可以省略 URL。相反,可以在 DefaultMqttPahoClientFactoryserverURIs 属性中提供服务器 URI。这使得例如连接到高可用性 (HA) 集群成为可能。

使用 Java 配置进行配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置配置出站适配器的示例

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序提供了使用 Java DSL 配置出站适配器的示例

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

   	@Bean
   	public IntegrationFlow mqttOutboundFlow() {
   	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

事件

适配器会发布某些应用程序事件。

  • MqttConnectionFailedEvent - 如果我们无法连接或连接随后丢失,则由两个适配器发布。对于 MQTT v5 Paho 客户端,当服务器执行正常断开连接时,也会发出此事件,在这种情况下,丢失连接的 causenull

  • MqttMessageSentEvent - 如果在异步模式下运行,则由出站适配器在消息发送时发布。

  • MqttMessageDeliveredEvent - 如果在异步模式下运行,则由出站适配器在客户端指示消息已传递时发布。

  • MqttSubscribedEvent - 在订阅主题后由入站适配器发布。

这些事件可以通过 ApplicationListener<MqttIntegrationEvent> 或使用 @EventListener 方法接收。

要确定事件的来源,请使用以下方法;您可以检查 bean 名称和/或连接选项(以访问服务器 URI 等)。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTT v5 支持

从 5.5.5 版本开始,spring-integration-mqtt 模块为 MQTT v5 协议提供通道适配器实现。org.eclipse.paho:org.eclipse.paho.mqttv5.client 是一个 可选 依赖项,因此必须在目标项目中显式包含。

由于 MQTT v5 协议支持 MQTT 消息中的额外任意属性,因此引入了 MqttHeaderMapper 实现来映射发布和接收操作的标头。默认情况下(通过 * 模式),它映射所有接收到的 PUBLISH 帧属性(包括用户属性)。在出站方面,它为 PUBLISH 帧映射以下标头子集:contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationData

MQTT v5 协议的出站通道适配器以 Mqttv5PahoMessageHandler 的形式存在。它需要一个 clientId 和 MQTT 代理 URL 或 MqttConnectionOptions 引用。它支持 MqttClientPersistence 选项,可以是 异步 的,在这种情况下可以发出 MqttIntegrationEvent 对象(参见 asyncEvents 选项)。如果请求消息有效负载是 org.eclipse.paho.mqttv5.common.MqttMessage,则通过内部 IMqttAsyncClient 按原样发布。如果有效负载是 byte[],则按原样用于目标 MqttMessage 有效负载以发布。如果有效负载是 String,则将其转换为 byte[] 以发布。其余用例委托给提供的 MessageConverter,它来自应用程序上下文的 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME ConfigurableCompositeMessageConverter bean。注意:当请求的消息有效负载已经是 MqttMessage 时,不会使用提供的 HeaderMapper<MqttProperties>。以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter 不能与 Mqttv5PahoMessageHandler 一起使用,因为它的契约仅针对 MQTT v3 协议。

如果连接在启动时或运行时失败,Mqttv5PahoMessageHandler 会尝试在发送到此处理程序的下一条消息时重新连接。如果此手动重新连接失败,则连接异常将抛回给调用者。在这种情况下,将应用标准的 Spring Integration 错误处理过程,包括请求处理程序建议,例如重试或断路器。

有关更多信息,请参阅 Mqttv5PahoMessageHandler javadocs 及其超类。

用于 MQTT v5 协议的入站通道适配器以 `Mqttv5PahoMessageDrivenChannelAdapter` 的形式存在。它需要一个 `clientId` 和 MQTT 代理 URL 或 `MqttConnectionOptions` 引用,以及要订阅和消费的主题。它支持 `MqttClientPersistence` 选项,默认情况下为内存中持久化。可以配置预期的 `payloadType`(默认情况下为 `byte[]`),并将其传播到提供的 `SmartMessageConverter` 以将接收到的 `MqttMessage` 的 `byte[]` 转换为其他类型。如果设置了 `manualAck` 选项,则会将 `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` 头添加到消息中,以 `SimpleAcknowledgment` 的实例形式生成。`HeaderMapper` 用于将 `PUBLISH` 帧属性(包括用户属性)映射到目标消息头。标准的 `MqttMessage` 属性,如 `qos`、`id`、`dup`、`retained` 以及接收到的主题始终映射到头。有关更多信息,请参阅 `MqttHeaders`。

从 6.3 版本开始,`Mqttv5PahoMessageDrivenChannelAdapter` 提供了基于 `MqttSubscription` 的构造函数,用于进行细粒度的配置,而不是使用简单的主题名称。当提供这些订阅时,通道适配器的 `qos` 选项将不可用,因为这种 `qos` 模式是 `MqttSubscription` API 的一部分。

以下 Java DSL 配置示例演示了如何在集成流中使用此通道适配器。

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
由于 `org.springframework.integration.mqtt.support.MqttMessageConverter` 的契约仅针对 MQTT v3 协议,因此无法与 `Mqttv5PahoMessageDrivenChannelAdapter` 一起使用。

有关更多信息,请参阅 `Mqttv5PahoMessageDrivenChannelAdapter` 的 javadoc 文档及其超类。

建议将 `MqttConnectionOptions#setAutomaticReconnect(boolean)` 设置为 true,以让内部 `IMqttAsyncClient` 实例处理重新连接。否则,只有手动重启 `Mqttv5PahoMessageDrivenChannelAdapter` 才能处理重新连接,例如通过断开连接时的 `MqttConnectionFailedEvent` 处理。

共享 MQTT 客户端支持

如果多个集成需要单个 MQTT ClientID,则无法使用多个 MQTT 客户端实例,因为 MQTT 代理可能对每个 ClientID 的连接数量有限制(通常只允许单个连接)。为了让单个客户端被不同的通道适配器重用,可以使用 `org.springframework.integration.mqtt.core.ClientManager` 组件并将其传递给任何需要的通道适配器。它将管理 MQTT 连接生命周期,并在需要时进行自动重新连接。此外,可以向客户端管理器提供自定义连接选项和 `MqttClientPersistence`,就像目前可以为通道适配器组件执行的那样。

请注意,MQTT v5 和 v3 通道适配器都受支持。

以下 Java DSL 配置示例演示了如何在集成流中使用此客户端管理器

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://127.0.0.1:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}