生产和消费消息

您可以通过简单地编写函数并将其公开为 @Bean 来编写 Spring Cloud Stream 应用。您也可以使用基于 Spring Integration 注解或 Spring Cloud Stream 注解的配置,尽管从 spring-cloud-stream 3.x 开始,我们推荐使用函数式实现。

Spring Cloud Function 支持

概览

从 Spring Cloud Stream v2.1 开始,定义 流处理器 的另一种选择是使用对 Spring Cloud Function 的内置支持,在其中可以将它们表达为 `java.util.function.[Supplier/Function/Consumer]` 类型的 Bean。

要指定将哪个函数式 Bean 绑定到由绑定公开的外部目标,您必须提供 spring.cloud.function.definition 属性。

如果您只有一个 java.util.function.[Supplier/Function/Consumer] 类型的 Bean,则可以跳过 spring.cloud.function.definition 属性,因为这样的函数式 Bean 将被自动发现。然而,最佳实践是使用此属性以避免混淆。有时这种自动发现会带来麻烦,因为可能存在一个 java.util.function.[Supplier/Function/Consumer] 类型的 Bean,其目的不是处理消息,但由于是单个 Bean,它会被自动发现和自动绑定。对于这些罕见情况,您可以通过将 spring.cloud.stream.function.autodetect 属性设置为 false 来禁用自动发现。

这里是一个将消息处理器公开为 java.util.function.Function 的应用示例,通过充当数据的消费者和生产者,有效地支持 直通 语义。

@SpringBootApplication
public class MyFunctionBootApp {

	public static void main(String[] args) {
		SpringApplication.run(MyFunctionBootApp.class);
	}

	@Bean
	public Function<String, String> toUpperCase() {
		return s -> s.toUpperCase();
	}
}

在上面的示例中,我们定义了一个 java.util.function.Function 类型的 Bean,名为 toUpperCase,作为消息处理器,其“输入”和“输出”必须绑定到由所提供目标 Binder 公开的外部目标。默认情况下,“输入”和“输出”绑定名称将是 toUpperCase-in-0toUpperCase-out-0。有关用于建立绑定名称的命名约定的详细信息,请参阅 函数式绑定名称 部分。

下面是支持其他语义的简单函数式应用示例

这里是一个将 语义公开为 java.util.function.Supplier 的示例

@SpringBootApplication
public static class SourceFromSupplier {

	@Bean
	public Supplier<Date> date() {
		return () -> new Date(12345L);
	}
}

这里是一个将 sink 语义 公开为 java.util.function.Consumer 的示例

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Consumer<String> sink() {
		return System.out::println;
	}
}

Supplier (源)

FunctionConsumer 在如何触发其调用方面非常直接。它们根据发送到它们所绑定目标的 数据 (事件) 来触发。换句话说,它们是经典的事件驱动组件。

然而,Supplier 在触发方面属于自己的类别。因为它根据定义是数据的源头,所以它不订阅任何入站目标,因此必须由其他机制触发。还有一个关于 Supplier 实现的问题,它可能是 命令式响应式 的,这直接关系到此类 Supplier 的触发方式。

考虑以下示例

@SpringBootApplication
public static class SupplierConfiguration {

	@Bean
	public Supplier<String> stringSupplier() {
		return () -> "Hello from Supplier";
	}
}

上述 Supplier Bean 在其 get() 方法被调用时会产生一个字符串。然而,谁以及多久调用一次此方法?框架提供了一个默认的轮询机制 (回答“谁?”的问题),它将触发 Supplier 的调用,默认情况下每秒触发一次 (回答“多久一次?”的问题)。换句话说,上述配置每秒生成一条消息,每条消息都被发送到由 Binder 公开的 output 目标。要了解如何自定义轮询机制,请参阅 轮询配置属性 部分。

考虑另一个示例

@SpringBootApplication
public static class SupplierConfiguration {

    @Bean
    public Supplier<Flux<String>> stringSupplier() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000);
                    return "Hello from Supplier";
                } catch (Exception e) {
                    // ignore
                }
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }
}

上述 Supplier Bean 采用响应式编程风格。通常情况下,与命令式 Supplier 不同,它应该只被触发一次,因为调用其 get() 方法会产生 (提供) 连续的消息流而不是单个消息。

框架识别出编程风格的差异,并保证此类 Supplier 只被触发一次。

然而,设想一个用例,您想要轮询某个数据源并返回一个表示结果集的有限数据流。响应式编程风格是此类 Supplier 的完美机制。然而,考虑到生成的数据流是有限的,此类 Supplier 仍然需要定期调用。

考虑以下示例,它通过产生有限数据流来模拟此类用例

@SpringBootApplication
public static class SupplierConfiguration {

	@PollableBean
	public Supplier<Flux<String>> stringSupplier() {
		return () -> Flux.just("hello", "bye");
	}
}

Bean 本身使用 PollableBean 注解进行标注 (它是 @Bean 的子集),从而向框架发出信号,表明尽管此类 Supplier 的实现是响应式的,但它仍然需要被轮询。

PollableBean 中定义了一个 splittable 属性,它向此注解的后处理器发出信号,表明被标注组件产生的结果需要拆分,并且默认设置为 true。这意味着框架将拆分返回的结果并将每个项作为单独的消息发送出去。如果这不是期望的行为,您可以将其设置为 false,此时此类 Supplier 将简单地返回生成的 Flux 而不进行拆分。

Supplier 与线程

正如您目前所了解的,与由事件触发 (FunctionConsumer 有输入数据) 的情况不同,Supplier 没有任何输入,因此由不同的机制触发 - 轮询器,它可能具有不可预测的线程机制。虽然线程机制的细节在大多数情况下与函数的下游执行无关,但在某些情况下可能会出现问题,特别是在集成的框架中,它们可能对线程亲和性有特定的要求。例如,依赖于存储在线程局部变量中的跟踪数据的 Spring Cloud Sleuth。对于这些情况,我们通过 StreamBridge 提供了另一种机制,用户可以更精细地控制线程机制。您可以在 将任意数据发送到输出 (例如:外部事件驱动源) 部分获取更多详细信息。

消费者 (响应式)

响应式 Consumer 有点特别,因为它具有 void 返回类型,框架无法获取订阅引用。您很可能不需要编写 Consumer<Flux<?>>,而是将其写为 Function<Flux<?>, Mono<Void>>,并在流的最后一个操作符上调用 then 操作符。

例如

public Function<Flux<?>, Mono<Void>> consumer() {
	return flux -> flux.map(..).filter(..).then();
}

但是如果您确实需要编写显式的 Consumer<Flux<?>>,请记住订阅传入的 Flux。

此外,请记住,当混合使用响应式和命令式函数时,同样的规则适用于函数组合。Spring Cloud Function 确实支持将响应式函数与命令式函数组合,但您必须注意某些限制。例如,假设您将响应式函数与命令式消费者组合。这种组合的结果是一个响应式 Consumer。然而,如本节前面所述,没有办法订阅这样的消费者,因此此限制只能通过将您的消费者变为响应式并手动订阅 (如前所述),或者将您的函数更改为命令式来解决。

轮询配置属性

以下属性由 Spring Cloud Stream 公开,并带有前缀 spring.integration.poller.

fixedDelay

默认轮询器的固定延迟,单位为毫秒。

默认值:1000L。

maxMessagesPerPoll

默认轮询器每次轮询事件的最大消息数。

默认值:1L。

cron

Cron Trigger 的 Cron 表达式值。

默认值:无。

initialDelay

周期性触发器的初始延迟。

默认值:0。

timeUnit

应用于延迟值的 TimeUnit。

默认值:MILLISECONDS。

例如 --spring.integration.poller.fixed-delay=2000 将轮询器的间隔设置为每两秒轮询一次。

按绑定配置轮询

前一节展示了如何配置应用于所有绑定的单个默认轮询器。虽然这非常适合 spring-cloud-stream 设计的微服务模型(其中每个微服务代表一个组件,例如 Supplier,因此默认的轮询配置就足够了),但也存在一些边缘情况,您可能有多个组件需要不同的轮询配置

对于这种情况,请使用按绑定配置轮询器的方式。例如,假设您有一个输出绑定 supply-out-0。在这种情况下,您可以使用 spring.cloud.stream.bindings.supply-out-0.producer.poller.. 前缀为此绑定配置轮询器 (例如,spring.cloud.stream.bindings.supply-out-0.producer.poller.fixed-delay=2000)。

将任意数据发送到输出 (例如:外部事件驱动源)

在某些情况下,实际的数据源可能来自不是 Binder 的外部(外部)系统。例如,数据源可能是一个经典的 REST 端点。我们如何将此类源与 spring-cloud-stream 使用的函数式机制桥接起来?

Spring Cloud Stream 提供了两种机制,我们来详细了解一下

在此,对于这两个示例,我们将使用绑定到根 Web 上下文的标准 MVC 端点方法 delegateToSupplier,通过 StreamBridge 机制将传入请求委托给流。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, "--spring.cloud.stream.output-bindings=toStream");
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("toStream", body);
	}
}

这里我们自动注入了一个 StreamBridge Bean,它允许我们将数据发送到输出绑定,有效地将非流应用与 spring-cloud-stream 桥接起来。请注意,前面的示例没有定义任何源函数 (例如 Supplier Bean),框架因此没有提前创建源绑定的触发器,这在配置包含函数 Bean 的情况下很常见。但这没关系,因为 StreamBridge 会在第一次调用其 send(..) 操作时,为不存在的绑定启动输出绑定的创建 (以及必要时的目标自动配置),并缓存起来供后续重用 (有关详细信息,请参阅 StreamBridge 和动态目标)。

然而,如果您想在初始化 (启动) 时预先创建输出绑定,则可以利用 spring.cloud.stream.output-bindings 属性,在该属性中声明源的名称。提供的名称将用作创建源绑定的触发器。您可以使用 ; 来表示多个源 (多个输出绑定) (例如,--spring.cloud.stream.output-bindings=foo;bar)

此外,请注意 streamBridge.send(..) 方法接受一个 Object 作为数据。这意味着您可以发送 POJO 或 Message,并且在发送输出时会经历与任何 Function 或 Supplier 发送时相同的过程,提供与函数相同的一致性级别。这意味着输出类型转换、分区等都将被遵守,就如同它们来自函数产生的输出一样。

StreamBridge 的异步发送

StreamBridge 使用由 Spring Integration 框架提供的发送机制,该机制是 Spring Cloud Stream 的核心。默认情况下,此机制使用发送方的线程。换句话说,发送是阻塞的。虽然这在许多情况下是可以接受的,但有些情况下您希望发送是异步的。为此,在使用 StreamBridge 调用发送方法之前,请使用其 setAsync(true) 方法。

使用异步发送进行可观测性上下文传播

使用框架提供的可观测性支持以及配套的 Spring 框架时,打破线程边界会影响可观测性上下文的一致性,进而影响您的追踪历史。为了避免这种情况,您只需添加 Micrometer 的 context-propagation 依赖项 (见下文)

<dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>context-propagation</artifactId>
    <version>1.1.0</version>
</dependency>

StreamBridge 和动态目标

StreamBridge 也可用于提前不知道输出目标的情况,类似于 Routing FROM consumer 部分描述的用例。

我们来看一个示例

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class, args);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		System.out.println("Sending " + body);
		streamBridge.send("myDestination", body);
	}
}

如您所见,前面的示例与之前的示例非常相似,不同之处在于没有通过 spring.cloud.stream.output-bindings 属性提供显式绑定指令。在这里,我们将数据发送到名为 myDestination 的目标,该目标不存在绑定。因此,该名称将被视为动态目标,如 Routing FROM consumer 部分所述。

在前面的示例中,我们使用 ApplicationRunner 作为 *外部源* 来馈送流。

一个更实际的示例,其中外部源是 REST 端点。

@SpringBootApplication
@Controller
public class WebSourceApplication {

	public static void main(String[] args) {
		SpringApplication.run(WebSourceApplication.class);
	}

	@Autowired
	private StreamBridge streamBridge;

	@RequestMapping
	@ResponseStatus(HttpStatus.ACCEPTED)
	public void delegateToSupplier(@RequestBody String body) {
		streamBridge.send("myBinding", body);
	}
}

如您所见,在 delegateToSupplier 方法内部,我们使用 StreamBridge 将数据发送到 myBinding 绑定。在这里,您也受益于 StreamBridge 的动态特性,如果 myBinding 不存在,它将自动创建并缓存,否则将使用现有绑定。

缓存动态目标 (绑定) 可能会导致内存泄漏,尤其是在存在许多动态目标的情况下。为了有一定的控制,我们为输出绑定提供了一个默认缓存大小为 10 的自淘汰缓存机制。这意味着如果您的动态目标数量超过此数字,则可能存在现有绑定被淘汰,从而需要重新创建,这可能会导致轻微的性能下降。您可以通过 spring.cloud.stream.dynamic-destination-cache-size 属性将其设置为所需值来增加缓存大小。
curl -H "Content-Type: text/plain" -X POST -d "hello from the other side" https://:8080/

通过展示两个示例,我们想强调这种方法适用于任何类型的外部源。

如果您使用 Solace PubSub+ Binder,Spring Cloud Stream 保留了 scst_targetDestination 头 (可通过 BinderHeaders.TARGET_DESTINATION 获取),该头允许消息从其绑定配置的目标重定向到此头指定的目标。这使得 Binder 可以管理发布到动态目标所需的资源,减轻了框架的负担,并避免了前面 Note 中提到的缓存问题。更多信息请见此处

使用 StreamBridge 的输出内容类型

如果需要,您也可以使用以下方法签名提供特定的内容类型 public boolean send(String bindingName, Object data, MimeType outputContentType)。或者,如果您将数据作为 Message 发送,其内容类型将被遵守。

使用 StreamBridge 指定 Binder 类型

Spring Cloud Stream 支持多种 Binder 场景。例如,您可能从 Kafka 接收数据并将其发送到 RabbitMQ。

有关多 Binder 场景的更多信息,请参阅 Binder 部分,特别是 Classpath 上的多个 Binder

如果您打算使用 StreamBridge 并且在您的应用中配置了不止一个 Binder,您还必须告诉 StreamBridge 使用哪个 Binder。为此,send 方法还有另外两种变体

public boolean send(String bindingName, @Nullable String binderType, Object data)

public boolean send(String bindingName, @Nullable String binderType, Object data, MimeType outputContentType)

如您所见,您可以提供一个附加参数 - binderType,它告诉 BindingService 在创建动态绑定时使用哪个 Binder。

对于使用 spring.cloud.stream.output-bindings 属性的情况,或者绑定已经在不同的 Binder 下创建,binderType 参数将不起作用。

使用 StreamBridge 的通道拦截器

由于 StreamBridge 使用 MessageChannel 来建立输出绑定,因此在使用 StreamBridge 发送数据时可以激活通道拦截器。由应用决定在 StreamBridge 上应用哪些通道拦截器。Spring Cloud Stream 不会将所有检测到的通道拦截器注入到 StreamBridge 中,除非它们被 @GlobalChannelInterceptor(patterns = "*") 注解。

假设您的应用中有以下两个不同的 StreamBridge 绑定。

streamBridge.send("foo-out-0", message);

streamBridge.send("bar-out-0", message);

现在,如果您想将通道拦截器应用于这两个 StreamBridge 绑定,则可以声明以下 GlobalChannelInterceptor Bean。

@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

但是,如果您不喜欢上面的全局方法,并希望为每个绑定设置专用的拦截器,则可以执行以下操作。

@Bean
@GlobalChannelInterceptor(patterns = "foo-*")
public ChannelInterceptor fooInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

@Bean
@GlobalChannelInterceptor(patterns = "bar-*")
public ChannelInterceptor barInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            ...
        }
    };
}

您可以灵活地使模式更严格或根据您的业务需求进行定制。

通过这种方法,应用可以决定向 StreamBridge 注入哪些拦截器,而不是应用所有可用的拦截器。

StreamBridge 通过 StreamOperations 接口提供了一个契约,该接口包含 StreamBridge 的所有 send 方法。因此,应用可以选择使用 StreamOperations 进行自动注入。这对于单元测试使用 StreamBridge 的代码非常方便,可以通过为 StreamOperations 接口提供模拟或其他类似机制来实现。

响应式函数支持

由于 Spring Cloud Function 是基于 Project Reactor 构建的,因此在实现 SupplierFunctionConsumer 时,您不需要做太多事情即可受益于响应式编程模型。

例如

@SpringBootApplication
public static class SinkFromConsumer {

	@Bean
	public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
		return flux -> flux.map(val -> val.toUpperCase());
	}
}

在选择响应式或命令式编程模型时,必须了解一些重要事项。

完全响应式还是仅使用 API?

使用响应式 API 并不一定意味着您可以从该 API 的所有响应式特性中受益。换句话说,像背压和其他高级特性只有在与兼容系统(例如响应式 Kafka Binder)一起工作时才能发挥作用。如果您使用的是常规的 Kafka、Rabbit 或任何其他非响应式 Binder,您只能受益于响应式 API 本身的便利性,而无法利用其高级特性,因为流的实际源或目标不是响应式的。

错误处理和重试

在本手册中,您将看到多个关于基于框架的错误处理、重试和其他特性以及与其相关的配置属性的参考。重要的是要理解它们只影响命令式函数,并且对于响应式函数,您不应抱有相同的期望。原因如下……响应式和命令式函数之间存在根本区别。命令式函数是一个 消息处理器,框架在接收到每条消息时都会调用它。因此,对于 N 条消息,该函数将被调用 N 次,正因为如此,我们可以包装该函数并添加错误处理、重试等附加功能。响应式函数是一个 初始化函数。它只被调用一次,以获取用户提供的 Flux/Mono 的引用,并将其与框架提供的流连接起来。此后,我们(框架)对该流完全没有可见性或控制权。因此,对于响应式函数,在错误处理和重试方面(例如 doOnError().onError*() 等),您必须依赖于响应式 API 的丰富性。

函数式组合

使用函数式编程模型,您还可以受益于函数式组合,在该组合中,您可以从一组简单函数中动态组合复杂的处理器。例如,让我们将以下函数 Bean 添加到上面定义的应用中

@Bean
public Function<String, String> wrapInQuotes() {
	return s -> "\"" + s + "\"";
}

并修改 spring.cloud.function.definition 属性以反映您将 ‘toUpperCase’ 和 ‘wrapInQuotes’ 组合成一个新函数的意图。为此,Spring Cloud Function 依赖于 | (管道) 符号。因此,为了完成我们的示例,我们的属性现在看起来像这样

--spring.cloud.function.definition=toUpperCase|wrapInQuotes
Spring Cloud Function 提供的函数式组合支持的最大好处之一是,您可以组合 响应式命令式 函数。

组合的结果是一个单一函数,正如您可能猜到的,它可能有一个非常长且相当隐晦的名称(例如,foo|bar|baz|xyz. . .),这在涉及其他配置属性时会带来很大不便。这里,功能绑定名称部分中描述的描述性绑定名称特性可以提供帮助。

例如,如果我们想给我们的 toUpperCase|wrapInQuotes 一个更具描述性的名称,我们可以使用以下属性来实现:spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=quotedUpperCaseInput,这样其他配置属性就可以引用该绑定名称(例如,spring.cloud.stream.bindings.quotedUpperCaseInput.destination=myDestination)。

函数组合和横切关注点

函数组合通过将复杂性分解为一组简单且可单独管理/测试的组件来有效解决复杂性,这些组件在运行时仍然可以表示为一个整体。但这并非唯一的好处。

您还可以使用组合来解决某些横切的非功能性关注点,例如内容丰富。例如,假设您有一个入站消息,它可能缺少某些头信息,或者某些头信息不处于您的业务函数所期望的精确状态。现在,您可以实现一个单独的函数来解决这些关注点,然后将其与主业务函数组合起来。

我们来看一个示例

@SpringBootApplication
public class DemoStreamApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoStreamApplication.class,
				"--spring.cloud.function.definition=enrich|echo",
				"--spring.cloud.stream.function.bindings.enrich|echo-in-0=input",
				"--spring.cloud.stream.bindings.input.destination=myDestination",
				"--spring.cloud.stream.bindings.input.group=myGroup");

	}

	@Bean
	public Function<Message<String>, Message<String>> enrich() {
		return message -> {
			Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
			return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
		};
	}

	@Bean
	public Function<Message<String>, Message<String>> echo() {
		return message -> {
			Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
			System.out.println("Incoming message " + message);
			return message;
		};
	}
}

尽管微不足道,这个例子展示了一个函数如何用额外的头信息(非功能性关注点)丰富入站消息,这样另一个函数 - echo - 就可以从中受益。echo 函数保持简洁,只关注业务逻辑。您还可以看到 spring.cloud.stream.function.bindings 属性的用法,它简化了组合后的绑定名称。

具有多个输入和输出参数的函数

从版本 3.0 开始,Spring Cloud Stream 支持具有多个输入和/或多个输出(返回值)的函数。这实际上意味着什么?它针对的是哪种类型的用例?

  • 大数据:想象您正在处理的数据源高度无组织,包含各种类型的数据元素(例如,订单、交易等),您需要有效地对其进行整理。

  • 数据聚合:另一种用例可能需要您合并来自 2 个或更多入站_流_的数据元素。.

上述只是描述了一些您可能需要使用单个函数来接收和/或产生多个数据流的用例。这也是我们在这里针对的用例类型。

另外,请注意这里对概念的强调略有不同。假设此类函数只有在获得实际数据流(而不是单个元素)的访问权限时才具有价值。因此,我们依赖于 Project Reactor 提供的抽象(即 FluxMono),它们作为 Spring Cloud Functions 引入的依赖项的一部分,已在 classpath 中可用。

另一个重要方面是表示多个输入和输出。虽然 Java 提供了多种不同的抽象来表示多个事物,但这些抽象是 a) 无界的b) 缺乏元数的c) 缺乏类型信息的,这些在此上下文中都很重要。例如,让我们看看 Collection 或数组,它们只能描述单一类型的多个或将所有内容向上转型为 Object,这会影响 Spring Cloud Stream 的透明类型转换功能等等。

因此,为了满足所有这些要求,最初的支持依赖于利用 Project Reactor 提供的另一种抽象——Tuples 的函数签名。但是,我们正在努力允许更灵活的函数签名。

请参阅 绑定和绑定名称 部分,了解此类应用程序用于建立绑定名称的命名约定。

让我们看几个示例

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<Tuple2<Flux<String>, Flux<Integer>>, Flux<String>> gather() {
		return tuple -> {
			Flux<String> stringStream = tuple.getT1();
			Flux<String> intStream = tuple.getT2().map(i -> String.valueOf(i));
			return Flux.merge(stringStream, intStream);
		};
	}
}

上述示例演示了一个函数,它接受两个输入(第一个是 String 类型,第二个是 Integer 类型)并产生一个 String 类型的单一输出。

因此,对于上述示例,两个输入绑定将是 gather-in-0gather-in-1,为了保持一致性,输出绑定也遵循相同的约定,并命名为 gather-out-0

了解这些将使您能够设置特定于绑定的属性。例如,以下内容将覆盖 gather-in-0 绑定的内容类型:

--spring.cloud.stream.bindings.gather-in-0.content-type=text/plain
@SpringBootApplication
public class SampleApplication {

	@Bean
	public static Function<Flux<Integer>, Tuple2<Flux<String>, Flux<String>>> scatter() {
		return flux -> {
			Flux<Integer> connectedFlux = flux.publish().autoConnect(2);
			UnicastProcessor even = UnicastProcessor.create();
			UnicastProcessor odd = UnicastProcessor.create();
			Flux<Integer> evenFlux = connectedFlux.filter(number -> number % 2 == 0).doOnNext(number -> even.onNext("EVEN: " + number));
			Flux<Integer> oddFlux = connectedFlux.filter(number -> number % 2 != 0).doOnNext(number -> odd.onNext("ODD: " + number));

			return Tuples.of(Flux.from(even).doOnSubscribe(x -> evenFlux.subscribe()), Flux.from(odd).doOnSubscribe(x -> oddFlux.subscribe()));
		};
	}
}

上述示例与前一个示例有些相反,演示了一个函数,它接受一个 Integer 类型的单一输入并产生两个输出(都是 String 类型)。

因此,对于上述示例,输入绑定是 scatter-in-0,输出绑定是 scatter-out-0scatter-out-1

您可以使用以下代码进行测试

@Test
public void testSingleInputMultiOutput() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					SampleApplication.class))
							.run("--spring.cloud.function.definition=scatter")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		for (int i = 0; i < 10; i++) {
			inputDestination.send(MessageBuilder.withPayload(String.valueOf(i).getBytes()).build());
		}

		int counter = 0;
		for (int i = 0; i < 5; i++) {
			Message<byte[]> even = outputDestination.receive(0, 0);
			assertThat(even.getPayload()).isEqualTo(("EVEN: " + String.valueOf(counter++)).getBytes());
			Message<byte[]> odd = outputDestination.receive(0, 1);
			assertThat(odd.getPayload()).isEqualTo(("ODD: " + String.valueOf(counter++)).getBytes());
		}
	}
}

单个应用程序中的多个函数

在单个应用程序中可能还需要对多个消息处理程序进行分组。您可以通过定义多个函数来实现这一点。

@SpringBootApplication
public class SampleApplication {

	@Bean
	public Function<String, String> uppercase() {
		return value -> value.toUpperCase();
	}

	@Bean
	public Function<String, String> reverse() {
		return value -> new StringBuilder(value).reverse().toString();
	}
}

在上述示例中,我们有一个定义了两个函数 uppercasereverse 的配置。所以首先,如前所述,我们需要注意到存在冲突(函数多于一个),因此我们需要通过提供指向我们希望绑定的实际函数的 spring.cloud.function.definition 属性来解决它。但在这里,我们将使用 ; 分隔符来指向这两个函数(参见下面的测试用例)。

与具有多个输入/输出的函数一样,请参阅 [绑定和绑定名称] 部分,了解此类应用程序用于建立绑定名称的命名约定。

您可以使用以下代码进行测试

@Test
public void testMultipleFunctions() {
	try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
			TestChannelBinderConfiguration.getCompleteConfiguration(
					ReactiveFunctionConfiguration.class))
							.run("--spring.cloud.function.definition=uppercase;reverse")) {

		InputDestination inputDestination = context.getBean(InputDestination.class);
		OutputDestination outputDestination = context.getBean(OutputDestination.class);

		Message<byte[]> inputMessage = MessageBuilder.withPayload("Hello".getBytes()).build();
		inputDestination.send(inputMessage, "uppercase-in-0");
		inputDestination.send(inputMessage, "reverse-in-0");

		Message<byte[]> outputMessage = outputDestination.receive(0, "uppercase-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("HELLO".getBytes());

		outputMessage = outputDestination.receive(0, "reverse-out-0");
		assertThat(outputMessage.getPayload()).isEqualTo("olleH".getBytes());
	}
}

批处理消费者

当使用支持批处理监听器且已为消费者绑定启用此特性的 MessageChannelBinder 时,您可以将 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 设置为 true,以使整个消息批次作为 List 传递给函数。

@Bean
public Function<List<Person>, Person> findFirstPerson() {
    return persons -> persons.get(0);
}

批处理类型转换

与单消息消费者的类型转换类似,批处理要求批次中的每条消息都转换为请求的类型。例如,在前面的示例中,此类型是 Person

同样重要的是要理解,批次中每条消息的头信息在表示整个批次的消息的 MessageHeaders 中是单独提供的。这些消息及其对应的批处理头信息由各自的绑定器创建,其结构可能有所不同。因此,您应该查阅绑定器文档以了解批处理头信息的结构。对于 Kafka 和 Rabbit,您可以分别搜索 amqp_batchedHeaderskafka_batchConvertedHeaders

简而言之,如果您有一条消息表示包含 5 个有效载荷的批次,则同一条消息将包含一组头信息,其中每个头信息与具有相同索引的有效载荷对应。

然而,如果某个特定的有效载荷转换失败会发生什么?在单消息场景中,我们只需返回 null,并使用未转换的消息调用您的方法,这会根据您的函数签名导致异常或允许您处理原始消息。

在批处理的情况下,事情会稍微复杂一些。对未转换的有效载荷返回 null 会有效地减少批次大小。例如,如果原始批次包含 5 条消息,其中 2 条未能转换,则转换后的批次将仅包含 3 条消息。这可能是可以接受的,但对应的批处理头信息怎么办?仍然会有 5 个头信息,因为它们是在绑定器形成初始批次时创建的。这种差异使得将头信息与其对应的有效载荷关联起来变得困难。

为了解决这个问题,我们提供了 MessageConverterHelper 接口。

public interface MessageConverterHelper {

	/**
	 * This method will be called by the framework in cases when a message failed to convert.
	 * It allows you to signal to the framework if such failure should be considered fatal or not.
	 *
	 * @param message failed message
	 * @return true if conversion failure must be considered fatal.
	 */
	default boolean shouldFailIfCantConvert(Message<?> message) {
		return false;
	}

	/**
	 * This method will be called by the framework in cases when a single message within batch of messages failed to convert.
	 * It provides a place for providing post-processing logic before message converter returns.
	 *
	 * @param message failed message.
	 * @param index index of failed message within the batch
	 */
	default void postProcessBatchMessageOnFailure(Message<?> message, int index) {
	}
}

如果实现,框架的消息转换器逻辑将调用此接口,以便在特定有效载荷无法转换时对批处理消息执行后处理。

Kafka 和 Rabbit 的默认实现会自动移除相应的批处理头信息,以维护批处理有效载荷与其头信息之间的关联性。然而,如果您需要为此类情况添加自定义行为,可以提供自己的实现并将其注册为一个 bean。

此外,该接口提供了一个方法,允许更确定性地处理转换失败。默认情况下,此方法返回 false,但如果您更喜欢在发生转换错误时使整个过程失败,可以自定义实现。

批处理生产者

您还可以在生产者端使用批处理的概念,通过返回一个消息集合,这实际上提供了一种反向效果,即集合中的每条消息都将由绑定器单独发送。

考虑以下函数

@Bean
public Function<String, List<Message<String>>> batch() {
	return p -> {
		List<Message<String>> list = new ArrayList<>();
		list.add(MessageBuilder.withPayload(p + ":1").build());
		list.add(MessageBuilder.withPayload(p + ":2").build());
		list.add(MessageBuilder.withPayload(p + ":3").build());
		list.add(MessageBuilder.withPayload(p + ":4").build());
		return list;
	};
}

返回列表中的每条消息都将单独发送,总共发送四条消息到输出目的地。

Spring Integration 流作为函数

当您实现一个函数时,您可能有一些复杂的、符合 企业集成模式(Enterprise Integration Patterns) (EIP) 类别的需求。使用 Spring Integration (SI) 这样的框架最适合处理这些需求,SI 是 EIP 的参考实现。

值得庆幸的是,SI 已经通过 将集成流作为网关 提供了将集成流暴露为函数的功能。考虑以下示例:

@SpringBootApplication
public class FunctionSampleSpringIntegrationApplication {

	public static void main(String[] args) {
		SpringApplication.run(FunctionSampleSpringIntegrationApplication.class, args);
	}

	@Bean
	public IntegrationFlow uppercaseFlow() {
		return IntegrationFlow.from(MessageFunction.class, spec -> spec.beanName("uppercase"))
				.<String, String>transform(String::toUpperCase)
				.log(LoggingHandler.Level.WARN)
				.bridge()
				.get();
	}

	public interface MessageFunction extends Function<Message<String>, Message<String>> {

	}
}

对于熟悉 SI 的人来说,您可以看到我们定义了一个 IntegrationFlow 类型的 bean,其中我们声明了一个希望暴露为 Function<String, String>(使用 SI DSL)的集成流,名为 uppercaseMessageFunction 接口允许我们显式声明输入和输出的类型,以便进行适当的类型转换。有关类型转换的更多信息,请参阅 [内容类型协商] 部分。

要接收原始输入,您可以使用 from(Function.class, …​)

结果函数绑定到目标绑定器暴露的输入和输出目的地。

请参阅 [绑定和绑定名称] 部分,了解此类应用程序用于建立绑定名称的命名约定。

关于 Spring Integration 和 Spring Cloud Stream 在函数式编程模型方面的互操作性的更多详细信息,您可能会发现这篇文章非常有趣,因为它更深入地探讨了通过结合 Spring Integration 和 Spring Cloud Stream/Functions 的优点可以应用的各种模式。

使用轮询消费者

概述

使用轮询消费者时,您按需轮询 PollableMessageSource。为了为轮询消费者定义绑定,您需要提供 spring.cloud.stream.pollable-source 属性。

考虑以下轮询消费者绑定的示例

--spring.cloud.stream.pollable-source=myDestination

前面示例中的轮询源名称 myDestination 将导致绑定名称 myDestination-in-0,以与函数式编程模型保持一致。

鉴于前面示例中的轮询消费者,您可以如下使用它

@Bean
public ApplicationRunner poller(PollableMessageSource destIn, MessageChannel destOut) {
    return args -> {
        while (someCondition()) {
            try {
                if (!destIn.poll(m -> {
                    String newPayload = ((String) m.getPayload()).toUpperCase();
                    destOut.send(new GenericMessage<>(newPayload));
                })) {
                    Thread.sleep(1000);
                }
            }
            catch (Exception e) {
                // handle failure
            }
        }
    };
}

一种更少手动、更 Spring 式的替代方法是配置一个调度任务 bean。例如,

@Scheduled(fixedDelay = 5_000)
public void poll() {
	System.out.println("Polling...");
	this.source.poll(m -> {
		System.out.println(m.getPayload());

	}, new ParameterizedTypeReference<Foo>() { });
}

PollableMessageSource.poll() 方法接受一个 MessageHandler 参数(通常是一个 lambda 表达式,如此处所示)。如果消息被接收并成功处理,它返回 true

与消息驱动的消费者一样,如果 MessageHandler 抛出异常,消息将发布到错误通道,如 错误处理 中所讨论。

通常,当 MessageHandler 退出时,poll() 方法会确认消息。如果方法异常退出,消息将被拒绝(不重新入队),但请参阅 错误处理。您可以通过自己负责确认来覆盖此行为,如下例所示:

@Bean
public ApplicationRunner poller(PollableMessageSource dest1In, MessageChannel dest2Out) {
    return args -> {
        while (someCondition()) {
            if (!dest1In.poll(m -> {
                StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).noAutoAck();
                // e.g. hand off to another thread which can perform the ack
                // or acknowledge(Status.REQUEUE)

            })) {
                Thread.sleep(1000);
            }
        }
    };
}
您必须在某个时候 ack(或 nack)消息,以避免资源泄露。
一些消息系统(例如 Apache Kafka)在日志中维护一个简单的偏移量。如果一次交付失败并使用 StaticMessageHeaderAccessor.getAcknowledgmentCallback(m).acknowledge(Status.REQUEUE); 重新入队,任何后续成功确认的消息都将被重新投递。

还有一个重载的 poll 方法,其定义如下:

poll(MessageHandler handler, ParameterizedTypeReference<?> type)

type 是一个转换提示,允许对入站消息的有效载荷进行转换,如下例所示:

boolean result = pollableSource.poll(received -> {
			Map<String, Foo> payload = (Map<String, Foo>) received.getPayload();
            ...

		}, new ParameterizedTypeReference<Map<String, Foo>>() {});

错误处理

默认情况下,为轮询源配置了一个错误通道;如果回调抛出异常,ErrorMessage 将发送到错误通道(<destination>.<group>.errors);此错误通道也桥接到全局 Spring Integration errorChannel

您可以使用 @ServiceActivator 订阅任一错误通道来处理错误;如果没有订阅,错误将仅被记录,消息将被确认为成功。如果错误通道的服务激活器抛出异常,消息将被拒绝(默认情况下)且不会重新投递。如果服务激活器抛出 RequeueCurrentMessageException,消息将在代理处重新入队,并在后续轮询时再次检索。

如果监听器直接抛出 RequeueCurrentMessageException,消息将如上所述重新入队,而不会发送到错误通道。

© . This site is unofficial and not affiliated with VMware.