【问题标题】:Async in RxJS ObservableRxJS Observable 中的异步
【发布时间】:2018-09-11 16:13:33
【问题描述】:

第一次使用 RxJS。基本上,我正在尝试制作一个从查询字符串中检索推文的推特刮板。搜索 url 允许指定一个 min_position 参数,该参数可以是上一个搜索的最后一个 id 以进行分页排序。

这个过程看起来像这样(它在最后循环回来):

get page -> next() each scraped tweet -> set min_position -> get page (until !has_more_items)

请求页面返回一个承诺,所以我不得不等到完成,直到我可以继续。我希望将异步函数传递给Observable.create(),但这似乎不起作用,它只被调用一次。

编辑

在尽我所能阅读您的资源后,我玩了一会。我对我的问题提出了以下抽象。

import { from, Observable } from 'rxjs'
import { concatMap, map, switchMap } from 'rxjs/operators'

let pageNumber = 0
const PAGE_SIZE = 3, MAX_PAGES = 3
async function nextPage() {
    if (pageNumber >= MAX_PAGES) {
        throw new Error('No more pages available')
    }

    await new Promise(res => setTimeout(res, 500)) // delay 500ms

    const output = []
    const base = pageNumber++ * PAGE_SIZE
    for (let i = 0; i < PAGE_SIZE; i++) {
        output.push(base + i)
    }
    return output
}

function parseTweet(tweet: number): string {
    // simply prepend 'tweet' to the tweet
    return 'tweet ' + tweet
}

const getTweets = (): Observable<string> => {
    return from(nextPage()) // gets _html_ of next page
        .pipe(
          concatMap(page => page), // spreads out tweet strings in page
          map(tweet => parseTweet(tweet)), // parses each tweet's html
          switchMap(() => getTweets()) // concat to next page's tweets
          // stop/finish observable when getTweets() observable returns an error
        )
}

getTweets()
    .subscribe(val => console.log(val))

它非常接近工作,但现在每当nextPage() 返回一个被拒绝的承诺时,整个 observable 都会中断(没有任何记录到控制台)。

我尝试在 pipe 之后插入 catchError 来完成 observable 而不是运行并抛出错误,但我无法让它工作。

此外,这种实现是递归的,我希望避免这种情况,因为它不可扩展。我不知道将来会在 observable 中处理多少推文/页面。似乎所有 3 个页面的推文都必须在 observable 开始发出值之前进行处理,这当然不是它应该如何工作的。

感谢您的帮助! :)

【问题讨论】:

标签: javascript rxjs observable


【解决方案1】:

我们需要加载Twits 直到某些条件并以某种方式使用Promise?看例子:

function loadTwits(id) {
  // Observable that replay last value and have default one
  twitId$ = new BehaviorSubject(id);

  return twitId$.pipe(
    // concatMap - outside stream emit in order inner do
    // from - convert Promise to Observable
    concatMap(id => from(fetchTwits(id))),
    map(parseTwits),
    // load more twits or comlete
    tap(twits => getLastTwitId(twits) ? twitId$.next(getLastTwitId(twits)) : twitId$.complete())
  )
}

【讨论】:

  • 您好,Buggy,感谢您的回答。最后一行tap,不是在每页之后循环,而是在每条推文之后都这样做吗?
  • 嗨詹姆斯,我假设fetchTwits(id) 请求新页面。如果不正确 - 请更清楚地描述您拥有什么以及您想要获得什么。
【解决方案2】:

在进一步研究 expand 并意识到这是我的 observable 中需要的递归之后,我发现了这一点。这是创建 observable 的代码:

const nextPage$f = () => from(nextPage()) // gets _html_ of next page
    .pipe(
        concatMap(page => page), // spreads out tweet strings in page
        map(tweet => parseTweet(tweet)) // parses each tweet's html
    )

const tweets$ = nextPage$f()
    .pipe(
        expand(() => morePages() ? nextPage$f() : empty())
    )

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-10-03
    • 2017-08-31
    • 2020-05-07
    • 2017-02-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多