【发布时间】:2015-10-30 07:49:11
【问题描述】:
所以我写了一个我遇到的问题的草稿。我有一个 IObserverable> 包含我的流,我想使用 switch 从中获取最新项目,但是我遇到的问题可以用下面的代码巧妙地演示:
var obs = Observable.Create<IObservable<int>>(sub =>
{
var item = Observable.Create<int>(innersub =>
{
var count = 0;
return Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe(x => innersub.OnNext(count++));
}).Publish().RefCount();
return Observable.Interval(TimeSpan.FromSeconds(10)).Subscribe(x => sub.OnNext(item));
});
obs.Switch().Subscribe(x => Console.WriteLine(x));
上面的测试用例表明,当与 Publish().RefCount() 结合使用时,该开关首先取消订阅,然后再订阅新项目。
我想要的是一个连续的数字流上升,但是测试显示“项目”在新订阅命中之前首先被处理掉,我失去了这个计数,不得不重新开始。
如果项目相同,并且使用了 refcount,我希望订阅首先发生,所以 refcount 很高兴,然后旧的订阅被处理掉。这种行为是 RX 可以默认展示的,还是需要一些限制才能正确?我相信我可以根据 RX 源代码的精简版本编写一个足够简单的扩展方法,但如果它已经存在或者有更好的方法我想先知道。
编辑:编写的代码是一个简单的示例,以简单的方式演示问题。我实际上拥有的是一个 observable,它定期发布一个新的 observable,它上面有不同的过滤器,但最终归结为在这一切的基础上相同的可观察到的发布/引用计数。 (where 子句发生了变化,或者 select 做了一些不同的事情。真正的用途是几个流的 .Merge() 所以我对我的逻辑和我对问题的结论很有信心)。我很清楚我的示例可以简化。
【问题讨论】:
标签: c# .net system.reactive reactive-programming