【问题标题】:Handling transition to state for multiple events处理多个事件的状态转换
【发布时间】:2016-04-18 00:32:28
【问题描述】:

我有一个 MassTransitStateMachine 来编排一个涉及创建多个事件的过程。

所有事件完成后,我希望状态转换到“清理”阶段。

下面是相关的状态声明和过滤函数:

        During(ImportingData,
            When(DataImported)
                // When we get a data imported event, mark this source as done. 
                .Then(MarkImportCompletedForLocation),

            When(DataImported, IsAllDataImported)
                // Once all are done, we can transition to cleaning up...
                .Then(CleanUpSources)
                .TransitionTo(CleaningUp)
        );


    ...snip...


    private static bool IsAllDataImported(EventContext<DataImportSagaState, DataImportMappingCompletedEvent> ctx)
    {
        return ctx.Instance.Locations.Values.All(x => x);
    }

因此,当状态为 ImportingData 时,我希望收到多个 DataImported 事件。每个事件都将其位置标记为已完成,以便 IsAllDataImported 方法可以确定我们是否应该转换到下一个状态。

但是,如果最后两个 DataImported 事件同时到达,则转换到 CleaningUp 阶段的处理程序会触发两次,我最终会尝试执行清理两次。

可以在我自己的代码中解决这个问题,但我期待状态机来管理这个问题。我做错了什么,还是我只需要自己处理争用?

【问题讨论】:

    标签: masstransit automatonymous


    【解决方案1】:

    Chris 提出的解决方案不适用于我的情况,因为我有多个相同类型的事件到达。只有当所有这些事件都到达时,我才需要转换。 CompositeEvent 构造不适用于此用例。

    我对此的解决方案是在 MarkImportCompletedForLocation 方法期间引发一个新的 AllDataImported 事件。此方法现在处理确定是否所有子导入都以线程安全的方式完成。

    所以我的状态机定义是:

                During(ImportingData,
                When(DataImported)
                    // When we get a data imported event, mark the URI in the locations list as done. 
                    .Then(MarkImportCompletedForLocation),
    
                When(AllDataImported)
                    // Once all are done, we can transition to cleaning up...
                    .TransitionTo(CleaningUp)
                    .Then(CleanUpSources)
            );
    

    IsAllDataImported 方法不再需要作为过滤器。

    传奇状态有一个 Locations 属性:

    public Dictionary<Uri, bool> Locations { get; set; }
    

    而MarkImportCompletedForLocation方法定义如下:

        private void MarkImportCompletedForLocation(BehaviorContext<DataImportSagaState, DataImportedEvent> ctx)
        {
            lock (ctx.Instance.Locations)
            {
                ctx.Instance.Locations[ctx.Data.ImportSource] = true;
                if (ctx.Instance.Locations.Values.All(x => x))
                {
                    var allDataImported = new AllDataImportedEvent {CorrelationId = ctx.Instance.CorrelationId};
                    this.CreateEventLift(AllDataImported).Raise(ctx.Instance, allDataImported);
                }
            }
        }
    

    (我刚刚写这个是为了了解一般流程是如何工作的;我认识到 MarkImportCompletedForLocation 方法需要通过验证字典中是否存在键来更具防御性。)

    【讨论】:

    • 有趣的解决方案,我自己正在处理一个类似的问题,我需要跟踪许多相同类型的事件的完成情况。您是如何将 Dictionary 集合存储到 saga 状态的?我正在使用 EntityFramework 扩展来保持传奇状态,但这不支持存储字典。你在使用 NHibernate 扩展库吗?
    • 对不起@simon_d 我刚刚看到你的评论。我正在使用一些技巧,将我的传奇状态作为 JSON 结构保存到我的存储库中。这意味着当我的状态结构发生变化时(例如,当我更新我的应用程序时),我不需要担心数据库模式的变化。因此,字典类型属性序列化/反序列化没有任何问题。
    【解决方案2】:

    您可以使用复合事件将多个事件累积到一个后续事件中,该事件会在相关事件触发时触发。这是使用定义的:

    CompositeEvent(() => AllDataImported, x => x.ImportStatus, DataImported, MoreDataImported);
    
    During(ImportingData,
        When(DataImported)
            .Then(context => { do something with data }),
        When(MoreDataImported)
            .Then(context => { do smoething with more data}),
        When(AllDataImported)
            .Then(context => { okay, have all data now}));
    

    然后,在您的状态机状态实例中:

    class DataImportSagaState :
        SagaStateMachineInstance
    {
        public int ImportStatus { get; set; }
    }
    

    这应该可以解决您要解决的问题,因此请试一试。请注意,事件顺序无关紧要,它们可以按任何顺序到达,因为已接收事件的状态位于实例的 ImportStatus 属性中。

    各个事件的数据没有保存,因此您需要自己使用.Then() 方法将其捕获到状态实例中。

    【讨论】:

    • 您好,谢谢您的回答。我想知道这是否真的能解决我的问题。您的示例似乎适用于不同类型的多个事件,而我有多个相同类型的事件实例。
    • 数据导入在多个处理程序之间拆分,每个处理程序负责要导入的总数据的一个子集。当每个人都完成时,我需要决定是否适合进入下一阶段。因此,每个处理程序将触发相同的事件类型,但具有不同的标识符。
    • 啊,嗯,是的,这就是您需要跟踪自己收到的东西的地方,因为您可能还要处理订购和其他事情。我不认为这是一种固有的工作方式。不过,当您完成后,您可以向自己发布一个不同的事件。
    • 是的 - 在这种情况下,顺序实际上并不重要,但我明白你在说什么。我会将我的解决方案作为单独的答案发布,以帮助有此问题的其他人。
    猜你喜欢
    • 2017-03-12
    • 2016-01-16
    • 2015-05-16
    • 2019-09-07
    • 2012-07-31
    • 1970-01-01
    • 1970-01-01
    • 2013-03-27
    • 1970-01-01
    相关资源
    最近更新 更多