Apache Camel 支持

Spring Integration 提供了一个 API 和配置,用于与在同一应用程序上下文中声明的 Apache Camel 端点进行通信。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-camel</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-camel:6.3.5"

Spring Integration 和 Apache Camel 实现企业集成模式,并提供了一种方便的方式来组合它们,但这两个项目在其 API 和抽象实现方面采用了不同的方法。Spring Integration 完全依赖于 Spring Core 的依赖注入容器。它使用许多其他 Spring 项目(Spring Data、Spring AMQP、Spring for Apache Kafka 等)来实现其通道适配器。它还使用 MessageChannel 抽象作为一等公民,开发人员在组合其集成流程时需要了解这一点。另一方面,Apache Camel 并没有提供消息通道的一等公民抽象,而是建议通过 API 中隐藏的内部交换来组合其路由。此外,它需要一些额外的 依赖项和配置才能在 Spring 应用程序中使用。

即使最终的企业集成解决方案如何实现其各个部分并不重要,开发人员体验和高生产力也需要考虑在内。因此,开发人员可能出于多种原因选择一个框架而不是另一个框架,或者如果某些目标系统支持存在差距,则同时选择两者。Spring Integration 和 Apache Camel 应用程序可以通过它们实现通道适配器的许多外部协议相互交互。例如,Spring Integration 流程可以将记录发布到 Apache Kafka 主题,该主题由消费者端的 Apache Camel 端点使用。或者,Apache Camel 路由可以将数据写入 SFTP 文件目录,该目录由 Spring Integration 的 SFTP 入站通道适配器轮询。或者,在同一个 Spring 应用程序上下文中,它们可以通过 ApplicationEvent 抽象进行通信。

为了使开发过程更容易,并避免不必要的网络跳转,Apache Camel 提供了一个 模块,通过消息通道与 Spring Integration 通信。所有需要做的就是从应用程序上下文中引用一个 MessageChannel,以发送或使用消息。当 Apache Camel 路由是消息流的发起者并且 Spring Integration 仅作为解决方案的一部分发挥辅助作用时,这非常有效。

为了获得类似的开发人员体验,Spring Integration 现在提供了一个通道适配器来调用 Apache Camel 端点,并且可以选择等待回复。没有入站通道适配器,因为从 Spring Integration API 和抽象的角度来看,订阅 MessageChannel 以使用 Apache Camel 消息就足够了。

Apache Camel 的出站通道适配器

CamelMessageHandlerAbstractReplyProducingMessageHandler 的实现,可以在单向(默认)和请求-回复模式下工作。它使用 org.apache.camel.ProducerTemplate 将(或发送和接收)消息发送到 org.apache.camel.Endpoint。交互模式可以通过 ExchangePattern 选项控制(可以通过 SpEL 表达式在运行时针对请求消息进行评估)。目标 Apache Camel 端点可以显式配置或作为在运行时计算的 SpEL 表达式。否则,它将回退到 ProducerTemplate 上提供的 defaultEndpoint。而不是指定端点,可以提供内联的、显式的 LambdaRouteBuilder,例如,对 Spring Integration 中没有通道适配器支持的 Apache Camel 组件进行调用。

此外,可以提供 HeaderMapper<org.apache.camel.Message>CamelHeaderMapper 是默认实现),以确定要映射 Spring Integration 和 Apache Camel 消息之间的哪些标头。默认情况下,所有标头都会被映射。

CamelMessageHandler 支持异步模式调用 ProducerTemplate.asyncSend() 并为回复处理(如果有)生成 CompletableFuture

可以通过 SpEL 表达式自定义 exchangeProperties,该表达式必须计算为 Map

如果没有提供 ProducerTemplate,则会通过从应用程序上下文中解析的 CamelContext bean 创建它。

@Bean
@ServiceActivator(inputChannel = "sendToCamel")
CamelMessageHandler camelService(ProducerTemplate producerTemplate) {
    CamelHeaderMapper headerMapper = new CamelHeaderMapper();
    headerMapper.setOutboundHeaderNames("");
    headerMapper.setInboundHeaderNames("testHeader");

    CamelMessageHandler camelMessageHandler = new CamelMessageHandler(producerTemplate);
    camelMessageHandler.setEndpointUri("direct:simple");
    camelMessageHandler.setExchangePatternExpression(spelExpressionParser.parseExpression("headers.exchangePattern"));
    camelMessageHandler.setHeaderMapper(headerMapper);
    return camelMessageHandler;
}

对于 Java DSL 流定义,可以使用 Camel 工厂提供的几种变体来配置此通道适配器

@Bean
IntegrationFlow camelFlow() {
    return f -> f
            .handle(Camel.gateway().endpointUri("direct:simple"))
            .handle(Camel.route(this::camelRoute))
            .handle(Camel.handler().endpointUri("log:com.mycompany.order?level=WARN"));
}

private void camelRoute(RouteBuilder routeBuilder) {
    routeBuilder.from("direct:inbound").transform(routeBuilder.simple("${body.toUpperCase()}"));
}