【发布时间】:2025-12-12 22:45:01
【问题描述】:
我们有存储新订单的 Axon 应用程序。对于每个订单状态更改 (OrderStateChangedEvent),它会计划几个任务。任务由另一个 Saga 触发和执行(TaskSaga - 超出问题范围)
当我删除投影数据库,但离开事件存储,然后再次运行应用程序时,会重播事件(正确的是),但任务是重复的。
我想这是因为OrderStateChangedEvent 每次都会触发一组新的ScheduleTaskCommand。
由于我是 Axon 的新手,不知道如何避免这种重复。
在 AxonServer 上运行的事件存储
Spring boot 应用程序自动配置轴突的东西
投影数据库包含投影表和轴突表: 令牌入口 saga_entry 关联值入口
我想所有事件都被重播了,因为通过重新创建数据库,Axon 表消失了(因此没有关于最后应用事件的记录)
我错过了什么吗?
- token_entry/saga_entry/association_value_entry 表是否应该成为每个应用程序节点上投影表的数据库的一部分?
- 我认为事件存储可能会在不更改事件历史记录的情况下随时重播到新应用程序节点的数据库中,因此我可以运行任意数量的节点。或者我可以随时删除投影分贝并运行应用程序,是什么导致事件再次投影到新分贝。或者这不是真的?
- 一般来说,我的问题是一个事件产生的命令导致新的事件(重复)产生。我应该避免这种“链接”事件以避免重复吗?
谢谢!
轴突配置:
@Configuration
public class AxonConfig {
@Bean
public EventSourcingRepository<ApplicationAggregate> applicationEventSourcingRepository(EventStore eventStore) {
return EventSourcingRepository.builder(ApplicationAggregate.class)
.eventStore(eventStore)
.build();
}
@Bean
public SagaStore sagaStore(EntityManager entityManager) {
return JpaSagaStore.builder().entityManagerProvider(new SimpleEntityManagerProvider(entityManager)).build();
}
}
- 订单聚合收到的 CreateOrderCommand(fromCommand 方法只是将 1:1 命令映射到事件)
@CommandHandler
public OrderAggregate(CreateOrderCommand cmd) {
apply(OrderCreatedEvent.fromCommand(cmd))
.andThenApply(() -> OrderStateChangedEvent.builder()
.applicationId(cmd.getOrderId())
.newState(OrderState.NEW)
.build());
}
- 订单聚合设置属性
@EventSourcingHandler
protected void on(OrderCreatedEvent event) {
id = event.getOrderId();
// ... additional properties set
}
@EventSourcingHandler
protected void on(OrderStateChangedEvent cmd) {
this.state = cmd.getNewState();
}
- OrderStateChangedEvent 由 Saga 侦听,为特定状态的顺序安排几个任务
private Map<String, TaskStatus> tasks = new HashMap<>();
private OrderState orderState;
@StartSaga
@SagaEventHandler(associationProperty = "orderId")
public void on(OrderStateChangedEvent event) {
orderState = event.getNewState();
List<OrderStateAwareTaskDefinition> tasksByState = taskService.getTasksByState(orderState);
if (tasksByState.isEmpty()) {
finishSaga(event.getOrderId());
}
tasksByState.stream()
.map(task -> ScheduleTaskCommand.builder()
.orderId(event.getOrderId())
.taskId(IdentifierFactory.getInstance().generateIdentifier())
.targetState(orderState)
.taskName(task.getTaskName())
.build())
.peek(command -> tasks.put(command.getTaskId(), SCHEDULED))
.forEach(command -> commandGateway.send(command));
}
【问题讨论】:
-
任何答案是否解决了您的问题@Tomáš Mika?如果您可以分享您的行动方案或如果答案解决了您的问题,将对其他读者有所帮助。
标签: axon