【问题标题】:RXJS : Idiomatic way to create an observable stream from a paged interfaceRXJS:从分页界面创建可观察流的惯用方式
【发布时间】:2015-04-01 02:25:31
【问题描述】:

我有分页界面。给定一个起点,一个请求将产生一个结果列表和一个继续指示符。

我创建了一个 observable,它是通过构造和平面映射读取页面的 observable 来构建的。此 observable 的结果包含页面的数据和要继续使用的值。我提取数据并将其平面映射到订阅者。产生一个值流。

为了处理分页,我为下一页值创建了一个主题。它使用初始值播种,然后每次收到包含有效下一页的响应时,我都会推送到页面主题并触发另一次读取,直到没有更多可读取的时间为止。

有没有更惯用的方法?

    function records(start = 'LATEST', limit = 1000) {
      let pages = new rx.Subject();

      this.connect(start)
        .subscribe(page => pages.onNext(page));

      let records = pages
        .flatMap(page => {
          return this.read(page, limit)
            .doOnNext(result => {
              let next = result.next;
              if (next === undefined) {
                pages.onCompleted();
              } else {
                pages.onNext(next);
              }
            });
        })
        .pluck('data')
        .flatMap(data => data);

      return records;
    }

【问题讨论】:

    标签: reactive-programming rxjs


    【解决方案1】:

    这是一种合理的做法。它有几个潜在的缺陷(可能会或可能不会影响您,具体取决于您的用例):

    1. 您无法观察this.connect(start) 中发生的任何错误
    2. 你的 observable 实际上是 hot。如果调用者没有立即订阅 observable(也许他们存储它并稍后订阅),那么他们将错过 this.connect(start) 的完成,并且 observable 似乎永远不会产生任何东西。李>
    3. 如果呼叫者改变主意并提前取消订阅,您将无法从最初的connect 呼叫中取消订阅。没什么大不了的,但通常当构建一个 observable 时,应该尝试将一次性组件链接在一起,以便在调用者取消订阅时正确清理调用。

    这是修改后的版本:

    1. 它将错误从this.connect 传递给观察者。
    2. 它使用Observable.create 创建一个cold observable,只有在调用者实际订阅时才开始业务,因此不会丢失初始页面值并停止流。
    3. 它结合了this.connect 订阅一次性和整体订阅一次性

    代码:

        function records(start = 'LATEST', limit = 1000) {
            return Rx.Observable.create(observer => {
                let pages = new Rx.Subject();
                let connectSub = new Rx.SingleAssignmentDisposable();
                let resultsSub = new Rx.SingleAssignmentDisposable();
                let sub = new Rx.CompositeDisposable(connectSub, resultsSub);
    
                // Make sure we subscribe to pages before we issue this.connect()
                // just in case this.connect() finishes synchronously (possible if it caches values or something?)
                let results = pages
                    .flatMap(page => this.read(page, limit))
                    .doOnNext(r => this.next !== undefined ? pages.onNext(this.next) : pages.onCompleted())
                    .flatMap(r => r.data);
                resultsSub.setDisposable(results.subscribe(observer));
    
                // now query the first page
                connectSub.setDisposable(this.connect(start)
                    .subscribe(p => pages.onNext(p), e => observer.onError(e)));
    
                return sub;
            });
        }
    

    注意:我以前没有使用过 ES6 语法,所以希望我没有在这里搞砸任何东西。

    【讨论】:

    • 谢谢。那好多了。也谢谢你没有指出pluck flatmap的白痴。
    猜你喜欢
    • 2016-05-17
    • 1970-01-01
    • 1970-01-01
    • 2016-10-31
    • 1970-01-01
    • 1970-01-01
    • 2016-12-07
    • 2020-04-02
    • 1970-01-01
    相关资源
    最近更新 更多