【问题标题】:When using MDA, should you differentiate between idempotent and non-idempotent event handlers?使用 MDA 时,是否应该区分幂等和非幂等事件处理程序?
【发布时间】:2016-03-18 22:56:14
【问题描述】:

问题假设使用事件溯源。

通过重放事件来重建当前状态时,事件处理程序应该是幂等的。例如,当用户成功更新其用户名时,可能会发出 UsernameUpdated 事件,该事件包含 newUsername 字符串属性。重建当前状态时,适当的事件处理程序接收UsernameUpdated 事件并将User 对象上的username 属性设置为UsernameUpdated 事件对象的newUsername 属性。换句话说,多次处理同一条消息总是会产生相同的结果。

但是,当与外部服务集成时,这样的事件处理程序是如何工作的?例如,如果用户想要重置他们的密码,User 对象可能会发出一个PasswordResetRequested 事件,该事件由一部分代码处理,该部分代码向第三方发出发送 SMS 的命令。现在,当应用程序重建时,我们不想重新发送此 SMS。如何最好地避免这种情况?

【问题讨论】:

    标签: distributed-computing event-sourcing event-driven-design


    【解决方案1】:

    交互中有两个消息:命令和事件。

    我不认为消息传递基础架构中的系统消息与域事件相同。命令消息处理应该是幂等的。事件处理程序通常不需要。

    在您的场景中,我可以告诉聚合根 100 次来更新用户名:

    public UserNameChanged ChangeUserName(string username, IServiceBus serviceBus)
    {
        if (_username.Equals(username))
        {
            return null;
        }
    
        serviceBus.Send(new SendEMailCommand(*data*));
    
        return On(new UserNameChanged{ Username = userName});
    }
    
    public UserNameChanged On(UserNameChanged @event)
    {
        _username = @event.UserName;
    
        return @event;
    }
    

    上面的代码会产生一个事件,因此重构它不会产生任何重复的处理。即使我们有 100 个UserNameChanged 事件,结果仍然与On 方法不执行任何处理相同。我想要记住的一点是命令端完成所有真正的工作,而事件端用于更改对象的状态。

    以上内容不一定是我实现消息传递的方式,但它确实展示了这个概念。

    【讨论】:

    • 在 CQRS 常见问题解答 (cqrs.nu/Faq/command-handlers) 中,由于并发问题,发送电子邮件等副作用(非幂等行为)应该在事件处理程序中,而不是在命令处理程序中。
    • 我同意这一点。你需要小心。在我的示例中,您会注意到我没有发送电子邮件。我正在发送命令。我希望消息传递基础设施能够确保消息只被处理一次,并且在那个时候是正确的。因此,如果命令处理失败,SendEMailCommand 通常将无处可去,因此不会发送电子邮件。如果我直接使用 SMTP 发送电子邮件,是的,会有问题。
    【解决方案2】:

    我认为您在这里混合了两个不同的概念。第一个是重构一个对象,其中处理程序都是实体本身的内部方法。 Sample code from Axon framework

    public class MyAggregateRoot extends AbstractAnnotatedAggregateRoot {
    
    @AggregateIdentifier
    private String aggregateIdentifier;
    private String someProperty;
    
    public MyAggregateRoot(String id) {
        apply(new MyAggregateCreatedEvent(id));
    }
    
    // constructor needed for reconstruction
    protected MyAggregateRoot() {
    }
    
    @EventSourcingHandler
    private void handleMyAggregateCreatedEvent(MyAggregateCreatedEvent event) {
        // make sure identifier is always initialized properly
        this.aggregateIdentifier = event.getMyAggregateIdentifier();
        // do something with someProperty
    }
    

    }

    当然,您不会将与外部 API 对话的代码放在聚合方法中。

    第二个是在有界上下文上重放事件,这可能会导致您正在谈论的问题,根据您的情况,您可能需要将事件处理程序划分为集群。

    有关这一点,请参阅 Axon 框架文档,以更好地了解问题及其所采用的解决方案。

    Replaying Events on a Cluster

    【讨论】:

      【解决方案3】:

      TLDR;将 SMS 标识符存储在事件本身中。

      事件溯源的核心原则是“幂等性”。事件是幂等的,这意味着多次处理它们将得到与处理一次相同的结果。命令是“非幂等的”,这意味着重新执行命令可能每次执行都有不同的结果

      聚合由 UUID 标识(重复率非常低)这一事实意味着客户端可以生成新创建的聚合的 UUID。流程管理器(又名“Sagas”)通过监听事件来协调多个聚合之间的操作,以便发出命令,因此从这个意义上说,流程管理器也是一个“客户”。因为进程管理器发出命令,所以不能认为是“幂等的”。

      我想出的一个解决方案是将即将创建的 SMS 的 UUID 包含在 PasswordResetRequested 事件中。这允许流程管理器仅在 SMS 尚不存在时才创建它,从而实现幂等性。

      下面的示例代码(C++ 伪代码):


      // The event indicating a password reset was successfully requested.
      class PasswordResetRequested : public Event {
      public:
          PasswordResetRequested(const Uuid& userUuid, const Uuid& smsUuid, const std::string& passwordResetCode);
      
          const Uuid userUuid;
          const Uuid smsUuid;
          const std::string passwordResetCode;
      };
      
      // The user aggregate root.
      class User {
      public:
      
          PasswordResetRequested requestPasswordReset() {
              // Realistically, the password reset functionality would have it's own class 
              // with functionality like checking request timestamps, generationg of the random
              // code, etc.
              Uuid smsUuid = Uuid::random();
              passwordResetCode_ = generateRandomString();
              return PasswordResetRequested(userUuid_, smsUuid, passwordResetCode_);
          }
      
      private:
      
          Uuid userUuid_;
          string passwordResetCode_;
      
      };
      
      // The process manager (aka, "saga") for handling password resets.
      class PasswordResetProcessManager {
      public:
      
          void on(const PasswordResetRequested& event) {
              if (!smsRepository_.hasSms(event.smsUuid)) {
                  smsRepository_.queueSms(event.smsUuid, "Your password reset code is: " + event.passwordResetCode);
              }
          }
      
      };
      

      上述解决方案有几点需要注意:

      首先,虽然 SMS UUID 发生冲突的可能性(非常)低,但它实际上可能会发生,这可能会导致几个问题。

      1. 与外部服务的通信被阻止。例如,如果用户“bob”请求重置密码并生成“1234”的 SMS UUID,那么(可能 2 年后)用户“frank”请求生成相同的 SMS UUID“1234”的密码重置,该过程经理不会排队,因为它认为它已经存在,所以弗兰克永远不会看到它。

      2. 读取模型中的报告不正确。因为存在重复的 UUID,所以当“frank”正在查看系统发送给他的 SMSes 列表时,读取端可能会显示发送给“bob”的 SMS。如果连续快速生成重复的 UUID,“frank”可能会重置“bob”的密码。

      其次,将 SMS UUID 生成移动到事件中意味着您必须让 User 聚合了解 PasswordResetProcessManager功能(但不是 PasswordResetManager 本身),这增加了耦合。但是,这里的耦合是松散的,因为User 不知道如何对 SMS 进行排队,只是应该对 SMS 进行排队。如果User类自己发送短信,你可能会遇到SmsQueued事件被存储而PasswordResetRequested事件不被存储的情况,这意味着用户将收到短信但生成的密码重置密码没有保存在用户身上,所以输入密码不会重置密码。

      第三,如果产生了PasswordResetRequested事件,但在PasswordResetProcessManager可以创建短信之前系统崩溃了,那么短信最终会被发送,但只有当PasswordResetRequested事件被重新播放时(这可能未来很长一段时间)。例如,最终一致性的“最终”部分可能需要很长时间。


      上述方法有效(我可以看到它也应该适用于更复杂的场景,例如此处描述的OrderProcessManagerhttps://msdn.microsoft.com/en-us/library/jj591569.aspx)。不过,我很想听听其他人对这种方法的看法。

      【讨论】:

      • 如果我理解正确,您的解决方案是在流程管理器中处理该事件之前在事件中包含相关 ID(处理事件、发送命令),以便事件处理程序可以检查它是否有曾经处理过它。但这违反了整个发布订阅的事情,因为事件对它的消费者以及他们将做什么了解太多。您是否为这些场景找到了更好的解决方案?
      猜你喜欢
      • 1970-01-01
      • 2015-05-25
      • 1970-01-01
      • 1970-01-01
      • 2012-02-13
      • 2012-03-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多