【问题标题】:Axon Sagas duplicates events in event store when replaying events to new DBAxon Sagas 在将事件重播到新数据库时复制事件存储中的事件
【发布时间】:2025-12-12 22:45:01
【问题描述】:

我们有存储新订单的 Axon 应用程序。对于每个订单状态更改 (OrderStateChangedEvent),它会计划几个任务。任务由另一个 Saga 触发和执行(TaskSaga - 超出问题范围)

当我删除投影数据库,但离开事件存储,然后再次运行应用程序时,会重播事件(正确的是),但任务是重复的。

我想这是因为OrderStateChangedEvent 每次都会触发一组新的ScheduleTaskCommand

由于我是 Axon 的新手,不知道如何避免这种重复。

在 AxonServer 上运行的事件存储

Spring boot 应用程序自动配置轴突的东西

投影数据库包含投影表和轴突表: 令牌入口 saga_entry 关联值入口

我想所有事件都被重播了,因为通过重新创建数据库,Axon 表消失了(因此没有关于最后应用事件的记录)

我错过了什么吗?

  • token_entry/saga_entry/association_value_entry 表是否应该成为每个应用程序节点上投影表的数据库的一部分?
  • 我认为事件存储可能会在不更改事件历史记录的情况下随时重播到新应用程序节点的数据库中,因此我可以运行任意数量的节点。或者我可以随时删除投影分贝并运行应用程序,是什么导致事件再次投影到新分贝。或者这不是真的?
  • 一般来说,我的问题是一个事件产生的命令导致新的事件(重复)产生。我应该避免这种“链接”事件以避免重复吗?

谢谢!

轴突配置:

@Configuration
public class AxonConfig {

    @Bean
    public EventSourcingRepository<ApplicationAggregate> applicationEventSourcingRepository(EventStore eventStore) {
        return EventSourcingRepository.builder(ApplicationAggregate.class)
                        .eventStore(eventStore)
                        .build();
    }

    @Bean
    public SagaStore sagaStore(EntityManager entityManager) {
        return JpaSagaStore.builder().entityManagerProvider(new SimpleEntityManagerProvider(entityManager)).build();
    }
}
  1. 订单聚合收到的 CreateOrderCommand(fromCommand 方法只是将 1:1 命令映射到事件)
    @CommandHandler
    public OrderAggregate(CreateOrderCommand cmd) {
        apply(OrderCreatedEvent.fromCommand(cmd))
                .andThenApply(() -> OrderStateChangedEvent.builder()
                        .applicationId(cmd.getOrderId())
                        .newState(OrderState.NEW)
                        .build());
    }
  1. 订单聚合设置属性
    @EventSourcingHandler
    protected void on(OrderCreatedEvent event) {
        id = event.getOrderId();

        // ... additional properties set

    }

    @EventSourcingHandler
    protected void on(OrderStateChangedEvent cmd) {
        this.state = cmd.getNewState();
    }
  1. OrderStateChangedEvent 由 Saga 侦听,为特定状态的顺序安排几个任务
    private Map<String, TaskStatus> tasks = new HashMap<>();

    private OrderState orderState;

    @StartSaga
    @SagaEventHandler(associationProperty = "orderId")
    public void on(OrderStateChangedEvent event) {
        orderState = event.getNewState();
        List<OrderStateAwareTaskDefinition> tasksByState = taskService.getTasksByState(orderState);
        if (tasksByState.isEmpty()) {
            finishSaga(event.getOrderId());
        }

        tasksByState.stream()
                .map(task -> ScheduleTaskCommand.builder()
                        .orderId(event.getOrderId())
                        .taskId(IdentifierFactory.getInstance().generateIdentifier())
                        .targetState(orderState)
                        .taskName(task.getTaskName())
                        .build())
                .peek(command -> tasks.put(command.getTaskId(), SCHEDULED))
                .forEach(command -> commandGateway.send(command));
    }

【问题讨论】:

  • 任何答案是否解决了您的问题@Tomáš Mika?如果您可以分享您的行动方案或如果答案解决了您的问题,将对其他读者有所帮助。

标签: axon


【解决方案1】:

我想我可以在这种情况下帮助你。 因此,发生这种情况是因为 TrackingEventProcessor 使用的 TrackingToken 将所有事件提供给 Saga 实例,它被初始化为事件流的开头。因此,TrackingEventProcessor 将从时间的开头开始,从而使您的所有命令再次分派。

您可以采取一些措施来解决此问题。

  1. 您可以不擦除整个数据库,而只擦除投影表并保持令牌表不变。
  2. 您可以将TrackingEventProcessorinitialTrackingToken 配置为从事件流的头部而不是尾部开始。

选项 1 可以解决问题,但需要从操作角度进行一些授权。选项 2 将其交给开发人员,可能比其他解决方案更安全。

要调整令牌以从头部开始,您可以用TrackingEventProcessorConfiguration 实例化TrackingEventProcessor

    EventProcessingConfigurer configurer;

    TrackingEventProcessorConfiguration trackingProcessorConfig =
            TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                               .andInitialTrackingToken(StreamableMessageSource::createHeadToken);

    configurer.registerTrackingEventProcessor("{class-name-of-saga}Processor", 
                                              Configuration::eventStore, 
                                              c -> trackingProcessorConfig);

因此,您将为您的 Saga 创建所需的配置并调用 andInitialTrackingToken() 函数并确保创建不存在令牌的头令牌。

我希望这可以帮助你解决 Tomáš!

【讨论】:

  • 顺便说一句,我刚刚创建了一个问题来讨论这个要切换的默认值,特别是针对 Sagas。我已经多次遇到这个确切的问题,我觉得这值得在框架中进行调整。 github.com/AxonFramework/AxonFramework/issues/1019
【解决方案2】:

Steven 的解决方案很有效,但仅限于 Sagas。对于那些想要达到相同效果但在经典@EventHandler(在重放时跳过执行)的人来说,有一种方法。首先,您必须找出跟踪事件处理器的命名方式——我在 AxonDashboard(运行 AxonServer 的 8024 端口)中找到了它——通常它是带有 @EventHandler 注释的组件的位置(准确地说是包名称)。然后按照 Steven 在他的回答中指出的那样添加配置。

    @Autowired
    public void customConfig(EventProcessingConfigurer configurer) {
        // This prevents from replaying some events in @EventHandler
        var trackingProcessorConfig = TrackingEventProcessorConfiguration
                .forSingleThreadedProcessing()
                .andInitialTrackingToken(StreamableMessageSource::createHeadToken);
        configurer.registerTrackingEventProcessor("com.domain.notreplayable",
                org.axonframework.config.Configuration::eventStore,
                c -> trackingProcessorConfig);
    }

【讨论】: