【问题标题】:How to handle commands sent from saga in axon framework如何在 axon 框架中处理从 saga 发送的命令
【发布时间】:2019-11-03 01:51:59
【问题描述】:

使用 saga,给定一个事件 EventA,saga 开始,它发送一个命令(或多个)。 我们如何确保命令发送成功然后其他微服务中的实际逻辑没有抛出等等。

让我们举一个电子邮件传奇的例子: 当用户注册时,我们创建一个发布 UserRegisteredEvent 的用户聚合,将创建一个 saga,这个 saga 负责确保向用户发送注册电子邮件(电子邮件可能包含验证密钥、欢迎消息等)。

我们应该使用:

  1. commandGateway.sendAndWait 带有 try/catch -> 可以扩展吗?

  2. commandGateway.send 并使用截止日期并使用某种“失败事件”,例如 SendEmailFailedEvent -> 需要为命令关联“令牌”,以便可以将“关联属性”与正确的传奇关联 发送 SendRegistrationEmailCommand

  3. commandGateway.send(...).handle(...) -> 在句柄中我们可以引用 MyEmailSaga 中的 eventGateway/commandGateway 吗? 如果错误我们发送一个事件?或者我们可以从我们拥有的 saga 实例中修改/调用一个方法。如果没有错误,则其他服务已发送诸如“RegistrationEmailSentEvent”之类的事件,因此 saga 将结束。

  4. 使用截止日期,因为我们只使用“发送”,而不处理可能发送失败的命令的最终错误(其他服务关闭等)

  5. 还有别的吗?

  6. 还是所有的组合?

如何处理以下错误? (使用截止日期或 .handle(...) 或其他)

错误可能是:

  • 命令没有处理程序(没有服务启动等)

  • 命令已处理,但在其他服务中引发异常且未发送任何事件(其他服务中未尝试/捕获)

  • 命令已处理,引发并捕获异常,其他服务发布事件以通知其未能发送电子邮件(saga 将接收事件并根据提供的事件类型和数据执行适当的操作 -> 可能电子邮件错误或不存在所以不需要重试)

  • 我错过的其他错误?

@Saga
public class MyEmailSaga {

    @Autowired
    transient CommandGateway commandGateway;


    @Autowired
    transient EventGateway eventGateway;

    @Autowired
    transient SomeService someService;

    String id;
    SomeData state;
    /** count retry times we send email so can apply logic on it */
    int sendRetryCount;

    @StartSaga
    @SagaEventHandler(associationProperty = "id")
    public void on(UserRegisteredEvent event) {
        id = event.getApplicationId();
        //state = event........
        // what are the possibilities here? 
        // Can we use sendAndWait but it does not scale very well, right?
        commandGateway.send(new SendRegistrationEmailCommand(...));
        // Is deadline good since we do not handle the "send" of the command
    }

    // Use a @DeadlineHandler to retry ?

    @DeadlineHandler(deadlineName = "retry_send_registration_email")
    fun on() {
         // resend command and re-schedule a deadline, etc
    }

    @EndSaga
    @SagaEventHandler(associationProperty = "id")
    public void on(RegistrationEmailSentEvent event) {

    }

}

编辑(接受答案后):

主要是两个选项(对不起,下面是kotlin代码):

第一选择

commandGateway.send(SendRegistrationEmailCommand(...))
    .handle({ t, result ->
    if (t != null) {
       // send event (could be caught be the same saga eventually) or send command or both
    }else{
       // send event (could be caught be the same saga eventually) or send command or both
    }
    })
// If not use handle(...) then you can use thenApply as well
    .thenApply { eventGateway.publish(SomeSuccessfulEvent(...)) }
    .thenApply { commandGateway.send(SomeSuccessfulSendOnSuccessCommand) }

第二个选项: 如果 SendRegistrationEmailCommand 失败并且您没有收到有关失败的任何事件(当您不处理发送的命令时),请使用截止日期确保 saga 执行某些操作。

当然可以将截止日期用于其他目的。

当成功接收到 SendRegistrationEmailCommand 时,接收者将发布一个事件,以便通知 saga 并对其采取行动。 可以是 RegistrationEmailSentEvent 或 RegistrationEmailSendFailedEvent。

总结:

似乎最好只在命令发送失败或接收者抛出意外异常时使用handle(),如果是这样,那么发布一个事件让 saga 对其进行操作。 如果成功,接收者应该发布事件,saga 会监听它(并最终注册一个截止日期以防万一); Receiver 也可以发送事件通知错误并且不抛出,saga 也会监听这个事件。

【问题讨论】:

    标签: cqrs saga axon


    【解决方案1】:

    理想情况下,您将使用异步选项来处理错误。这将是commandGateway.send(command)commandGateway.send(command).thenApply()。如果故障与业务逻辑相关,那么在这些故障上发出事件可能是有意义的。一个普通的gateway.send(command) 是有道理的; Saga 可以对作为结果返回的事件做出反应。否则,您将不得不处理命令的结果。

    您需要使用sendAndWait 还是仅使用send().then... 取决于您在失败时需要执行的活动。不幸的是,在异步处理结果时,您不能再安全地修改 Saga 的状态。 Axon 可能已经保留了 saga 的状态,导致这些更改丢失。 sendAndWait 解决了这个问题。可扩展性通常不是问题,因为可以并行执行不同的 Sagas,具体取决于您的处理器配置。

    Axon 团队目前正在研究可能的 API,这些 API 允许在 Sagas 中安全地异步执行逻辑,同时仍然保证线程安全和状态持久性。

    【讨论】:

    • 所以主要有 2 个选项:第一个选项 -> 使用 thenApply()handle() 发送事件(可能被之前启动命令的同一个 saga 捕获)或发送命令或两个事件/命令。第二个选项 -> 使用 handle() 以防命令没有处理程序(服务已关闭或其他),并在这种情况下发送事件或其他;处理命令时,请考虑接收应该毫无问题地发布适当的事件,以便 saga 收到通知并对其采取行动,或者如果它在发送命令一段时间后没有收到任何事件,则会触发截止日期
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多