【问题标题】:Querying the write model for duplicated aggregate root property查询重复聚合根属性的写入模型
【发布时间】:2014-04-22 09:47:14
【问题描述】:

我正在使用事件源实现 CQRS 模式,我正在使用 NServiceBus、NEventStore 和 NES(NSB 和 NEventStore 之间的连接)。

我的应用程序会定期检查 Web 服务是否有任何要下载和处理的文件。当找到一个文件时,一个命令(DownloadFile)被发送到总线,并由 FileCommandHandler 接收,它创建一个新的聚合根(File)并处理消息。

现在在(文件聚合根目录)中,我必须检查文件的内容是否与任何其他文件内容不匹配(因为网络服务保证只有文件名是唯一的,并且内容可能与不同的名称),通过对其进行散列并与散列内容列表进行比较。

问题是我必须在哪里保存哈希码列表?是否允许查询读取模型?

public class File : AggregateBase
{
    public File(DownloadFile cmd, IFileService fileDownloadService, IClaimSerializerService serializerService, IBus bus)
            : this()
        {
        // code to download the file content, deserialize it, and publish an event.
        }
}

public class FileCommandHandler : IHandleMessages<DownloadFile>, IHandleMessages<ExtractFile>
{
        public void Handle(DownloadFile command)
        {
             //for example, is it possible to do this (honestly, I feel it is not, since read model should always considered stale !)
            var file = readModelContext.GetFileByHashCode (Hash(command.FileContent));
            if (file != null)
                throw new Exception ("File content matched with another already downloaded file");

            // Since there is no way to query the event source for file content like:
            // eventSourceRepository.Find<File>(c=>c.HashCode == Hash(command.FileContent));
        }
}

【问题讨论】:

  • 我相信原则是“命令”和“查询”分离(例如可能是不同的数据源)。这并不意味着该命令不能使用查询。我倾向于在命令执行中使用查询。
  • 重复的内容文件(即无法处理的DownloadFile命令)应该怎么办?

标签: c# nservicebus cqrs event-sourcing neventstore


【解决方案1】:

您似乎正在寻找重复数据删除。

您的指挥方面是您希望事情保持一致的地方。查询将始终让您对竞争条件持开放态度。因此,我不会运行查询,而是反转逻辑并将哈希实际写入数据库表(任何具有 ACID 保证的数据库)。如果此写入成功,则处理该文件。如果哈希写入失败,则跳过处理。

没有必要将此逻辑放入处理程序中,因为在失败的情况下重试消息(即多次存储哈希)不会使其成功。您最终还会在错误 q 中收到有关重复文件的消息。

重复数据删除逻辑的好地方可能是在您的 Web 服务客户端中。一些伪逻辑

  1. 获取文件
  2. 打开交易
  3. 将哈希插入数据库并捕获失败(不是任何失败,只是插入失败)
  4. 如果在步骤 3 中插入​​的记录数不为零,则 Bus.Send 消息以处理文件
  5. 提交交易

NServiceBus 网关 here 中的一些示例重复数据删除代码

编辑:
查看他们的代码,我实际上认为session.Get&lt;DeduplicationMessage&gt; 是不必要的。 session.Save(gatewayMessage); 应该足够了,是一致性边界。

只有在失败率很高的情况下进行查询才有意义,这意味着您有很多重复的内容文件。如果 99%+ 的插入成功,则确实可以将重复项视为异常。

【讨论】:

  • 将重复数据删除逻辑移至客户端对我来说很有意义。感谢@chrisbednarski 的建议
【解决方案2】:

这取决于很多事情......吞吐量就是其中之一。但是,由于无论如何您都在以“基于拉”的方式处理此问题(您正在查询 Web 服务以轮询工作(下载和分析文件)),因此您可以使整个过程串行,而不必担心冲突。现在这可能不会给出您想要处理“工作”的理想速率,但更重要的是......您测量了吗?让我们暂时回避一下,假设串行不起作用。我们在谈论多少个文件?几百、一千、……几百万?根据该哈希值可能适合内存,并且如果/当进程停止时可以重建。也可能有机会沿着时间轴或上下文划分您的问题。从黎明开始或今天开始的每个文件,或者本月的文件价值?真的,我认为你应该深入挖掘你的问题空间。除此之外,使用事件溯源来解决这个问题感觉很尴尬,但是 YMMV。

【讨论】:

  • 对不起,我的回复晚了,网络服务是外部网络服务,我无法控制它。问题是不应该存在重复,但是由于来自 Web 服务另一端的人为错误(并且 Web 服务没有提供验证),可能存在重复,但是什么时候?,多长时间?,以及多少?,没有人能回答这些问题。所以我必须保存所有文件的所有哈希码,当下载任何文件时,应该检查它是否重复。另外,我们说的是每月 5000 个文件,我想这并不多。
  • 我应该如何保存哈希,并在进程停止时重建它们?我可以使用任何持久性方法(数据库、系统文件等)吗?我也对这个想法本身感兴趣,不仅仅是这个特殊情况,换句话说,在域模型中拥有这样的业务规则是否可行?谢谢。
【解决方案3】:

当您在域中具有真正的唯一性约束时,您可以将唯一性测试器设为域服务,其实现是基础架构的一部分——类似于存储库,其接口是域的一部分,其实现是基础设施的一部分。对于实现,您可以使用内存中的哈希或根据需要更新/查询的数据库。

【讨论】:

  • 所以在这种情况下,我必须创建一个类来保存每个文件的哈希码,并要求域服务在每次下载文件时将其持久化?再加上 AFAIK,域服务可以执行与任何聚合根无关的操作!在我的情况下,它属于文件聚合,对吗?
  • 在类似的设置中,我使用来自事件流的投影来保存文件哈希列表,就像我使用其他读取模型一样。我不确定您对聚合根/文件聚合的含义,我会将唯一性约束排除在文件聚合之外,但我不知道您的模型。
猜你喜欢
  • 2020-12-17
  • 1970-01-01
  • 2023-04-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-04-10
  • 2023-03-27
  • 1970-01-01
相关资源
最近更新 更多