【问题标题】:Is it possible to rebuild snapshots when state class changes in akka-persistence?当 akka-persistence 中的状态类发生变化时,是否可以重建快照?
【发布时间】:2021-12-11 00:54:48
【问题描述】:

我们将 akka-persistence 2.6.15 中的 EventSourcedBehavior 用于 CQRS/EventSourcing 应用程序,并使用 akka-persistence-jdbc 4.0.0 将事件和快照存储在 PostgreSQL 数据库中。

我们有通过快照序列化的状态类。但有时这些状态类会发生变化,这使得读取快照明显失败。我们通过删除那些更改的快照来管理它:

      delete from snapshot sn 
      where sn.persistence_id::uuid in (select id from some_entity_table);       

但是对于有很多事件的实体,在发送新命令时,需要很长时间才能到达最新的快照,导致超时。

是否可以在应用程序启动时强制重建快照?

【问题讨论】:

  • 根据您的需要,您可以使用EventAdapterSerializerWithStringManifest 或两者的组合。
  • 感谢您的建议。我们两者都使用,但事件(反)序列化正确,我们不想维护不同版本的快照。 SerializerWithStringManifest.fromBinary 可以返回一个状态,也可以失败。我们只想废弃现有的快照并在需要时重建它们。

标签: scala cqrs snapshot event-sourcing akka-persistence


【解决方案1】:

可以说,“真正”的解决方案是使用SerializerWithStringManifest,它可以将以前的快照格式反序列化为当前格式。如果使用反射驱动的序列化,这可能会更加困难。它也不会迁移现有的快照。

您可以做的一个技巧是将显式ForceSnapshot 命令添加到持久参与者的协议中。这在 Classic API 中非常简单,您可以更好地控制快照,所以我不会讨论这个问题。然而,在EventSourcedBehavior Typed API 中,这需要更巧妙地添加SnapshotForced 事件并更改snapshotWhen 函数以返回true(如果事件是SnapshotForced)。

无论是 Classic 还是 Typed,您都可以让您的应用程序利用 currentPersistenceIds Persistence Query 强制每个 persistenceId 写入一个新快照:

val readJournal =
  PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)

val everythingSnapshotted =
  readJournal.currentPersistenceIds()
    .mapAsync(parallelism) { id =>
      // use the ask pattern to send a `ForceSnapshot` command and wait for reply
      ???
    }
    .runWith(Sink.ignore)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-03-26
    • 1970-01-01
    • 2020-10-21
    • 1970-01-01
    • 2021-02-13
    • 1970-01-01
    • 2021-07-09
    相关资源
    最近更新 更多