【问题标题】:Recursion and Observable RxJs递归和可观察的 RxJ
【发布时间】:2020-02-28 16:20:07
【问题描述】:

我在内部和 Observable 流中执行分页。 分页是通过一个游标和一个使用递归的总计数来实现的。

我可以使用以下代码 observer.next(searches); 发出每个页面,顺便说一下,我想只使用 observable 而没有承诺,但我无法使用 RxJs 运算符表达递归。

有什么建议吗?

    const search = id =>
      new Observable(observer => { recursePages(id, observer) })

    const recursePages = (id, observer, processed, searchAfter) => {
      httpService.post(
        "http://service.com/search",
        {
          size: 50,
          ...searchAfter ? { search_after: searchAfter } : null,
          id,
        })
        .toPromise() // httpService.post returns an Observable<AxiosResponse>
        .then(res => {
          const body = res.data;
          const searches = body.data.hits.map(search => ({ data: search.data, cursor: search.id }));
          observer.next(searches);
          const totalProcessed = processed + searches.length;
          if (totalProcessed < body.data.total) {
            return recursePages(id, observer, totalProcessed, searches[searches.length - 1].cursor);
          }
          observer.complete();
        })
    }

    // General Observer
    incomingMessages.pipe(
        flatMap(msg => search(JSON.parse(msg.content.toString()))),
        concatAll(),
    ).subscribe(console.log),    


【问题讨论】:

  • 我对 RxJS 不是很熟悉,但是 take 可以用于这个吗? learnrxjs.io/learn-rxjs/operators/filtering/take
  • 这里的想法是一次发出一页吗?还是一次性全部发射?
  • @Ben take 仅用于从源 observable 中获取给定数量的值,例如 ``` of(1,2,3,4,5).pipe(take(3) ).subscribe(console.log) // 输出 1 2 3 ```
  • @bryan60 这个想法是一次发出一个页面,以便一个一个地处理它们
  • 查看扩展运算符。您可以提供一个递归函数,该函数返回一个可观察的 rxjs-dev.firebaseapp.com/api/operators/expand

标签: javascript recursion rxjs observable reactive-programming


【解决方案1】:

这些方法将递归地收集所有页面并将它们发送到一个数组中。然后可以使用 from 流式传输页面,如下所示:

// break this out to clean up functions
const performSearch = (id, searchAfter?) => {
  return httpService.post(
    "http://service.com/search",
    {
      size: 50,
      ...searchAfter ? { search_after: searchAfter } : null,
      id,
    });
}

// main recursion
const _search = (id, processed, searchAfter?) => {
  return performSearch(id, searchAfter).pipe( // get page
    switchMap(res => {
      const body = res.data;
      const searches = body.data.hits.map(search => ({ data: search.data, cursor: search.id }));
      const totalProcessed = processed + searches.length;
      if (totalProcessed < body.total) {
        // if not done, recurse and get next page
        return _search(id, totalProcessed, searches[searches.length - 1].cursor).pipe(
          // attach recursed pages
          map(nextPages => [searches].concat(nextPages)
        );
      }
      // if we're done just return the page
      return of([searches]);
    })
  )
}

// entry point
// switch into from to emit pages one by one
const search = id => _search(id, 0).pipe(switchMap(pages => from(pages))

如果您真正需要的是在所有页面都被获取之前一一发出,例如,这样您就可以在页面 1 可用时立即显示,而不是在页面 2+ 上等待,那么可以做了一些调整。告诉我。

编辑:此方法将一一发出

const _search = (id, processed, searchAfter?) => {
  return performSearch(id, searchAfter).pipe( // get page
    switchMap(res => {
      const body = res.data;
      const searches = body.data.hits.map(search => ({ data: search.data, cursor: search.id }));
      const totalProcessed = processed + searches.length;
      if (totalProcessed < body.total) {
        // if not done, concat current page with recursive call for next page
        return concat(
          of(searches),
          _search(id, totalProcessed, searches[searches.length - 1].cursor)
        );
      }
      // if we're done just return the page
      return of(searches);
    })
  )
}
const search = id => _search(id, 0)

你最终会得到一个可观察的结构,例如:

concat(
  post$(page1),
  concat(
    post$(page2),
    concat(
      post$(page3),
      post$(page4)
    )
  )
)

由于嵌套的concat() 操作简化为扁平结构,因此该结构将简化为:

concat(post$(page1), post$(page2), post$(page3), post$(page4))

这是您所追求的,并且请求按顺序运行。

根据@NickL 的评论,expand 似乎也可以解决问题,例如:

search = (id) => {
  let totalProcessed = 0;
  return performSearch(id).pipe(
    expand(res => {
      const body = res.data;
      const searches = body.data.hits.map(search => ({ data: search.data, cursor: search.id }));
      totalProcessed += searches.length;
      if (totalProcessed < body.data.total) {
        // not done, keep expanding
        return performSearch(id, searches[searches.length - 1].cursor);
      }
      return EMPTY; // break with EMPTY
    })
  )
}

虽然我以前从未使用过 expand,而且这是基于对它的一些非常有限的测试,但我很确定这是可行的。

如果您愿意,这两种方法都可以使用 reduce(或扫描)运算符来收集结果:

search(id).pipe(reduce((all, page) => all.concat(page), []))

【讨论】:

  • 谢谢,这段代码对我来说非常清楚,它会获取所有页面,然后一次性发出它们.pipe(switchMap(pages =&gt; from(pages)) 正在将它们转换为流。我真正需要的(正如我之前评论过的)是一次发出一页。
  • 是的,我在您发表评论之前发布了我的答案,我发布了调整以使其一次工作一页
  • 谢谢concat 方法实际上是我需要的,我也觉得很清楚。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-09-03
  • 1970-01-01
  • 2016-08-26
相关资源
最近更新 更多