【问题标题】:Reactive Extensions ToEnumerable how to dispose observable state if not iterating everythingReactive Extensions ToEnumerable 如果不迭代所有内容,如何处理可观察状态
【发布时间】:2017-05-11 20:40:16
【问题描述】:

如果我在IObservable 上使用ToEnumerable 扩展,则用户可能不会迭代所有元素。在那种情况下,Observable.Create 中声明的IDisposable 该如何正确处理?

为了争论,假设不能直接将IObservable返回给用户(在这种情况下,用户可以自己实现取消)。

 private IObservable<Object> MakeObservable()
 {
   return Observable.Create(async (observer, cancelToken) =>
   {
     using(SomeDisposable somedisposable = new SomeDisposable())
     {
        while(true)
        {
          Object result = somedisposable.GetNextObject();
          if(result == null)
          {
            break;
          }
          observer.OnNext(result);
        }       
     }
   }
 }

 public IEnumerable<Object> GetObjects()
 {
   return MakeObservable().ToEnumerable();
 }

 public void Test()
 {
   IEnumerable<Object> e = GetObjects();
   int i = 0;
   foreach(Object o in e)
   {
     if(i++ == 10)
        break;
   }
   //somedisposable is not disposed here!!!
 }

【问题讨论】:

    标签: c# system.reactive rx.net


    【解决方案1】:

    问题是您对Create 的定义没有返回,因此无法停止。如果您将可观察的源更改为基于计时器的东西,那么它可以正常工作。试试这个代码:

    public IEnumerable<long> GetObjects()
    {
        return Observable
            .Interval(TimeSpan.FromSeconds(1.0))
            .Finally(() => Console.WriteLine("Done."))
            .ToEnumerable();
    }
    
    public void Test()
    {
        foreach (long i in GetObjects())
        {
            Console.WriteLine(i);
            if (i == 10)
            {
                break;
            }
        }
    }
    

    当你运行它时,你会得到这个输出:

    0 1 2 3 4 5 6 7 8 9 10 完毕。

    它显然在源 observable 上调用 OnCompleted

    【讨论】:

    • 谢谢。虽然我不会使用Interval,但这个答案帮助我理解了为什么会这样。我意识到我可以更多地使用cancelToken,它似乎在退出 foreach 时也会被取消。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-20
    • 1970-01-01
    相关资源
    最近更新 更多