【问题标题】:Handling out of order events in CQRS read side在 CQRS 读取端处理乱序事件
【发布时间】:2017-04-03 20:31:23
【问题描述】:

我读过 Jonathan Oliver 写的这篇关于处理乱序事件的好文章。

http://blog.jonathanoliver.com/cqrs-out-of-sequence-messages-and-read-models/

我们使用的解决方案是将消息出列并将其放置在“保留表”中,直到具有先前序列的所有消息都被 已收到。收到所有先前的消息后,我们将全部 消息从保持表中取出,并按顺序通过 适当的处理程序。一旦所有处理程序都已执行 成功,我们从保留表中删除消息并提交 读取模型的更新。

这对我们有用,因为域发布事件并标记它们 具有适当的序列号。没有这个,解决方案 下面会更加困难——如果不是不可能的话。

此解决方案使用关系数据库作为持久性存储 机制,但我们没有使用 存储引擎。同时,所有这一切都有一个警告。 如果消息 2、3 和 4 到达但消息 1 从未到达,我们不申请 任何一位。只有在出现错误时才会发生这种情况 处理消息 1 或消息 1 以某种方式丢失。幸运的是, 纠正我们的消息处理程序中的任何错误很容易,并且 重新运行消息。或者,在丢失消息的情况下,重新构建 直接从事件存储中读取模型。

有几个问题,特别是关于他说我们如何始终向事件存储询问丢失的事件。

  1. CQRS 的写入端是否必须为读取公开服务 “要求”重播事件?例如,如果事件 1 不是 收到了,但是 2、4、3 有,我们可以通过 a 询问 eventstore 从 1 开始重新发布事件的服务?
  2. 此服务是否由 CQRS 的写入方负责?
  3. 我们如何使用它重新构建读取模型?

【问题讨论】:

  • 我们在 RabbitMq 中使用了“重试”方法,效果很好。如果在多次重试后它仍然不起作用 - 您只需将此事件放入死信队列并重置序列号,以便可以正确处理进一步的事件。您的应用程序中出现乱序事件的原因通常是什么?
  • 我有一些特定的命令会生成多个事件。我还没有实现任何东西,但可能会发生各种乱序事件。我的事件发布者也是异步工作的。因此,某些事件也可能不会按顺序发布。我依靠我的事件序列号来帮助我把它重新组合起来。我将尝试重试方法。如果您能详细说明一下,我可以将其标记为答案。
  • 我在答案的 cmets 部分添加了更详细的说明。

标签: cqrs event-sourcing


【解决方案1】:

如果你有一个序列号,那么你可以检测到当前事件乱序的情况,例如currentEventNumber != lastReceivedEventNumber + 1

一旦检测到这一点,您只需抛出一个异常。如果您的订阅者有“重试”机制,它将尝试在大约一秒钟内再次处理此事件。很有可能在这段时间内处理较早的事件并且顺序将是正确的。如果乱序事件很少发生,这是一个解决方案。

如果你经常遇到这种情况,你需要实现全局锁定机制,这将允许某些事件被顺序处理。 例如,我们在 MSSQL 中使用 sp_getapplock 在某些情况下实现全局“临界区”行为。当分布式应用程序的多个部分需要的不仅仅是一个简单的锁时,Apache ZooKeeper 提供了一个框架来处理更复杂的场景。

【讨论】:

  • 我正在研究某些多人游戏如何处理这种情况。游戏有一个内置缓存,持续时间约为 100 毫秒。它会在应用事件之前等待 100 毫秒,以防之前的事件丢失。由于潜在的可伸缩性问题,我对使用任何锁有点犹豫。顺便说一句,你如何在你的实现中向事件存储询问丢失的事件?
  • 如果你想让这个系统健壮,你必须非常小心聚合端的各种缓存。如果您的应用程序突然失败会怎样?根据我的经验,使用缓存事件的方法不会扩展。在我们的业务案例中,我们正在构建分布式 24/7 容错服务器,这意味着您必须在不同的物理机器上至少有 2 个聚合器进程实例。如果你想避免脑裂的情况 - 你必须考虑让 3 个独立的实例并行运行。
  • >>你如何在你的实现中向事件存储请求丢失的事件?好吧,我们使用的是 rabbitmq,后来切换到了 azure service bus。这两种服务都提供了交付保证的功能。您基本上是在聚合器端的事务结束时告知队列服务您的事件已成功处理。
  • 这个想法是在接收消息时使用manual acknowledgement mode。如果您的序列号无效 - 只需抛出异常,这意味着不会发生确认,这意味着将重新传递消息。您可以通过在客户端实现中捕获-休眠-重新抛出未处理的异常来处理延迟。这就是 dotnet 中的acknowledgement happens。截图来自this tutorial page
  • 声明currentEventNumber != lastReceivedEventNumber + 1; 并不完全正确。以 lastReceivedEventNumber = 5 和 currentEventNumber = 8 的情况为例。这是一个完全有效的情况。这是否绝对意味着版本 6 和 7 的事件尚未到来?他们真的应该到达吗?没有。
【解决方案2】:

另一种选择是将您从 (S1) 读取事件的服务提供给服务,使其只能为您的服务 (S2) 生成有序事件。

例如,如果您有许多不同会话的大量事件进入,则在前端有一个排序服务 (O1) 负责排序。它确保每个会话只有一个事件传递到 (S1),并且只有当 (S1) 和 (S2) 都已成功处理时,它才会 (O1) 允许该会话的新事件传递到 (S1)。为了提高性能,也需要进行一些排队。

【讨论】:

  • 考虑一下,前端服务 (O2) 只需将其传递的事件标记为该会话的版本,然后下游服务就拥有完成每个会话/版本对所需的内容。远端可能关心版本的任何东西都可以确保它使用最新的
  • 您所描述的是一个由演员模型设计模式更好地处理的场景。 Service Fabric 和 Akka.net 都有执行此操作的参与者。
  • 达西斯。我最近查看了 ServiceFabric,非常喜欢它的可靠集合(MS 做得很好),是的,我提到的会话可以作为 Actors 实现。虽然我不完全确定是否使用 ServiceFabric,但如果我在 Actor 上调用大量异步方法,那么这些任务的执行顺序始终保持有序。
【解决方案3】:

基于时间戳的解决方案:

传入的消息是:

{
 id: 1,
 timestamp: T2,
 name: Samuel
}
{
 id: 1,
 timestamp: T1,
 name: Sam,
 age: 26
}
{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 contact: 123
}

而我们期望看到的不管数据库中的 ORDER 是:

{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 age: 26,
 contact: 123
}

对于每条传入的消息,请执行以下操作:

  1. 获取持久化记录并评估时间戳。
  2. 哪个时间戳更大就是目标。

现在让我们来看看这些消息:

  1. T2 先到:将其存储在数据库中,因为它是第一个。
  2. T1 下一个到达:持续一 (T2) 和传入 (T1),因此 T2 是目标。
  3. T3 到达:持续一 (T2) 和传入 (T1),因此 T3 是目标。

下面的 deepMerge(src, target) 应该可以给我们结果:

public static JsonObject deepMerge(JsonObject source, JsonObject target) {
    for (String key: source.keySet()) {
        JsonElement srcValue = source.get(key);
        if (!target.has(key)) { // add only when target doesn't have it already
            target.add(key, srcValue);
        } else {
            // handle recursively according to the requirement

        }
    }
    return target;
}

如果您需要完整版的 deepMerge(),请在评论中告诉我

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-11
    • 2019-02-16
    • 2018-12-20
    • 1970-01-01
    相关资源
    最近更新 更多