【问题标题】:How to wait for an observable to complete?如何等待 observable 完成?
【发布时间】:2020-05-15 15:52:26
【问题描述】:

在 Node.js 中,您可以设置服务器,只要服务器在事件循环中处于活动状态并且处于活动状态,进程就不会终止。我想知道是否可以使用响应式扩展来做类似的事情?我知道如何设置命名管道并利用它与另一个进程中的节点服务器进行通信,但我不确定如何防止程序通过等待按键以外的任何方式终止。我想将该服务器实现为反应式管道的一部分。是否有可能阻塞主线程直到管道完成?

【问题讨论】:

  • 我为 C# 和 F# 编写了一个小型库,它通过标准输入/输出将另一个进程的生命周期抽象为可观察的。它对于管理/组合服务器进程很有用。见github.com/deviousasti/stdio-rx

标签: .net f# reactive-programming system.reactive


【解决方案1】:
open System
open System.Threading
open FSharp.Control.Reactive

module Observable = 
    /// Blocks the thread until the observable completes. 
    let waitUnit on_next (o : IObservable<_>) =
        use t = new ManualResetEventSlim()
        use __ = Observable.subscribeWithCompletion on_next (fun _ -> t.Set()) o
        t.Wait()

Observable.interval (TimeSpan.FromSeconds(1.0))
|> Observable.take 3
|> Observable.waitUnit (printfn "%i...")

printfn "Done."

以上使用F# Rx bindings,但C#版本会类似。库本身有Observable.wait,它的类型是IObservable&lt;'a&gt; -&gt; 'a,但它的缺点是在空序列上抛出异常并将最新值保留在内存中,直到可观察对象被处置。如果最终值不重要,则该语义具有正确的语义。

我已经深入了解了,wait 使用ManualResetEventSlim 来阻止它所在的线程,所以这应该没问题。

【讨论】:

    【解决方案2】:

    很抱歉,我无法在 F# 中给出答案,但在 C# 中这很简单。

    如果我有这个例子:

    Observable
        .Range(0, 10)
        .Subscribe(x => Console.WriteLine(x));
    

    ...我想等到它完成,我可以这样重写它:

        Observable
            .Range(0, 10)
            .Do(x => Console.WriteLine(x))
            .ToArray()
            .Wait();
    

    ...或:

        await Observable
            .Range(0, 10)
            .Do(x => Console.WriteLine(x))
            .ToArray();
    

    正如 Marko 在 cmets 中指出的那样,这种方法创建的数组可能非常大。为了解决这个问题,您可以将.ToArray() 替换为.LastAsync()(在Async 的一般用途之前命名以表示Task - 在这种情况下,它只是将可观察的最后一个元素返回为IObservable&lt;T&gt;) .

    .Wait() 确实会抛出一个空的 observable,但 await 版本不会。

    【讨论】:

    • 您和@Asti 的答案都有一个问题,即他们在取出最后一个值之前将所有值排入队列。您的回答特别有在空序列上引发异常的额外问题。我认为转换为数组在这里除了浪费内存之外没有任何作用。我心情不错,不会对你的回答投反对票,但我希望我没有对阿斯蒂的回答投反对票。
    • @MarkoGrdinic - 感谢您的反馈。希望我现在在回答中解决了这两个问题。
    • .LastAsync() 也会在空序列上抛出异常。
    • @MarkoGrdinic - 是的,但我没有说没有。 await 版本适用于 .ToArray()
    • @MarkoGrdinic 既然这是你的诚心,我已经删除了我的答案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-10-06
    • 1970-01-01
    • 2018-08-06
    • 1970-01-01
    • 2017-09-20
    • 2018-09-05
    • 2021-11-16
    相关资源
    最近更新 更多