【问题标题】:.Net RX: tracking progress of parallel execution.Net RX:跟踪并行执行的进度
【发布时间】:2012-01-02 04:20:28
【问题描述】:

我需要并行执行多个长时间运行的操作,并希望以某种方式报告进度。从我最初的研究来看,IObservable 似乎适合这个模型。这个想法是我调用一个返回 int 的 IObservable 的方法,其中 int 报告完成百分比,并行执行在退出方法后立即开始,这个可观察对象必须是热可观察对象,以便所有订阅者在特定时间点学习相同的进度信息,例如迟到的订阅者可能只知道整个执行已完成并且没有更多的进度要跟踪。

我发现最接近此问题的方法是使用 Observable.ForkJoin 和 Observable.Start,但我无法理解如何使它们成为可以从方法返回的单个可观察对象。

请分享您对如何实现的想法,或者使用 .Net RX 可能有另一种解决此问题的方法。

【问题讨论】:

    标签: c# system.reactive


    【解决方案1】:

    为了使一个热可观察,我可能会从一个使用BehaviorSubject 作为返回值的方法和操作报告进度的方式开始。如果您只想要示例,请跳到最后。这个答案的其余部分解释了这些步骤。

    为了这个答案,我假设您的长时间运行的操作没有自己的异步调用方式。如果他们这样做了,下一步可能会有所不同。接下来要做的是使用IScheduler 将工作发送到另一个线程。如果需要,您可以允许调用者通过将调度程序作为参数的重载来选择工作发生的位置(在这种情况下,不会选择默认调度程序的重载)。 IScheduler.Scheduler 的重载有不少,其中有几个是扩展方法,所以你应该仔细查看它们,看看哪个最适合你的情况;我在这里使用只需要Action 的on。如果您有多个可以并行运行的操作,您可以多次调用scheduler.Schedule

    其中最困难的部分可能是确定任何给定点的进度。如果您同时进行多项操作,您可能需要跟踪已完成的操作数量以了解当前进度。根据您提供的信息,我不能再具体了。

    最后,如果您的操作是可取消的,您可能需要将CancellationToken 作为参数。当它在调度程序的队列中时,您可以使用它来取消它开始之前的操作。如果你写的操作码正确,它也可以使用token进行取消。

    IObservable<int> DoStuff(/*args*/, 
                             CancellationToken cancel,
                             IScheduler scheduler)
    {
        BehaviorSubject<int> progress;
        //if you don't take it as a parameter, pick a scheduler
        //IScheduler scheduler = Scheduler.ThreadPool;
    
        var disp = scheduler.Schedule(() =>
        {
            //do stuff that needs to run on another thread
    
            //report progres
            porgress.OnNext(25);
        });
        var disp2 = scheduler.Schedule(...);
    
        //if the operation is cancelled before the scheduler has started it,
        //you need to dispose the return from the Schedule calls
        var allOps = new CompositeDisposable(disp, disp2);
        cancel.Register(allOps.Dispose);
    
        return progress;
    }
    

    【讨论】:

    • 这是一个非常好的示例和解释。谢谢你。我只有一个问题。如果我的长时间运行的操作被分成几个组,并且我需要在并行执行组内的操作的同时连续执行组怎么办?在这种情况下,我是否应该使用允许通过 ContinueWith 而不是调度程序进行此类链接的任务库,但保留对 BehaviorSubject 的使用?
    • @andriys 这是一种选择。另一种选择是将每个组分成一个单独的函数,返回一个 IObservable。然后可以通过Observable.Concat 将这些功能链接在一起。但是,您将希望使链中除第一个可观察对象之外的所有对象都变冷。 Observable.Defer 可让您将热的 observable 转换为冷的。
    • 如果操作不受计算限制,而是受网络限制,您将如何更改样本,例如并行请求不同的网络服务?
    • @andriys 我认为不会有太大变化。您可能会使用不同的调度程序,但我不会打扰。
    【解决方案2】:

    这是一种方法

    // setup a method to do some work, 
    // and report it's own partial progress
    Func<string, IObservable<int>> doPartialWork = 
        (arg) => Observable.Create<int>(obsvr => {
            return Scheduler.TaskPool.Schedule(arg,(sched,state) => {
                var progress = 0;
                var cancel = new BooleanDisposable();
                while(progress < 10 && !cancel.IsDisposed)
                {
                    // do work with arg
                    Thread.Sleep(550);
                    obsvr.OnNext(1); //report progress
                    progress++;
                }
                obsvr.OnCompleted();
                return cancel;
            });
        });
    
    var myArgs = new[]{"Arg1", "Arg2", "Arg3"};
    
    // run all the partial bits of work
    // use SelectMany to get a flat stream of 
    // partial progress notifications
    var xsOfPartialProgress =  
            myArgs.ToObservable(Scheduler.NewThread)
                  .SelectMany(arg => doPartialWork(arg))
                      .Replay().RefCount();
    
    // use Scan to get a running aggreggation of progress
    var xsProgress = xsOfPartialProgress
                       .Scan(0d, (prog,nextPartial)  
                                => prog + (nextPartial/(myArgs.Length*10d)));
    

    【讨论】:

    • 你的函数创建了一个冷的 observable。这项工作不是从对doPartialWork 的初始调用开始,而是从每个订阅返回的可观察对象开始。
    猜你喜欢
    • 1970-01-01
    • 2014-09-18
    • 1970-01-01
    • 2016-12-23
    • 2018-12-12
    • 1970-01-01
    • 2013-05-17
    • 2020-05-17
    • 1970-01-01
    相关资源
    最近更新 更多