【发布时间】:2018-09-11 16:13:33
【问题描述】:
第一次使用 RxJS。基本上,我正在尝试制作一个从查询字符串中检索推文的推特刮板。搜索 url 允许指定一个 min_position 参数,该参数可以是上一个搜索的最后一个 id 以进行分页排序。
这个过程看起来像这样(它在最后循环回来):
get page -> next() each scraped tweet -> set min_position -> get page (until !has_more_items)
请求页面返回一个承诺,所以我不得不等到完成,直到我可以继续。我希望将异步函数传递给Observable.create(),但这似乎不起作用,它只被调用一次。
编辑
在尽我所能阅读您的资源后,我玩了一会。我对我的问题提出了以下抽象。
import { from, Observable } from 'rxjs'
import { concatMap, map, switchMap } from 'rxjs/operators'
let pageNumber = 0
const PAGE_SIZE = 3, MAX_PAGES = 3
async function nextPage() {
if (pageNumber >= MAX_PAGES) {
throw new Error('No more pages available')
}
await new Promise(res => setTimeout(res, 500)) // delay 500ms
const output = []
const base = pageNumber++ * PAGE_SIZE
for (let i = 0; i < PAGE_SIZE; i++) {
output.push(base + i)
}
return output
}
function parseTweet(tweet: number): string {
// simply prepend 'tweet' to the tweet
return 'tweet ' + tweet
}
const getTweets = (): Observable<string> => {
return from(nextPage()) // gets _html_ of next page
.pipe(
concatMap(page => page), // spreads out tweet strings in page
map(tweet => parseTweet(tweet)), // parses each tweet's html
switchMap(() => getTweets()) // concat to next page's tweets
// stop/finish observable when getTweets() observable returns an error
)
}
getTweets()
.subscribe(val => console.log(val))
它非常接近工作,但现在每当nextPage() 返回一个被拒绝的承诺时,整个 observable 都会中断(没有任何记录到控制台)。
我尝试在 pipe 之后插入 catchError 来完成 observable 而不是运行并抛出错误,但我无法让它工作。
此外,这种实现是递归的,我希望避免这种情况,因为它不可扩展。我不知道将来会在 observable 中处理多少推文/页面。似乎所有 3 个页面的推文都必须在 observable 开始发出值之前进行处理,这当然不是它应该如何工作的。
感谢您的帮助! :)
【问题讨论】:
-
提示:
switchMap -
这就是你要找的东西吗? stackoverflow.com/a/35494766/3772379
-
expand也可能是这种情况 - 请参阅 blog.angularindepth.com/rxjs-understanding-expand-a5f8b41a3602 -
伙计,这一切都太混乱了。我将尝试一起编写一些代码,然后我会回复你们!
-
编辑了我的代码抽象的帖子。
标签: javascript rxjs observable