【问题标题】:Is it possible to restart an IConnectableObservable?是否可以重新启动 IConnectableObservable?
【发布时间】:2018-04-06 04:50:04
【问题描述】:

我正试图弄清楚如何在完成或出错后“重新启动”IConnectableObservable

下面的代码显示了一个可连接的 observable 的两个订阅者 (A, B)。一旦他们的订阅被释放,一个新的订阅者 (C) 就会连接。我希望它在 C 看来它是一个全新的可观察对象,但我只是在第一次订阅中得到异常。

static void Main(string[] args)
{
    var o = Observable.Create<string>(observer =>
        {
            observer.OnNext("msg");
            observer.OnError(new Exception("boom"));
            return Disposable.Create(() => {
                Console.WriteLine("Observer has unsubscribed");
            });
        }
    )
    .Publish();

    o.Subscribe(
        x => Console.WriteLine("A: " + x),
        ex => Console.WriteLine("A: " + ex),
        () => Console.WriteLine("A: done"));

    o.Subscribe(
        x => Console.WriteLine("B: " + x),
        ex => Console.WriteLine("B: " + ex),
        () => Console.WriteLine("B: done"));            

    var subscription = o.Connect();

    subscription.Dispose();

    o.Subscribe(
        x => Console.WriteLine("C: " + x),
        ex => Console.WriteLine("C: " + ex),
        () => Console.WriteLine("C: done"));        

    subscription = o.Connect();

}

给出以下结果:

A: msg
B: msg
A: System.Exception: boom
B: System.Exception: boom
Observer has unsubscribed
C: System.Exception: boom
Observer has unsubscribed

而我想:

A: msg
B: msg
A: System.Exception: boom
B: System.Exception: boom
Observer has unsubscribed
C: msg
C: System.Exception: boom
Observer has unsubscribed

有什么想法吗?谢谢!

【问题讨论】:

  • 如果你已经发布了 observable 并且它运行完成,那么它就不能产生更多的值。您需要从Func&lt;IConnectableObservable&lt;string&gt;&gt; 开始以获得您想要的行为。您是否正在尝试解决现实世界的问题?
  • 嘿@Enigmativity,我实际上是在重新审视你几周前帮助我的一个问题(stackoverflow.com/questions/46911672/…) - 结果我更喜欢可观察到的完成或错误,以提供适当的终身管理。显然,在共享 observable 的情况下,这意味着流一旦完成就无法重新启动 - 正如您确实警告过我的那样! stackoverflow.com/questions/46911672/….
  • 我决定稍微重构我的应用程序,以完全消除对共享 observable 的需求——这带来了更多的麻烦,这是值得的。如果您想提供一个答案,说明无法重新启动IConnectableObservable 以及Func&lt;IConnectableObservable&lt;string&gt;&gt; 解决方法的示例,那么我很乐意接受。干杯。

标签: c# system.reactive reactive-programming


【解决方案1】:

虽然它不会“重新启动”可观察对象,但将 Publish 替换为 Replay 会提供您期望的输出。但是,请记住,这将缓冲源 observable 中的所有值。最好限制重放值的数量。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-13
    • 2013-01-08
    • 2012-05-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多