【问题标题】:Observable collection OnNext not firing可观察的集合 OnNext 未触发
【发布时间】:2016-02-06 20:20:07
【问题描述】:

我有这个非常简单的可观察集合,但 OnNext 没有触发。

List<int> intList = new List<int>(){1,2,3};
IObservable<int> observableList = intList.ToObservable();

IDisposable subscription = observableList.Subscribe(
    x =>  Console.WriteLine("Received {0} from source.", x),        
    ex => Console.WriteLine( "OnError: " + ex.Message ),        
    ( ) => Console.WriteLine( "OnCompleted" )        
    ); 
intList.Add(4);

我得到的输出如下。

从来源收到 1 个。

从来源收到 2 个。

从来源收到 3 个。

已完成

我期待“从源头收到 4”。在我将 4 添加到列表之后。

有人可以说明我在哪里做错了。我是新Rx

【问题讨论】:

标签: system.reactive


【解决方案1】:

这一切都取决于您的操作顺序。

如果你的代码结构是这样的:

List<int> intList = new List<int>() { 1, 2, 3 };

IObservable<int> observableList = intList.ToObservable();

intList.Add(4);

IDisposable subscription =
    observableList
        .Subscribe(
            x => Console.WriteLine("Received {0} from source.", x),
            ex => Console.WriteLine("OnError: " + ex.Message),
            () => Console.WriteLine("OnCompleted"));

...然后它会按您的预期工作。

问题是.Subscribe.ToObservable() 的当前线程上运行。实际运行的代码是return (IObservable&lt;TSource&gt;) new ToObservable&lt;TSource&gt;(source, SchedulerDefaults.Iteration);SchedulerDefaults.Iteration 是当前线程。

您可以通过以下代码看到这一点:

List<int> intList = new List<int>() { 1, 2, 3 };

IObservable<int> observableList = intList.ToObservable();

Console.WriteLine("Before Subscription");

IDisposable subscription =
    observableList
        .Subscribe(
            x => Console.WriteLine("Received {0} from source.", x),
            ex => Console.WriteLine("OnError: " + ex.Message),
            () => Console.WriteLine("OnCompleted"));

Console.WriteLine("After Subscription, Before Add");

intList.Add(4);

Console.WriteLine("After Add");

当我运行它时,我得到:

Before Subscription
Received 1 from source.
Received 2 from source.
Received 3 from source.
OnCompleted
After Subscription, Before Add
After Add

所以.Add 直到订阅完成后才发生。

现在,如果我尝试通过将代码更改为 intList.ToObservable(Scheduler.Default) 来解决这个问题,那么我会遇到一个新问题。运行我上面的代码,我得到了这个:

Before Subscription
After Subscription, Before Add
After Add
Received 1 from source.
OnError: Collection was modified; enumeration operation may not execute.

现在很明显,我们遇到了并发问题。您不应该同时尝试操作和迭代集合。

【讨论】:

  • 谢谢。意味着我需要了解更多。
  • @VivekDev - 你觉得你有什么困难?
  • @VivekDev - 只有一件事。我见过人们认为列表上的.ToObservable() 将使可观察对象响应订阅之后添加到列表中的新项目。你觉得是这样吗?
  • 是的,你是对的。这正是我的想法。
  • @VivekDev - 有两件事可以阻止这种情况。 .ToObservable() 调用是IEnumerable&lt;T&gt; 的扩展方法,该接口上没有任何内容可以让它知道何时添加了新项目。其次,它会产生一个冷的 observable,它会立即产生可枚举的所有值作为.OnNext 调用,然后调用.OnCompleted - 一旦.OnCompleted(或.OnError)发出信号,就不能再有任何其他值了。但是,如果您再次订阅,则在添加另一个值后,您将获得该值。
【解决方案2】:

这仅仅是因为列表上的 .ToObservable() 只会在您每次订阅时为您提供列表中的当前项目,而不会持续通知添加的项目。实现 IEnumerable 的只读集合也是如此。

您可以使用其他集合来代替它们,它们将按预期工作。 例如可观察集合

或者,您可以找到任何提供更改通知的集合类型(collection.Added += 等...)并使用 Observable.FromEvent 连接后续通知。

同样值得理解的是 IEnumerable.ToObservable 是冷可观察的,因此,为什么订阅顺序也很重要(根据第一个答案)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-07-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-08-01
    • 2021-12-21
    • 1970-01-01
    相关资源
    最近更新 更多