【发布时间】:2018-04-06 04:50:04
【问题描述】:
我正试图弄清楚如何在完成或出错后“重新启动”IConnectableObservable。
下面的代码显示了一个可连接的 observable 的两个订阅者 (A, B)。一旦他们的订阅被释放,一个新的订阅者 (C) 就会连接。我希望它在 C 看来它是一个全新的可观察对象,但我只是在第一次订阅中得到异常。
static void Main(string[] args)
{
var o = Observable.Create<string>(observer =>
{
observer.OnNext("msg");
observer.OnError(new Exception("boom"));
return Disposable.Create(() => {
Console.WriteLine("Observer has unsubscribed");
});
}
)
.Publish();
o.Subscribe(
x => Console.WriteLine("A: " + x),
ex => Console.WriteLine("A: " + ex),
() => Console.WriteLine("A: done"));
o.Subscribe(
x => Console.WriteLine("B: " + x),
ex => Console.WriteLine("B: " + ex),
() => Console.WriteLine("B: done"));
var subscription = o.Connect();
subscription.Dispose();
o.Subscribe(
x => Console.WriteLine("C: " + x),
ex => Console.WriteLine("C: " + ex),
() => Console.WriteLine("C: done"));
subscription = o.Connect();
}
给出以下结果:
A: msg
B: msg
A: System.Exception: boom
B: System.Exception: boom
Observer has unsubscribed
C: System.Exception: boom
Observer has unsubscribed
而我想:
A: msg
B: msg
A: System.Exception: boom
B: System.Exception: boom
Observer has unsubscribed
C: msg
C: System.Exception: boom
Observer has unsubscribed
有什么想法吗?谢谢!
【问题讨论】:
-
如果你已经发布了 observable 并且它运行完成,那么它就不能产生更多的值。您需要从
Func<IConnectableObservable<string>>开始以获得您想要的行为。您是否正在尝试解决现实世界的问题? -
嘿@Enigmativity,我实际上是在重新审视你几周前帮助我的一个问题(stackoverflow.com/questions/46911672/…) - 结果我更喜欢可观察到的完成或错误,以提供适当的终身管理。显然,在共享 observable 的情况下,这意味着流一旦完成就无法重新启动 - 正如您确实警告过我的那样! stackoverflow.com/questions/46911672/….
-
我决定稍微重构我的应用程序,以完全消除对共享 observable 的需求——这带来了更多的麻烦,这是值得的。如果您想提供一个答案,说明无法重新启动
IConnectableObservable以及Func<IConnectableObservable<string>>解决方法的示例,那么我很乐意接受。干杯。
标签: c# system.reactive reactive-programming