https://www.toutiao.com/article/7508175569196974644/

在本节中,我们将深入探讨 Spring Modulith 如何通过事件机制实现模块间的解耦通信,并重点关注如何确保事件发布的可靠性以及如何管理事件的整个生命周期,包括将事件外部化到消息中间件。

模块间事件驱动通信
为了使应用程序模块之间尽可能相互解耦,Spring Modulith 鼓励使用事件发布和消费的方式进行通信。

OrderService发布事件

将 OrderService 修改为不再直接调用 NotificationFacadeService,而是通过 Spring 的 ApplicationEventPublisher 发布一个事件:


// OrderService.java
@RequiredArgsConstructor
@Service
@Slf4j
public class OrderService {

    private final ApplicationEventPublisher eventPublisher;

    @Transactional
    public void createOrder(Order order) {
        log.info("Order Module: Attempting to create order.");
        // 1. 持久化订单
        order.setNo(System.currentTimeMillis() + "");
        log.info("Order Module: Order created successfully, order number: {}", order.getNo());
        // 2. 发布 OrderCreated 事件
        eventPublisher.publishEvent(new OrderCreated(order.getNo(), order.getItems().toString()));
        log.info("Order Module: OrderCreated event published for order number: {}", order.getNo()); // 补充日志,明确事件已发布
    }
}

OrderCreated事件定义


// OrderCreated.java
@AllArgsConstructor
@NoArgsConstructor
@Data
public class OrderCreated {
    private String orderNo;
    private String orderItems;
}

NotificationFacadeService监听事件

NotificationFacadeService 现在将监听 OrderCreated 事件,而不是被直接调用。为了确保事件监听器与发布事务的隔离性(或称一致性,但隔离更强调其独立事务),我们使用 @TransactionalEventListener,并指定其传播行为为 REQUIRES_NEW,这意味着监听器会在一个新的事务中执行。

// NotificationFacadeService.java
@Slf4j
@Service
public class NotificationFacadeService {

    @Async // 添加异步注解,提升性能
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @TransactionalEventListener // 确保事件在发布事务提交后才被处理
    public void on(OrderCreated orderCreated) {
        log.info("Notification Module (Facade): Received order created event: {}", orderCreated);
        // 模拟发送通知
        log.info("Notification Module (Facade): Sending notification for order number: {}", orderCreated.getOrderNo()); // 统一日志格式,避免 "send Order created message" 这种口语化表达
    }
}

测试事件


@Test
public void test_order_created(){
  Order.Item item = new Order.Item("apple", 1, 500);
  Order order = new Order();
  order.setItems(List.of(item));
 orderService.createOrder(order);
}

测试结果

ApplicationModuleListener
为了简化通过事件集成模块的声明方式,Spring Modulith 提供了 @ApplicationModuleListener 作为快捷方式。它是一个复合注解,封装了常用的事件监听行为(如异步执行、新事务、事务性监听)。


// @ApplicationModuleListener 定义示例 (Spring Modulith 内部,无需在项目中创建)
@Async
@Transactional(propagation = Propagation.REQUIRES_NEW)
@TransactionalEventListener
@Documented
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
public @interface ApplicationModuleListener {

    @AliasFor(annotation = Transactional.class, attribute = "readOnly")
    boolean readOnlyTransaction() default false;

    @AliasFor(annotation = EventListener.class, attribute = "id")
    String id() default "";

    @AliasFor(annotation = EventListener.class, attribute = "condition")
    String condition() default "";

    @AliasFor(annotation = Transactional.class, attribute = "propagation")
    Propagation propagation() default Propagation.REQUIRES_NEW;
}

通过使用 @ApplicationModuleListener,NotificationFacadeService 中的监听方法可以进一步简化:

// NotificationFacadeService.java (使用 @ApplicationModuleListener 简化后)
@Slf4j
@Service
public class NotificationFacadeService {

@ApplicationModuleListener // 简化后的注解,默认异步、新事务、事务性监听
public void on(OrderCreated orderCreated) {
    log.info("Notification Module (Facade): Received order created event: {}", orderCreated);
    log.info("Notification Module (Facade): Sending notification for order number: {}", orderCreated.getOrderNo());
}

}
事件发布注册表与可靠性
在基于事件驱动的架构中,当一个模块发布一个事件(例如,订单模块发布“订单已创建”事件)后,这个事件需要被其他感兴趣的监听器模块可靠地接收并处理。然而,在实际应用中,事件的发布和处理过程可能面临系统故障等问题,导致可靠性挑战。

Spring Modulith 提供了强大的机制来管理这些事件的生命周期,确保事件的可靠发布和处理,并支持对事件记录进行维护。

事件发布注册表

Spring Modulith 内置了一个事件发布注册表 (Event Publication Registry),它与 Spring Framework 的核心事件发布机制深度集成。当事件发布时,注册表会识别将要接收事件的事务性事件监听器,并在原始业务事务中为每个监听器写入一个条目到事件发布日志中。

The transactional event listener arrangement before execution

业务操作和事件记录在同一个事务中提交,保证了要么都成功,要么都失败。即使系统在事件成功写入日志后、发送到监听器前崩溃,重启后也能从日志中恢复并重试发送。

每个事务性事件监听器都会被一个切面(Aspect)包裹:如果监听器执行成功,该切面会将对应的日志条目标记为已完成;如果监听器执行失败,日志条目则保持不变,这样应用程序可以根据需要实现重试机制,确保事件最终被处理。

The transactional event listener arrangement after execution

事件发布持久化

为了实际写入事件发布日志并利用事件发布注册表,Spring Modulith 提供了 EventPublicationRepository SPI,并为常见的支持事务的持久化技术(如 JPA、JDBC 和 MongoDB)提供了开箱即用的实现。

为简化集成过程,Spring Modulith 提供了针对不同持久化技术的 Starter POMs。您只需根据项目所使用的数据库技术,添加对应的 Modulith 事件 Starter POM 即可。这些 Starter 通常会默认集成基于 Jackson 的 EventSerializer 实现,用于将事件实例序列化为适合数据存储的格式。

常用的持久化 Starter POMs 如下:

管理事件发布记录

Spring Modulith 在应用运行期间,允许以多种方式管理事件发布记录。未完成的事件发布(即还未被监听器成功处理的事件)可能需要在一段时间后重新提交给对应的监听器;而已完成的事件发布则通常需要从数据库中清理或归档。由于这些“清理”需求因应用而异,Spring Modulith 提供了 API 来分别处理这两类事件发布。

你可以通过添加 spring-modulith-events-api 依赖来获得这些 API:

<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-events-api</artifactId>
  <version>1.3.5</version> </dependency>

该依赖提供了两个主要接口,作为 Spring Bean 可用:

CompletedEventPublications: 用于访问所有已完成的事件发布,并提供 API 立即清除所有已完成的事件,或清除早于指定时间的已完成事件。
IncompleteEventPublications: 用于访问所有未完成的事件发布,可以按条件或按发布时间重新提交这些事件。
这样,你可以灵活地管理事件发布的生命周期,确保事件可靠发布和处理。

事件完成模式 (Event Publication Completion)

当事务性执行或 @ApplicationModuleListener 执行成功完成时,事件发布将被标记为已完成。默认情况下,通过在 EventPublication 上设置完成日期来记录完成状态(UPDATE 模式)。这意味着已完成的发布将保留在事件发布注册表中,以便可以通过上述的 CompletedEventPublications 接口进行检查。这样做的一个后果是,你需要编写一些代码,定期清除旧的、已完成的 EventPublication。否则,它们的持久化存储(例如关系数据库表)将无限制地增长,并可能影响性能。

Spring Modulith 1.3 引入了一个配置属性 spring.modulith.events.completion-mode,以支持两种额外的完成模式。它默认为 UPDATE,采用上述策略。

DELETE 模式: 将 completion-mode 设置为 DELETE 时,注册表的持久化机制会改为在完成时直接删除 EventPublication 记录。这意味着 CompletedEventPublications 将不再返回任何发布,但同时,你也不必再担心手动从持久化存储中清除已完成的事件。
ARCHIVE 模式: 这是一个在 Spring Modulith 1.3 中引入的第三种选项,它会将事件发布记录复制到存档表、集合或节点中。对于该存档条目,将设置完成日期并删除原始条目。与 DELETE 模式不同,已完成的事件发布仍可通过 CompletedEventPublications 抽象进行访问。
事件序列化 (Event Serializer)

每条日志记录都包含原始事件的序列化内容。EventSerializer 抽象(位于 spring-modulith-events-core 模块中)允许你自定义事件对象如何被转换为适合存储的格式。Spring Modulith 默认提供了基于 Jackson 的 JSON 实现(spring-modulith-events-jackson 组件),它会自动注册一个 JacksonEventSerializer,并通过 Spring Boot 的自动配置机制使用 ObjectMapper。

自定义事件发布日期 (Customizing the Event Publication Date)

默认情况下,事件发布注册表会使用 Clock.systemUTC() 返回的日期作为事件的发布时间。如果你想自定义这个时间,只需在应用上下文中注册一个类型为 Clock 的 Bean:

@Configuration
class MyConfiguration {

  @Bean
  Clock myCustomClock() {
    // Your custom Clock instance created here.
    return Clock.fixed(Instant.now(), ZoneId.systemDefault()); // 示例:固定时间
  }
}

示例:事件发布注册表使用
添加依赖

<dependency>
    <groupId>org.springframework.modulith</groupId>
    <artifactId>spring-modulith-starter-jpa</artifactId>
</dependency>

配置application.properties

Hibernate 配置

spring.jpa.hibernate.ddl-auto=update
spring.jpa.show-sql=true

事件完成删除日志

spring.modulith.events.completion-mode=DELETE
远程单元测试

测试监听器运行失败

修改 NotificationFacadeService 的 on 方法,抛出一个 RuntimeException 以模拟失败场景:


@Slf4j
@Service
public class NotificationFacadeService {

    @Async
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    @TransactionalEventListener
    public void on(OrderCreated orderCreated) {
        log.info("Notification Module (Facade): Received order created event: {}", orderCreated);
        // 模拟发送通知
        log.info("Notification Module (Facade): Sending notification for order number: {}", orderCreated.getOrderNo());
        throw new RuntimeException("模拟发送通知失败"); // 模拟异常
    }
}

运行测试

可知,事件监听器消费失败,日志记录未被删除,仍然保留在数据库中。

Externalizing Events (外部化事件) 简述
有些在应用模块间交换的事件,可能对外部系统也很有价值。Spring Modulith 支持将选定的事件发布到多种消息中间件。要启用该功能,请按照以下步骤操作:

添加依赖: 添加与所用消息中间件对应的 Spring Modulith 依赖到你的项目中。
标记事件: 通过为需要外部化的事件类型添加 Spring Modulith 或 jMolecules 的 @Externalized 注解,标记这些事件。
指定目标: 在注解的 value 属性中指定消息中间件的路由目标(例如,Kafka 的 Topic 名称、AMQP 的 Routing Key)。
这样配置后,Spring Modulith 就会自动将这些事件发布到指定的消息中间件,实现与外部系统的集成。

这块内容暂时不做过多介绍,感兴趣的读者可以参考官方文档中的相关章节。

文档更新时间: 2025-05-31 08:19   作者:admin