【问题标题】:How to chain observables in rx.js如何在 rxjs 中链接 observables
【发布时间】:2016-02-02 16:13:14
【问题描述】:

我有一个 observable 从服务器拉取事件,过滤应用程序类型的事件,然后订阅并将事件分派给一个或多个处理程序来处理。

然后,处理程序开始对数据库进行一些异步更新,我发现 observable 会快速生成事件,以至于更新会相互影响。我应该预料到的。

所以我认为我需要我的处理程序每​​个都使用自己的 observable 充当队列,该队列将处理一个事件并等待确认。

所以我的问题是,如何创建一个连续接收消息并一次发送一条消息以等待确认,然后再发布下一条消息的 observable?

我认为 observables 也需要很冷,因为我不能丢失消息。

【问题讨论】:

    标签: javascript node.js observable rxjs frp


    【解决方案1】:

    我认为运营商concatMap 所做的事情与您正在寻找的内容很接近。您可以在 SO 上查看以前的答案,以说明 concatMap 的类似用例: RxJS queueing dependent tasks

    它很接近但不完全是您想要的,因为没有等待ACK 信号来释放下一个值。相反,concatMap 使用当前“执行”的 observable 的完成信号来订阅下一个。如果您的 observable 在某处包含在数据库上执行更新,那么这些更新将按顺序执行。例如:

    function handler (source$) {
      // source$ is your source of events from which you generate the update calls
      return source$.concatMap(function (event){
        return updateDB(event);
      })
    }
    
    function updateDB(event) {
      return Rx.Observable.create(function(observer){
        // do the update in the db
        // you probably have a success and error handler 
        // you plug the observer notification into those handlers
        if (success) {
          // if you need to pass down some value from the update
          observer.onNext(someValue);
          // In any case, signal completion to allow concatMap to move to next update
          observer.onCompleted();
        }
        if (error) {observer.onError(error);}
      })
    }
    

    这是专门针对您正在使用的库的通用代码。您可能可以直接使用运算符fromNodeCallbackfromCallback,具体取决于您的数据库更新函数的API。

    同样,请注意,在执行当前 observable 时,可能会涉及一些缓冲来保持下一个 observable,并且该缓冲区只能是有限的,所以如果您在生产者之间的速度确实存在显着差异和消费者或内存限制,您可能希望以不同的方式处理事情。

    另外,如果你使用的是 RxJS v5,onError 变为 erroronComplete 变为 completeonNext 变为 next(参见new observer interface)。

    最后评论,您的流的有损/无损性质是一个不同于流的冷热性质的概念。您可以查看illustrated subscription and data flows 了解这两种类型的流。

    【讨论】:

    • 好东西,谢谢。反正很近了。因此,如果有一个 observable,我可以通过调用 onNext(或 next 视情况而定)来从队列中获取下一个项目来调节流程。但是,我认为可观察对象之间的连接仍然是一个问题。我有一个处理消息的模块,然后将每个消息发送到 2 或 3 个其他模块。生产者 observable 不应等待或被任何一个处理程序模块限制,但内部处理程序模块应按顺序执行。
    • 哇,这取决于 v5?看起来我正在使用 v2.5.3。这还不是问题,但我确信它会是,所以我会升级。
    • v5 处于测试阶段,所以我建议升级到 v4,除非你知道自己在做什么
    • 好吧,试试看它是否有效。我相信它应该。生产者 observable(无论是哪个)都没有被限制或等待。缓冲发生在concatMap 级别。但无论如何,我们可能一直在抽象地讨论,最好是尝试,如果它不起作用,你将不得不更具体地了解你的架构(也许发布一些代码)。
    • 酷就行。必须等到今晚,因为这是个人项目,但我会告诉你的。谢谢,
    猜你喜欢
    • 2016-09-07
    • 2020-08-07
    • 2019-03-17
    • 1970-01-01
    • 2016-08-18
    • 2018-05-11
    • 2017-01-06
    • 2021-09-13
    • 1970-01-01
    相关资源
    最近更新 更多