【问题标题】:Remove an IObservable from merged sequence从合并序列中删除一个 IObservable
【发布时间】:2012-11-23 12:29:30
【问题描述】:

我正在使用 Observable.Merge 组合多个序列并在 UI 的单个视图中显示。用户可以选择在 UI 中添加或删除序列(提要)。虽然我使用 Merge 来合并提要。我不确定如何将 IObservable 从合并序列中分离出来。目前,我正在创建一个全新的 IObservable,省略了我想要的提要。是否可以动态添加和删除 ViewModel 已经订阅的 IObsevable?

【问题讨论】:

    标签: wpf c#-4.0 system.reactive


    【解决方案1】:

    看看使用IObservable<IObservable<T>>,然后使用Merge。这会自动允许您通过结束内部IObservable<T> 来删除序列。很简单。

    【讨论】:

      【解决方案2】:

      类似的方法也可以,但我怀疑还有更简洁的方法。

      class Merger<T>
      {
          Subject<T> _merged = new Subject<T>();
      
          public IObservable<T> Merged { get { return _merged; } }
      
          public IDisposable Add(IObservable<T> newStream)
          {
              return newStream.Subscribe(_merged);
          }
      }
      

      要从合并的流中删除某些内容,请处置 IDisposable。

      【讨论】:

        【解决方案3】:

        这可能有助于您开始,但在重新连接时,因为 b 是冷的,b 将从头重新启动:

        var a = Observable.Generate('A', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(200)).Select(x => "a: " + x).Publish();
        var b = Observable.Generate('a', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(500)).Select(x => "b: " + x).Publish();
        
        var merged = a.Merge(b).Publish();
        var submerged = merged.Subscribe(x => x.Dump());
        
        var subA = a.Connect();
        var subB = b.Connect();
        merged.Connect();
        
        Task.Delay(2000).ContinueWith(t => subB.Dump("Disposing b.").Dispose());
        Task.Delay(4000).ContinueWith(t => b.Connect()).ContinueWith(_ => "Reconnected to b");
        

        编辑:

        向合并的 IO 添加另一个“c”:

        var a = Observable.Generate('A', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(200)).Select(x => "a: " + x).Publish();
        var b = Observable.Generate('a', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(500)).Select(x => "b: " + x).Publish();
        var c = Observable.Generate('1', x => x <= Char.MaxValue, x => ++x, x => x, x => TimeSpan.FromMilliseconds(100)).Select(x => "c: " + x).Publish();
        
        var merged = a.Merge(b).Merge(c).Publish();
        var submerged = merged.Subscribe(x => x.Dump());
        
        var subA = a.Connect();
        var subB = b.Connect();
        merged.Connect();
        
        Task.Delay(2000).ContinueWith(t => subB.Dump("Disposing b.").Dispose());
        Task.Delay(4000).ContinueWith(t => b.Connect()).ContinueWith(_ => "Reconnected to b".Dump());
        Task.Delay(6000).ContinueWith(t => c.Connect()).ContinueWith(_ => "Connecting to c".Dump());
        

        【讨论】:

        • 假设我必须在命令中添加 c 。我是否必须再次合并所有内容并在 a、b、c 上调用 connect 并合并?
        • 不可以,可以合并,然后随时连接c;我已经更新了代码来演示它。 HTH。 :)
        • 快到了。但我不知道 c 的参数,它最多可以运行 100 个不同的提要。所以不能有一个合并的前期。必须根据请求合并。
        猜你喜欢
        • 1970-01-01
        • 2017-04-11
        • 1970-01-01
        • 1970-01-01
        • 2021-09-08
        • 2018-04-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多