【发布时间】:2020-02-28 16:20:07
【问题描述】:
我在内部和 Observable 流中执行分页。 分页是通过一个游标和一个使用递归的总计数来实现的。
我可以使用以下代码 observer.next(searches); 发出每个页面,顺便说一下,我想只使用 observable 而没有承诺,但我无法使用 RxJs 运算符表达递归。
有什么建议吗?
const search = id =>
new Observable(observer => { recursePages(id, observer) })
const recursePages = (id, observer, processed, searchAfter) => {
httpService.post(
"http://service.com/search",
{
size: 50,
...searchAfter ? { search_after: searchAfter } : null,
id,
})
.toPromise() // httpService.post returns an Observable<AxiosResponse>
.then(res => {
const body = res.data;
const searches = body.data.hits.map(search => ({ data: search.data, cursor: search.id }));
observer.next(searches);
const totalProcessed = processed + searches.length;
if (totalProcessed < body.data.total) {
return recursePages(id, observer, totalProcessed, searches[searches.length - 1].cursor);
}
observer.complete();
})
}
// General Observer
incomingMessages.pipe(
flatMap(msg => search(JSON.parse(msg.content.toString()))),
concatAll(),
).subscribe(console.log),
【问题讨论】:
-
我对 RxJS 不是很熟悉,但是
take可以用于这个吗? learnrxjs.io/learn-rxjs/operators/filtering/take -
这里的想法是一次发出一页吗?还是一次性全部发射?
-
@Ben
take仅用于从源 observable 中获取给定数量的值,例如 ``` of(1,2,3,4,5).pipe(take(3) ).subscribe(console.log) // 输出 1 2 3 ``` -
@bryan60 这个想法是一次发出一个页面,以便一个一个地处理它们
-
查看扩展运算符。您可以提供一个递归函数,该函数返回一个可观察的 rxjs-dev.firebaseapp.com/api/operators/expand
标签: javascript recursion rxjs observable reactive-programming