【问题标题】:RX Switch()'s Subscribe and Unsubscribe OrderRX Switch() 的订阅和取消订阅顺序
【发布时间】:2015-10-30 07:49:11
【问题描述】:

所以我写了一个我遇到的问题的草稿。我有一个 IObserverable> 包含我的流,我想使用 switch 从中获取最新项目,但是我遇到的问题可以用下面的代码巧妙地演示:

        var obs = Observable.Create<IObservable<int>>(sub =>
        {
            var item = Observable.Create<int>(innersub =>
            {
                var count = 0;
                return Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe(x => innersub.OnNext(count++));
            }).Publish().RefCount();

            return Observable.Interval(TimeSpan.FromSeconds(10)).Subscribe(x => sub.OnNext(item));
        });

        obs.Switch().Subscribe(x => Console.WriteLine(x));

上面的测试用例表明,当与 Publish().RefCount() 结合使用时,该开关首先取消订阅,然后再订阅新项目。

我想要的是一个连续的数字流上升,但是测试显示“项目”在新订阅命中之前首先被处理掉,我失去了这个计数,不得不重新开始。

如果项目相同,并且使用了 refcount,我希望订阅首先发生,所以 refcount 很高兴,然后旧的订阅被处理掉。这种行为是 RX 可以默认展示的,还是需要一些限制才能正确?我相信我可以根据 RX 源代码的精简版本编写一个足够简单的扩展方法,但如果它已经存在或者有更好的方法我想先知道。

编辑:编写的代码是一个简单的示例,以简单的方式演示问题。我实际上拥有的是一个 observable,它定期发布一个新的 observable,它上面有不同的过滤器,但最终归结为在这一切的基础上相同的可观察到的发布/引用计数。 (where 子句发生了变化,或者 select 做了一些不同的事情。真正的用途是几个流的 .Merge() 所以我对我的逻辑和我对问题的结论很有信心)。我很清楚我的示例可以简化。

【问题讨论】:

    标签: c# .net system.reactive reactive-programming


    【解决方案1】:

    您将不得不查看源代码,因为前一个 observable 在当前 observable 被订阅之前就被处理掉了。这就是.Switch() 的工作原理。

    如果 Rx 新订阅后处置,您的代码的意图似乎等同于简单地执行此操作:

    var obs = Observable.Create<int>(innersub =>
    {
        var count = 0;
        return Observable.Interval(TimeSpan.FromSeconds(2))
            .Subscribe(x => innersub.OnNext(count++));
    });
    
    obs.Subscribe(x => Console.WriteLine(x));
    

    在这个例子中,它归结为:

    var obs = Observable.Interval(TimeSpan.FromSeconds(2));
    
    obs.Subscribe(x => Console.WriteLine(x));
    

    也许您可以让我们知道您的基本要求是什么,我们可以解决这个问题?

    【讨论】:

      【解决方案2】:

      此运算符主要是为冷可观察对象设计的。 Switch 在订阅新的 observable 之前取消订阅前一个 observable。否则将存在竞争条件,即在订阅两者的短暂时间内可能会出现额外事件。

      由于您的底层 observable 很热,您可能会考虑另一种解决方案,您只需“即时”修改过滤器/选择,而不是使用 Switch 来“重新订阅”。比如:

      source
          .Where(t => ApplyCurrentFilter(t))
          .Select(t => ApplyCurrentProjection(t))
          .Subscribe(...);
      
      // do something that changes what `ApplyCurrentFilter` does...
      

      我不知道这是否比您当前的解决方案更好或更差,但它确实避免了取消订阅/重新订阅源数据的需要。

      【讨论】:

        【解决方案3】:

        如前所述,Observable.Create 产生一个冷的 Observable,而 Publish.RefCount 仅在仍有订阅者时使其变热。可以编写自己的 Switch 版本,在处理旧订阅者之前订阅新订阅者。但我会非常警惕比赛条件。总的来说,做这件事感觉有点奇怪,在 Rx 中,这通常表明有另一种方法可以做你想做的事情,而且更简洁。

        在这个例子中,如果你有想要的结果,那么发布许多 Observables 并切换它们是没有意义的,因为你真的只想在整个持续时间内订阅一个 observables。因此,归结为 Enigmativity 所说的内容。

        但是,显然这是一个人为的示例,所以让我们假设有一个更复杂的情况需要这种方法 - 如果您能够详细说明,它可能会有所帮助。从示例中,您似乎只想订阅一次内部可观察对象。基于该要求, RefCount 是不合适的,但我假设您正在使用它,因为您希望在核心有一个共享的 observable,您将其与您希望每次以不同方式执行的其他操作符一起包装。如果是这种情况,您可能会使用这样的方法:

        var obs = Observable.Create<IObservable<int>>(sub =>
        {
           var item = Observable.Create<int>(innersub =>
           {
               var count = 0;
               return Observable.Interval(TimeSpan.FromSeconds(2))
                                .Subscribe(x => innersub.OnNext(count++));
           }).Publish();
        
           bool connected = false;
           var disposables = new CompositeDisposable();
           disposables.Add(Observable.Interval(TimeSpan.FromSeconds(10))
                                     .Subscribe(x =>
                                     {
                                         // push the new stream to the observer first
                                         sub.OnNext(item);
        
                                         if (!connected)
                                         {
                                             connected = true;
                                             disposables.Add(item.Connect());
                                         }
                                     }));
        
           return disposables;
        });
        

        我没有考虑过这种方法的潜在竞争条件等,很大程度上取决于您的实际情况。但是,在原始帖子的基本测试中,这似乎符合您的要求。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2014-06-30
          • 2017-08-11
          相关资源
          最近更新 更多