【问题标题】:How to complete a subscriber when a for loop is done?完成for循环后如何完成订阅者?
【发布时间】:2021-10-17 14:52:00
【问题描述】:

如果您向https://api.github.com/users/benawad/repos 发出GET 请求,您将获得用户benawad 拥有的所有存储库。至少这就是 API 端点预期返回的内容。问题是 GitHub 将存储库的数量限制为 30 个。

截至今天(2021 年 8 月 14 日),用户 benawad 拥有 246 个存储库

要克服上述问题,您应该发出一个 GET 请求,但需要添加一些额外的参数...... GitHub 已经对其 API 实现了分页。因此,您应该在 URL 中指定要检索的页面每页的存储库数量

我们的新GET 请求应如下所示:

https://api.github.com/users/benawad/repos?page=1&per_page=1000

问题在于 GitHub 将每页的存储库数量限制为 100 个。因此我们的 GET 请求返回 100 个存储库,而不是用户 benawad 当前拥有的 246 个存储库。

在我的 Angular 服务中,我实现了以下代码来从所有页面中检索所有 repos。

  public getUserRepos(user: string): Observable<RepositoryI[]> {
    return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
      this.getUserData(user).subscribe((data: UserI) => {
        //public_repos is the amt of repos the user has
        const pages: number = Math.ceil(data.public_repos / 100);

        for (let i = 1; i <= pages; i++) {
          this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .subscribe((data: RepositoryI[]) => {
              subscriber.next(data);
            });
        }
      });
    });
  }

在我的组件中,我订阅了以下内容:

  this.userService.getUserRepos(id).subscribe((repos)=>{
    this.repositories.push(...repos);
  })

这个方法的问题是我无法控制 Observable 何时停止发出值。在我的组件中,我想在 Observable 完成时触发一个函数。

我尝试了以下方法:

  public getUserRepos(user: string): Observable<RepositoryI[]> {
    return new Observable((subscriber: Subscriber<RepositoryI[]>) => {
      this.getUserData(user).subscribe((data: UserI) => {
        const pages: number = Math.ceil(data.public_repos / 100);

        for (let i = 1; i <= pages; i++) {
          this.http
            .get(
              `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
            )
            .subscribe((data: RepositoryI[]) => {
              subscriber.next(data);
              // If the for loop is complete -> complete the subscriber
              if(pages == i) subscriber.complete();
            });
        }
      });
    });
  }

在我的组件中,我执行以下操作:

  this.userService.getUserRepos(id).subscribe(
    (repos) => {
      this.repositories.push(...repos);
    },
    (err) => {
      console.log(err);
    },
    () => {
      // When the observable is complete:
      
      console.log(this.repositories); // This only logs 46 repositores, it should log 246
      // I want to trigger some function here
    }
  );

console.log() 仅记录 46 个存储库。为什么会这样?也许我在获取所有 3 个页面之前完成了订阅者;但我在订阅中调用.complete()。我究竟做错了什么?提前致谢。

【问题讨论】:

    标签: angular typescript rxjs github-api


    【解决方案1】:

    试图提供更直接的答案。如果你不喜欢嵌套,你总是可以将内部 observable 声明为单独的方法。

    public getUserRepos = (user:string) =>
      this.getUserData(user)
      .pipe(
        map(({public_repos})=>Math.ceil(public_repos / 100)),
        switchMap(max=>interval(1)
        .pipe(
          takeWhile(i=>++i<=max)
          mergeMap(i=>this.http.get<RepositoryI[]>(`https://api.github.com/users/${user}/repos?page=${++i}&per_page=100`))
        )),
        scan((acc, repo)=>[...acc, ...repo], [] as RepositoryI[])
      );
    

    这是我使用的策略:

    1. getUserData() 获取页面限制(使用对象解构),就像您已经拥有的一样。
    2. 使用interval() 开始一个可观察的计数(以0 开头,因此稍后使用++)。
    3. 使用takeWhile()检查计数器是否超过页面限制,否则可观察到这里完成。
    4. 使用mergeMap()interval() 发出的每个号码排队一系列API 调用。
    5. 使用scan() 将所有响应减少为单个数组响应。

    更新如果你测试这个方法,你需要更新你的组件逻辑。

    因为scan() 将所有 API 请求减少到单个 observable,您可以将这些值分配给您的状态数组,而不是推送它们。

    这个 observable 将在第一个 API 请求后立即发出。随着每个额外的 API 请求完成,observable 将发出一个包含添加项的新数组。这样做是为了让您的组件不必等到 所有 API 请求完成后才能显示一些数据。

    【讨论】:

      【解决方案2】:

      您可以使用 RxJS 运算符和函数通过一次订阅来实现,如下所示:

      public getUserRepos(user: string): Observable<RepositoryI[]> {
        // will return one observable that contains all the repos after concating them to each other:
        return this.getUserData(user).pipe(
          switchMap((data: UserI) => {
            //public_repos is the amt of repos the user has
            const pages: number = Math.ceil(data.public_repos / 100);
      
            // forkJoin will emit the result as (RepositoryI[][]) once all the sub-observables are completed:
            return forkJoin(
              Array.from(new Array(pages)).map((_, page) =>
                this.http.get<RepositoryI[]>(
                  `https://api.github.com/users/${user}/repos?page=${page + 1}&per_page=100`
                )
              )
            ).pipe(
              // will reduce the RepositoryI[][] to be RepositoryI[] after concating them to each other:
              map((res) =>
                res.reduce((acc, value) => {
                  return acc.concat(value);
                }, [])
              )
            );
          })
        );
      }
      

      然后在您的组件中,您可以订阅 observable,它将在获取所有 repos 后返回所有 repos:

        this.userService.getUserRepos(id).subscribe((repos) => {
          this.repositories = repos;
          // You can trigger the function that you need here...
        });
      

      【讨论】:

        【解决方案3】:

        总结

        我将描述两种实现方式,一种是命令式的(使用循环),另一种是反应式的(使用 rxjs)

        命令式

        你可以做一个简单的循环来获取所有页面(这个返回一个承诺,但如果需要你可以使用from将它转换为一个可观察的):

        public getUserRepos(user: string): Promise<Repository[]> {
          let pageNumber = 0;
          let results = [];
        
          const getPage = pageNumber =>
            this.http
              .get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
              .toPromise();
        
          do {
            const pageResults = await getPage(pageNumber++);
            results = results.concat(pageResults);
          } while(pageResults.length !== 0);
        
          return results;
        }
        

        反应性

        您可以使用 rxjs 的 expand 运算符来查询一个页面,然后是下一页等。

        该用法要求您将 http 结果映射到还包含页码的对象,这样expand 就可以知道接下来要获取哪个页面。

        你的例子:

        public getUserRepos(user: string): Observable<Repository[]> {
          const getPage = pageNumber =>
            this.http
              .get<Repository[]>(`https://api.github.com/users/${user}/repos?page=${pageNumber}&per_page=100`)
              .pipe(map(z => ({ results: z, page: pageNumber }))); // Add page number to results
        
          return getPage(0).pipe(
            expand(pr => getPage(pr.page + 1)),
            takeWhile(pr => pr.results.length !== 0),
            map(pr => pr.results),
            scan((acc, v) => [...acc, ...v])
          );
        }
        

        结果将是对第 0 页的查询,然后当它解析时,对第 1 页的查询等。当一个页面导致 0 个条目时,observable 将完成。

        我使用scan 在获取页面时提供部分结果,这样您就可以看到正在发生的事情,而不是在显示任何内容之前等待所有页面解决。

        例如StackBlitz

        注意:在最后一页之后会有一个额外的查询将被取消,这是无害的,但如果您想避免这种情况,请将您的 expand 调用更改为:

        expand(pr => (pr.results.length !== 0 ? getPage(pr.page + 1) : EMPTY))
        

        【讨论】:

          【解决方案4】:

          Javascript 异步运行。

          您的“For 循环”将并行运行 api 并同时增加计数。

          当您进行 3 个并行 api 调用时,它们可能会在不同的持续时间内返回。

          但在此之前,您的“i”值将达到 3。

          因此,在您的情况下,您的第 3 次 api 调用(记录较少)(46 条记录)在前 2 次 api 调用之前返回,并且首先进入订阅逻辑,而您只看到 46 条记录。

                        subscriber.next(data); // 3rd api comes here 
                        
                        if(pages == i) subscriber.complete();  // now your pages and i values are same and it gets completed.
                      });
          
          

          解决方案

          
          import {defer } from 'rxjs';
          
          defer(async () => {
               for (let i = 1; i <= pages; i++) {
                    await this.http
                      .get(
                        `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
                      )
                      .toPromise().then((data: RepositoryI[]) => {
                        subscriber.next(data);
                      });
                  }
          }.subscribe(x => { subscriber.complete(); });
          
          

          【讨论】:

          • 我得到一个带有解决方案的空数组。 VS Code 还说等待是不必要的。我尝试了离开和删除它。
          • 你可以试试这个-----await this.http.get(https://api.github.com/users/${user}/repos?page=${i}&amp;per_page=100).toPromise().then((data: RepositoryI[]) => {subscriber.next(data ); });
          【解决方案5】:
                    let subs = interval(1000).subscribe(() => {
                    this.http
                      .get(
                        `https://api.github.com/users/${user}/repos?page=${i}&per_page=100`
                      )
                      .subscribe((data: RepositoryI[]) => {
                        subscriber.next(data);
                        if(!data) { // to check if data is null
                          subs = null;  // if null, cancel the interval subscription
                          return;  // and return (not continue the subscription)
                        }
                      });
                    })
                  }
          

          这样你存储订阅,检查是否有数据后,你取消订阅(停止)。

          如果可行,请告诉我。

          【讨论】:

          • 我想要一个使用subscriber.complete() 的解决方案,这样我就可以跟踪 Observable 何时停止发出值。
          • 你为什么不尝试偏离 complete() 并尝试其他事情?如果您想跟踪 Observable,只需创建一个可让您知道数据何时为空的 observable。你有什么特别的理由继续使用 complete() 吗?
          • 如果您想要一个在另一个为空时完成的可观察对象,请使用 two.pipe(takeUntil(one.pipe(filter(r=&gt;r.length===0))))
          猜你喜欢
          • 2019-09-23
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2022-07-05
          • 2017-07-27
          相关资源
          最近更新 更多