使用应用程序事件

为了尽可能地使应用程序模块彼此解耦,它们之间主要的交互方式应该是事件发布和消费。这避免了源模块了解所有潜在的感兴趣方,这是启用应用程序模块集成测试的关键方面(参见 集成测试应用程序模块)。

通常我们会发现应用程序组件是这样定义的

  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final InventoryManagement inventory;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    // Invoke related functionality
    inventory.updateStockFor(order);
  }
}
@Service
class OrderManagement(val inventory: InventoryManagement) {

  @Transactional
  fun complete(order: Order) {
    inventory.updateStockFor(order)
  }
}

complete(…) 方法在某种程度上产生了功能上的重力,因为它吸引了相关功能,从而与其他应用程序模块中定义的 Spring bean 进行交互。这使得组件更难测试,因为我们需要有那些依赖的 bean 的实例才能创建 OrderManagement 的实例(参见 处理传出依赖项)。这也意味着,每当我们想要将更多功能与业务事件订单完成集成时,我们都必须修改该类。

我们可以按如下方式更改应用程序模块交互

通过 Spring 的 ApplicationEventPublisher 发布应用程序事件
  • Java

  • Kotlin

@Service
@RequiredArgsConstructor
public class OrderManagement {

  private final ApplicationEventPublisher events;
  private final OrderInternal dependency;

  @Transactional
  public void complete(Order order) {

    // State transition on the order aggregate go here

    events.publishEvent(new OrderCompleted(order.getId()));
  }
}
@Service
class OrderManagement(val events: ApplicationEventPublisher, val dependency: OrderInternal) {

  @Transactional
  fun complete(order: Order) {
    events.publishEvent(OrderCompleted(order.id))
  }
}

请注意,我们没有依赖于其他应用程序模块的 Spring bean,而是使用 Spring 的 ApplicationEventPublisher 在完成主聚合的状态转换后发布域事件。有关更以聚合为中心的事件发布方法,请参阅 Spring Data 的应用程序事件发布机制 以了解详细信息。由于事件发布默认情况下是同步发生的,因此整个安排的事务语义与上面的示例相同。这既有好处,因为我们获得了非常简单的一致性模型(订单状态更改和库存更新要么都成功,要么都失败),但也存在弊端,因为更多触发的相关功能会扩大事务边界,并可能导致整个事务失败,即使导致错误的功能并不关键。

另一种方法是在事务提交时将事件消费移至异步处理,并将辅助功能视为辅助功能

异步、事务性事件监听器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

这现在有效地将原始事务与监听器的执行解耦。虽然这避免了原始业务事务的扩展,但也存在风险:如果监听器因任何原因失败,则事件发布将丢失,除非每个监听器实际上都实现了自己的安全网。更糟糕的是,这甚至不能完全奏效,因为系统可能在方法甚至被调用之前就失败了。

应用程序模块监听器

要在事务中运行事务性事件监听器,它需要反过来用 @Transactional 进行注释。

在事务中运行的异步、事务性事件监听器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @Async
  @Transactional(propagation = Propagation.REQUIRES_NEW)
  @TransactionalEventListener
  fun on(event: OrderCompleted) { /* … */ }
}

为了简化通过事件集成模块的默认方式的声明,Spring Modulith 提供了 @ApplicationModuleListener 作为快捷方式。

应用程序模块监听器
  • Java

  • Kotlin

@Component
class InventoryManagement {

  @ApplicationModuleListener
  void on(OrderCompleted event) { /* … */ }
}
@Component
class InventoryManagement {

  @ApplicationModuleListener
  fun on(event: OrderCompleted) { /* … */ }
}

事件发布注册表

Spring Modulith 附带一个事件发布注册表,它连接到 Spring 框架的核心事件发布机制。在事件发布时,它会找出将接收事件的事务性事件监听器,并将每个监听器的条目(深蓝色)写入事件发布日志,作为原始业务事务的一部分。

event publication registry start
图 1. 执行前的事务性事件监听器安排

每个事务性事件监听器都包装在一个方面中,如果监听器的执行成功,该方面会将该日志条目标记为已完成。如果监听器失败,则日志条目保持不变,以便根据应用程序的需要部署重试机制。默认情况下,所有未完成的事件发布将在应用程序启动时重新提交。

event publication registry end
图 2. 执行后的事务事件监听器安排

Spring Boot 事件注册启动器

使用事务事件发布日志需要将一些工件添加到您的应用程序中。为了简化此任务,Spring Modulith 提供了以 持久化技术 为中心的启动器 POM,并默认使用基于 Jackson 的 EventSerializer 实现。以下启动器可用

持久化技术 工件 描述

JPA

spring-modulith-starter-jpa

使用 JPA 作为持久化技术。

JDBC

spring-modulith-starter-jdbc

使用 JDBC 作为持久化技术。在基于 JPA 的应用程序中也能正常工作,但会绕过您的 JPA 提供程序进行实际的事件持久化。

MongoDB

spring-modulith-starter-mongodb

使用 JDBC 作为持久化技术。还支持 MongoDB 事务,并要求服务器设置副本集才能进行交互。可以通过将 spring.modulith.events.mongobd.transaction-management.enabled 属性设置为 false 来禁用事务自动配置。

Neo4j

spring-modulith-starter-neo4j

使用 Spring Data Neo4j 背后的 Neo4j。

管理事件发布

在应用程序运行时,可能需要以各种方式管理事件发布。不完整的发布可能需要在给定时间后重新提交给相应的监听器。另一方面,已完成的发布可能需要从数据库中清除或移动到归档存储中。由于这种清理工作的需求在不同的应用程序之间差异很大,因此 Spring Modulith 提供了一个 API 来处理这两种类型的发布。该 API 可通过 spring-modulith-events-api 工件获得,您可以将其添加到您的应用程序中

使用 Spring Modulith 事件 API 工件
  • Maven

  • Gradle

<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-events-api</artifactId>
  <version>1.2.0</version>
</dependency>
dependencies {
  implementation 'org.springframework.modulith:spring-modulith-events-api:1.2.0'
}

此工件包含两个主要抽象,它们作为 Spring Bean 可供应用程序代码使用

  • CompletedEventPublications — 此接口允许访问所有已完成的事件发布,并提供一个 API 来立即从数据库中清除所有已完成的事件发布,或清除比给定持续时间(例如,1 分钟)更旧的已完成的事件发布。

  • IncompleteEventPublications — 此接口允许访问所有未完成的事件发布,以便重新提交与给定谓词匹配的事件发布或比原始发布日期早于给定 Duration 的事件发布。

事件发布存储库

为了实际写入事件发布日志,Spring Modulith 公开了 EventPublicationRepository SPI 和针对支持事务的流行持久化技术的实现,例如 JPA、JDBC 和 MongoDB。您可以通过将相应的 JAR 添加到您的 Spring Modulith 应用程序中来选择要使用的持久化技术。我们已经准备了专门的 启动器 来简化此任务。

基于 JDBC 的实现可以在设置相应的配置属性(spring.modulith.events.jdbc.schema-initialization.enabled)为 true 时为事件发布日志创建一个专用表。有关详细信息,请参阅附录中的模式概述

事件序列化器

每个日志条目都包含以序列化形式存在的原始事件。spring-modulith-events-core 中包含的 EventSerializer 抽象允许插入不同的策略,以将事件实例转换为适合数据存储的格式。Spring Modulith 通过 spring-modulith-events-jackson 工件提供基于 Jackson 的 JSON 实现,该工件默认情况下通过标准 Spring Boot 自动配置注册一个使用 ObjectMapperJacksonEventSerializer

自定义事件发布日期

默认情况下,事件发布注册表将使用 Clock.systemUTC() 返回的日期作为事件发布日期。如果要自定义此日期,请在应用程序上下文中注册一个类型为时钟的 bean

@Configuration
class MyConfiguration {

  @Bean
  Clock myCustomClock() {
    return … // Your custom Clock instance created here.
  }
}

外部化事件

应用程序模块之间交换的一些事件可能对外部系统很有趣。Spring Modulith 允许将选定的事件发布到各种消息代理。要使用该支持,您需要执行以下步骤

  1. 特定于代理的 Spring Modulith 工件 添加到您的项目中。

  2. 通过使用 Spring Modulith 或 jMolecules 的 @Externalized 注释来选择要外部化的事件类型。

  3. 在注释的值中指定特定于代理的路由目标。

要了解如何使用其他方法选择要外部化的事件,或自定义它们在代理中的路由,请查看事件外部化的基本原理

支持的基础设施

代理 工件 描述

Kafka

spring-modulith-events-kafka

使用 Spring Kafka 与代理进行交互。逻辑路由键将用作 Kafka 的主题和消息键。

AMQP

spring-modulith-events-amqp

使用 Spring AMQP 与任何兼容的代理进行交互。例如,需要为 Spring Rabbit 显式声明依赖项。逻辑路由键将用作 AMQP 路由键。

JMS

spring-modulith-events-jms

使用 Spring 的核心 JMS 支持。不支持路由键。

SQS

spring-modulith-events-aws-sqs

使用 Spring Cloud AWS SQS 支持。逻辑路由键将用作 SQS 消息组 ID。当设置路由键时,需要将 SQS 队列配置为 FIFO 队列。

SNS

spring-modulith-events-aws-sns

使用 Spring Cloud AWS SNS 支持。逻辑路由键将用作 SNS 消息组 ID。当设置路由键时,需要将 SNS 配置为具有内容基于去重的 FIFO 主题。

事件外部化的基本原理

事件外部化在每个应用程序发布的事件上执行三个步骤。

  1. 确定事件是否应该被外部化 — 我们将其称为“事件选择”。默认情况下,只有位于 Spring Boot 自动配置包中并使用支持的 @Externalized 注解之一进行注释的事件类型才会被选中用于外部化。

  2. 映射事件(可选) — 默认情况下,事件使用应用程序中存在的 Jackson ObjectMapper 序列化为 JSON 并按原样发布。映射步骤允许开发人员自定义表示形式,甚至完全用适合外部方的表示形式替换原始事件。请注意,映射步骤先于要发布对象的实际序列化。

  3. 确定路由目标 — 消息代理客户端需要一个逻辑目标来发布消息。目标通常标识物理基础设施(主题、交换机或队列,具体取决于代理),并且通常从事件类型静态派生。除非在 @Externalized 注解中明确定义,否则 Spring Modulith 使用应用程序本地类型名称作为目标。换句话说,在具有 com.acme.app 基本包的 Spring Boot 应用程序中,com.acme.app.sample.SampleEvent 事件类型将发布到 sample.SampleEvent

    一些代理还允许定义一个相当动态的路由键,该路由键用于实际目标内的不同目的。默认情况下,不使用路由键。

基于注解的事件外部化配置

要通过 @Externalized 注解定义自定义路由键,可以使用 $target::$key 模式作为每个特定注解中可用的目标/值属性。键可以是 SpEL 表达式,该表达式将获取配置为根对象的事件实例。

通过 SpEL 表达式定义动态路由键
  • Java

  • Kotlin

@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {

  String getLastname() { (1)
    // …
  }
}
@Externalized("customer-created::#{#this.getLastname()}") (2)
class CustomerCreated {
  fun getLastname(): String { (1)
    // …
  }
}

CustomerCreated 事件通过访问器方法公开客户的姓氏。然后,该方法通过 #this.getLastname() 表达式在目标声明的 :: 分隔符后的键表达式中使用。

如果密钥计算变得更加复杂,建议将其委托给一个以事件作为参数的 Spring bean。

调用 Spring bean 来计算路由键
  • Java

  • Kotlin

@Externalized("…::#{@beanName.someMethod(#this)}")
@Externalized("…::#{@beanName.someMethod(#this)}")

事件外部化配置的编程方式

spring-modulith-events-api 工件包含 EventExternalizationConfiguration,允许开发人员自定义上述所有步骤。

以编程方式配置事件外部化
  • Java

  • Kotlin

@Configuration
class ExternalizationConfiguration {

  @Bean
  EventExternalizationConfiguration eventExternalizationConfiguration() {

    return EventExternalizationConfiguration.externalizing()                 (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())   (2)
      .mapping(SomeEvent.class, it -> …)                                     (3)
      .routeKey(WithKeyProperty.class, WithKeyProperty::getKey)              (4)
      .build();
  }
}
@Configuration
class ExternalizationConfiguration {

  @Bean
  fun eventExternalizationConfiguration(): EventExternalizationConfiguration {

    EventExternalizationConfiguration.externalizing()                         (1)
      .select(EventExternalizationConfiguration.annotatedAsExternalized())    (2)
      .mapping(SomeEvent::class, it -> …)                                     (3)
      .routeKey(WithKeyProperty::class, WithKeyProperty::getKey)              (4)
      .build()
  }
}
1 我们首先创建一个 EventExternalizationConfiguration 的默认实例。
2 我们通过调用先前调用返回的 Selector 实例上的 select(…) 方法之一来自定义事件选择。此步骤从根本上禁用应用程序基本包过滤器,因为我们现在只查找注释。存在用于通过类型、包、包和注释轻松选择事件的便利方法。此外,还有一种快捷方式可以在一步中定义选择和路由。
3 我们为 SomeEvent 实例定义一个映射步骤。请注意,路由仍将由原始事件实例确定,除非您另外在路由器上调用 ….routeMapped()
4 我们最终通过定义一个方法句柄来提取事件实例的值来确定路由键。或者,可以通过使用先前调用返回的 Router 实例上的通用 route(…) 方法为各个事件生成完整的 RoutingKey

测试发布的事件

以下部分描述了一种仅专注于跟踪 Spring 应用程序事件的测试方法。有关使用 @ApplicationModuleListener 的模块的更全面测试方法,请查看 Scenario API

Spring Modulith 的 @ApplicationModuleTest 使能够将 PublishedEvents 实例注入测试方法,以验证在测试的业务操作过程中是否已发布特定事件集。

应用程序模块排列的基于事件的集成测试
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(PublishedEvents events) {

    // …
    var matchingMapped = events.ofType(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());

    assertThat(matchingMapped).hasSize(1);
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: PublishedEvents events) {

    // …
    var matchingMapped = events.ofType(OrderCompleted::class)
      .matching(OrderCompleted::getOrderId, reference.getId())

    assertThat(matchingMapped).hasSize(1)
  }
}

请注意,PublishedEvents 如何公开一个 API 来选择与特定条件匹配的事件。验证通过一个 AssertJ 断言结束,该断言验证预期元素的数量。如果您正在为这些断言使用 AssertJ,您也可以使用 AssertablePublishedEvents 作为测试方法参数类型,并使用通过该类型提供的流畅断言 API。

使用 AssertablePublishedEvents 验证事件发布
  • Java

  • Kotlin

@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  void someTestMethod(AssertablePublishedEvents events) {

    // …
    assertThat(events)
      .contains(OrderCompleted.class)
      .matching(OrderCompleted::getOrderId, reference.getId());
  }
}
@ApplicationModuleTest
class OrderIntegrationTests {

  @Test
  fun someTestMethod(events: AssertablePublishedEvents) {

    // …
    assertThat(events)
      .contains(OrderCompleted::class)
      .matching(OrderCompleted::getOrderId, reference.getId())
  }
}

注意 assertThat(…) 表达式返回的类型如何允许直接定义对已发布事件的约束。