这一切都取决于您的操作顺序。
如果你的代码结构是这样的:
List<int> intList = new List<int>() { 1, 2, 3 };
IObservable<int> observableList = intList.ToObservable();
intList.Add(4);
IDisposable subscription =
observableList
.Subscribe(
x => Console.WriteLine("Received {0} from source.", x),
ex => Console.WriteLine("OnError: " + ex.Message),
() => Console.WriteLine("OnCompleted"));
...然后它会按您的预期工作。
问题是.Subscribe 在.ToObservable() 的当前线程上运行。实际运行的代码是return (IObservable<TSource>) new ToObservable<TSource>(source, SchedulerDefaults.Iteration);。 SchedulerDefaults.Iteration 是当前线程。
您可以通过以下代码看到这一点:
List<int> intList = new List<int>() { 1, 2, 3 };
IObservable<int> observableList = intList.ToObservable();
Console.WriteLine("Before Subscription");
IDisposable subscription =
observableList
.Subscribe(
x => Console.WriteLine("Received {0} from source.", x),
ex => Console.WriteLine("OnError: " + ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("After Subscription, Before Add");
intList.Add(4);
Console.WriteLine("After Add");
当我运行它时,我得到:
Before Subscription
Received 1 from source.
Received 2 from source.
Received 3 from source.
OnCompleted
After Subscription, Before Add
After Add
所以.Add 直到订阅完成后才发生。
现在,如果我尝试通过将代码更改为 intList.ToObservable(Scheduler.Default) 来解决这个问题,那么我会遇到一个新问题。运行我上面的代码,我得到了这个:
Before Subscription
After Subscription, Before Add
After Add
Received 1 from source.
OnError: Collection was modified; enumeration operation may not execute.
现在很明显,我们遇到了并发问题。您不应该同时尝试操作和迭代集合。