【问题标题】:Synchronizing multiple observable streams同步多个可观察流
【发布时间】:2012-10-31 21:53:58
【问题描述】:

我对使用响应式扩展很陌生,所以这可能是一个新手问题,但我有以下情况:

我从数据库中获取 3 个 IEnumerable 列表(不同类型)并填充视图模型。但是,我想协调订阅以在所有列表完成加载后触发某些事情。反应式扩展是否可以做到这一点,还是我的想法有误?代码如下:

GetCustomers()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);
GetCountries()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);
GetTransports()
    .ToObservable(Scheduler.Default)
    .Buffer(20).ObserveOn(SynchronizationContext.Current)
    .Subscribe(View.Model.AddRange);

【问题讨论】:

  • 每个IEnumerable<T> 是否在不同的T 上工作?
  • 是的,它们都有不同的类型。 View.Model.AddRange 还具有三种不同类型的重载。

标签: .net system.reactive


【解决方案1】:

您可以尝试使用可观察连接。像这样的:

var plan =
    Observable.Start(() => GetCountries())
        .And(Observable.Start(() => GetCustomers()))
        .And(Observable.Start(() => GetTransports()))
        .Then((countries, customers, transports)
            => new { countries, customers, transports });

var query =
    Observable.When(new [] { plan });

query
    .Subscribe(cct =>
    {
        View.Model.AddRange(cct.countries);
        View.Model.AddRange(cct.customers);
        View.Model.AddRange(cct.transports);
    });

它并行运行,最后您将所有结果合二为一。

【讨论】:

  • 我认为 OP 希望列表并行加载,但要知道它们何时全部完成。
  • 是的@Asti,这会并行加载列表,但仅在全部加载时才显示它们。
【解决方案2】:

我不确定为什么要将已经同步的 Enumerable 更改为 Observable,但在 Rx 习惯用法中,您可以:

Observable.Merge(Add(GetCustomers()), Add(GetCountries())..., Add(GetTransports()))
           .Subscribe(() => { }, Completed);

Add 可能在哪里:

    private IObservable<Unit> Add<T>(IObservable<T> o)
    {
        return o.Buffer(20)
                .ObserveOn(SynchronizationContext.Current)
                .Do(View.Model.AddRange)
                .Select(_ => Unit.Default);
    }

【讨论】:

  • 这将如何工作,因为T 对于所有三个 observables 都不同?
  • @casperOne 我不知道他的内部逻辑是什么,但是OP调用View.Model.AddRange对所有3个。如果类型不同,可以通过Get()...Do(action).Select(_ =&gt; new Unit())缩小范围。
  • 根据澄清,这是行不通的,因为 T 对于所有可观察对象都不同(AddRange 有重载)。
  • @casperOne 我确实在上面的评论中提到了另一种解决方案。我添加了编辑答案以反映这一点。
猜你喜欢
  • 1970-01-01
  • 2011-06-17
  • 2018-04-18
  • 1970-01-01
  • 2021-04-13
  • 1970-01-01
  • 2020-04-02
  • 1970-01-01
  • 2019-11-04
相关资源
最近更新 更多