【问题标题】:Merge and onCompleted - How to get notification when all the parallel operations have completed?Merge 和 onCompleted - 如何在所有并行操作完成后获得通知?
【发布时间】:2012-01-19 05:38:33
【问题描述】:

我正在测试 Reactive Extensions(NuGet 的主分支),但在 Merge 中遇到了一些问题。我正在并行运行多个操作,并希望在所有操作完成后收到通知,但我没有收到。

这是我使用 WebClient 下载网页然后计算字数的操作:

    private IObservable<int> GetWebsiteWordCount(Uri uri)
    {
        var client = new WebClient();

        var o = Observable.FromEventPattern<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted")
                  .ObserveOn(Scheduler.ThreadPool)
                  .Select(newString => newString.EventArgs.Result.Split(' ').Length);

        client.DownloadStringAsync(uri);

        return o;
    }

然后我将创建其中的许多:

        var tasks = new List<IObservable<int>>()
                        {
                            GetWebsiteWordCount(new Uri("http://www.google.com", UriKind.Absolute)),
                            GetWebsiteWordCount(new Uri("http://www.bing.com", UriKind.Absolute)),
                            GetWebsiteWordCount(new Uri("http://www.yle.fi", UriKind.Absolute))
                        };

之后我使用 Merge 将这些合并并尝试在所有这些都完成时收到通知:

        tasks.Merge()
            .ObserveOn(SynchronizationContext.Current)
            .Subscribe(x => Debug.WriteLine(x), ex => Debug.WriteLine("exception thrown"),
                       () => Debug.WriteLine("all ready"));

所有这些“任务”都正确执行,我在调试窗口中得到了预期的字数:

14279
672
292

但我没有收到“一切就绪”的消息。有什么我想念的想法吗?

更新:使用 Sum 而不是 Merge

我也尝试将合并更改为:

        var result = from i in tasks.ToObservable()
                     from r in i
                     select r;

        result.Sum().Subscribe(x => Debug.WriteLine("all ready. sum: " + x));

但我再也没有得到结果。

更新:修复 Take 的问题

感谢 Gideon Engelberth,现在 Merge 和 Sum-options 都可以工作了。解决方案是通过添加 Take(1) 来修复 GetWebsiteWordCount 方法:

    private IObservable<int> GetWebsiteWordCount(Uri uri)
    {
        var client = new WebClient();

        var o = Observable.FromEventPattern<DownloadStringCompletedEventArgs>(client, "DownloadStringCompleted")
                  .ObserveOn(Scheduler.ThreadPool)
                  .Select(newString => newString.EventArgs.Result.Split(' ').Length)
                  .Take(1);

        client.DownloadStringAsync(uri);

        return o;
    }

【问题讨论】:

    标签: c# silverlight system.reactive


    【解决方案1】:

    Observable.FromEventPattern 永远不会完成,因为它无法知道何时不再有事件。因为您知道这个特定事件是事件异步模式,所以它应该只触发一次。要将这一点告诉 observable,请在 GetWebsiteWordCount 中的某处添加 .Take(1)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-06-03
      • 2015-02-13
      • 1970-01-01
      • 2020-10-25
      • 1970-01-01
      • 2011-03-28
      相关资源
      最近更新 更多