【问题标题】:RxJS: Parallel http call to paginated API using NestJS HttpServiceRxJS:使用 NestJS HttpService 对分页 API 进行并行 http 调用
【发布时间】:2021-02-06 07:06:57
【问题描述】:

我正在使用 NestJS,这是我当前的实现并行 http 请求:

@Injectable()
export class AppService {
  constructor(private readonly http: HttpService) {}

  private fetchData(index: number) {
    Logger.log(`Collect index: ${index}`);

    return this.http
      .get<Passenger>(
        `https://api.instantwebtools.net/v1/passenger?page=${index}&size=100`,
        { validateStatus: null },
      )
      .pipe(concatMap(response => of(response.data)));
  }

  async getAllData() {
    let a = 0;
    const collect: Passenger[] = [];
    const $subject = new BehaviorSubject(a);

    await $subject
      .pipe(
        flatMap(index =>
          forkJoin([
            this.fetchData(index),
            this.fetchData(index + 1),
            this.fetchData(index + 2),
          ]).pipe(mergeAll()),
        ),
        tap(data => {
          collect.push(data);

          if (data?.data?.length === 0) {
            $subject.complete();     // stop the stream
          } else {
            a += 3;     // increment by 3, because the request is 3 times at a time
            $subject.next(a);
          }
        }),
      )
      .toPromise();

    return collect;
  }
}

此服务用于收集第三方数据。至于现在,fetchData()函数根据我一次想要多少并行请求被多次调用。我使用虚拟 API 进行测试,但在实际场景中,API 端点大小限制为 100,并且它不返回有关 totalPage 多少的元信息。它只是在到达最后一页时返回空数据。

目标是发出并行请求并在最后合并结果。我这样做是为了尽可能缩短请求时间,并且因为 API 本身的速率限制为每秒 50 个请求。如何优化这段代码?

【问题讨论】:

  • 如果您不知道有多少页,那么如何使请求并行?
  • API端点返回的数据为空时停止请求

标签: node.js typescript rxjs nestjs


【解决方案1】:

要一次性获取所有页面,您可以使用expand 递归订阅获取某些页面的可观察对象。当您收到的最后一页为空时,通过返回EMPTY 来结束递归。

function fetchAllPages(batchSize: number = 3): Observable<any[][]> {
  let index = 0;
  return fetchPages(index, batchSize).pipe(
    // if the last page isn't empty fetch the next pages, otherwise end the recursion
    expand(pages => pages[pages.length - 1].length > 0 
      ? fetchPages((index += batchSize), batchSize) 
      : EMPTY
    ),
    // accumulate all pages in one array, filter out any trailing empty pages
    reduce((acc, curr) => acc.concat(curr.filter(page => page.length)), [])
  );
}

// fetch a given number of pages starting from 'index' as parallel requests
function fetchPages(index: number, numberOfPages: number): Observable<any[][]> {
  const requests = Array.from({ length: numberOfPages }, (_, i) =>
    fetchData(index + i)
  );
  return forkJoin(requests);
}

https://stackblitz.com/edit/rxjs-vkad5h?file=index.ts

这显然会在最后一批 if
(totalNumberOfPages + 1) % batchSize != 0 中发送一些不必要的请求。

【讨论】:

    猜你喜欢
    • 2021-03-01
    • 2019-01-25
    • 2020-04-19
    • 1970-01-01
    • 2016-04-29
    • 1970-01-01
    • 1970-01-01
    • 2021-06-08
    • 1970-01-01
    相关资源
    最近更新 更多