【问题标题】:How to make recursive HTTP calls using RxJs operators?如何使用 RxJs 运算符进行递归 HTTP 调用?
【发布时间】:2021-05-04 23:25:00
【问题描述】:

我在奇怪的情况下使用以下方法通过为每个 HTTP 调用传递 pageIndex (1) 和 pageSize (500) 来检索数据。

this.demoService.geList(1, 500).subscribe(data => {
    this.data = data.items;
});

响应有一个名为isMore 的属性,如果isMore 为真,我想更奇怪地修改我的方法以继续HTTP 调用。我还需要合并返回的值,最后返回累加的值。

例如,假设有 5000 条记录,并且直到第 10 次 HTTP 调用,服务会为 isMore 值返回 true。在第 10 次 HTTP 调用后,它返回 false,然后此方法将 this.data 值设置为合并的 5000 条记录。对于这个问题,我应该使用mergeMapexpand 还是另一个RxJs 运算符?解决这个问题的正确方法是什么?

更新:我使用下面的方法,但是它没有合并返回值,也没有增加pageIndex。出于这个原因,它不起作用(我尝试进行一些更改,但无法使其起作用)。

let pageIndex = 0;
this.demoService.geList(pageIndex+1, 500).pipe(
    expand((data) => {
        if(data.isComplete) {
            return of(EMPTY);
        } else {
            return this.demoService.geList(pageIndex+1, 500);
        }
    })
).subscribe((data) => {
    //your logic here
});

更新二:

of({
    isMore : true,
    pageIndex: 0,
    items: []
  }).pipe(
    expand(data => demoService.geList(data.pageIndex+1, 100)
    .pipe(
      map(newData => ({...newData, pageIndex: data.pageIndex+1}))
    )),
    // takeWhile(data => data.isMore), //when using this, it does not work if the total record is less than 100
    takeWhile(data => (data.isMore || data.pageIndex === 1)), // when using this, it causing +1 extra HTTP call unnecessarily
    map(data => data.items),
    reduce((acc, items) => ([...acc, ...items]))
  )
  .subscribe(data => {
    this.data = data;
  });  

更新三:

最后我通过修改 Elisseo 的方法使其工作,如下所示。但是,**我需要将其设为无效并在此 getData() 方法中设置 this.data 参数。我该怎么做?

getData(pageIndex, pageSize) {
  return this.demoService.geList(pageIndex, pageSize).pipe(
    switchMap((data: any) => {
      if (data.isMore) {
        return this.getData(pageIndex+1, pageSize).pipe(
          map((res: any) => ({ items: [...data.items, ...res.items] }))
        );
      }
      return of(data);
    })
  );
}

我想将以下订阅部分合并到这种方法,但由于某些错误,例如,我不能“'void' 类型上不存在属性 'pipe'。”

.subscribe((res: any) => {
    this.data = res;
});

【问题讨论】:

  • 我只需要在递归HTTP调用后累积返回值。我认为这是分页或递归需求的一般情况。有什么帮助吗?
  • 我希望 RxJsJavaScript 大师们看到这个问题并给出正确的解决方法:)
  • 如果您使用分页,您不应该在事件发生后获取数据,例如 pageIndex 更改?如果您的方法是分页,为什么要尝试获取所有数据?如果您想获取每个数据,为什么要设置上限来获取数据?无法理解您尝试实现的逻辑。
  • 谢谢朋友。其实这个方法是用来分页的,叫做点击下一个/上一个按钮,你是对的。但除此之外,我还需要通过检查响应的isMore 值来获取记录,因为随着将来记录的增加,给出一个固定的数字并不能解决问题。
  • 在这个例子中不要考虑分页,只是认为我们需要使用从1开始的页码进行递归调用,然后检查响应的isMore值。如果为真,继续递归调用,合并之前的结果。如果为 false,则只返回第一个响应中的记录。

标签: javascript reactjs angular typescript rxjs


【解决方案1】:

我不认为递归是正确的方法:

interval(0).pipe(
    map(count => this.demoService.getList(count + 1, 500)),
    takeWhile(reponse => response.isMore, true),
    reduce((acc, curr) => //reduce any way you like),
).subscribe();

这应该会调用您的端点,直到端点返回 isMore === false。 interval 的美妙之处在于我们可以免费获得 count 变量。

但是,如果您打算使用 recrsion,这里是使用 expand-operator 的 rxjs 方式(请参阅docs)。我发现它的可读性稍差,因为它需要一个 if-else-construct,这会增加代码的复杂性。外部的“计数器”变量也不是最佳的。

let index = 1;
this.demoService.geList(index, 500).pipe(
    expand(response => response.isMore ? this.demoService.geList(++index, 500) : empty()),
    reduce((acc, curr) => //reduce here)
).subscribe();

【讨论】:

  • 感谢您的帮助。在提出问题之前,我也尝试了takeWhile,但在这种情况下,我发送了一个额外的 1 个 HTTP 调用,因为我在调用后检查了条件。但也许在 yoru 方法中不会出现问题。但是,由于this.processResponse,我无法使其工作。在这一行中,我需要合并每个结果,我不知道我应该使用 HTTP 调用方法而不是 interval(1000)。有什么想法吗?
  • 您也可以在我的更新中编辑方法。
  • @Max - 我很难想象这个。您能解释一下您期望订阅()中的“数据”是什么吗?
  • @Max - 你试过使用reduce吗? rxjs-dev.firebaseapp.com/api/operators/reduce
  • 我期望对象数组广告数据。减少可能是一个不错的选择,但您为什么不能根据我在问题更新部分的代码发布答案?我的问题中没有间隔等。
【解决方案2】:

只要 api 响应为您提供 isMore 标志,您是否完全更改记录并不重要。

我将跳过如何实现减速器动作事件的部分,我假设您已经完成了该部分。所以我将尝试用伪代码来解释。

您有一个带有分页数据的表格或类似的东西。在初始状态下,您可以创建一个 loadModule 效果或使用此 fn:

getPaginationDataWithPageIndex(pageIndex = 1){ this.store.dispatch(new GetPaginationData({ pageIndex: pageIndex, dataSize: 500})); }

在您的 GetPaginationData 效果中

... map(action => {
return apicall.pipe(map((response)=> {
   if(response.isMore){
    return new updateState({data:response.data, isMore: responseisMore})
} else {
   return new updateState({isMore: response.isMore}),
}
}})
`

all you have to left is subscribing store in your .ts if isMore is false you will not display the next page button. and on your nextButton or prevButton's click method you should have to just dispatch the action with pageIndex

【讨论】:

  • 忘记一切,请看一下我的Update II。我添加了 2 cmets 关于该方法的问题。有什么想法吗?
  • 感谢 Talha,但我很困惑,如果您根据使用的方法编辑答案,我会很高兴。如果我修复了额外的 1 个 HTTP 调用问题,则更新 II 最好进行编辑。
  • 非常感谢您的帮助。我想投票赞成你的解决方案,但不幸的是我没有足够的声誉。请为这个问题投票吗?问候。
  • @Max 很高兴我能提供帮助
  • 你对我的更新 III 有任何想法吗?
【解决方案3】:
getData(pageIndex, pageSize) {
    return this.demoService.getList(pageIndex, pageSize).pipe(
      switchMap((data: any) => {
        if (!data.isCompleted) {
          return this.getData(pageIndex+1, pageSize).pipe(
            map((res: any) => ({ data: [...data.data, ...res.data] }))
          );
        }
        return of(data);
      })
    );
  }

stackblitz 注意:在我写 pageIndex++ 之前,我更新了作为参数 pageIndex+1 的 @mbojko 建议

更新 2

使用扩展运算符,我们需要考虑到我们需要用一个带有 pageIndex 的对象来提供“递归函数”——这在我们的调用中是必需的——为此,当我们制作 this.demoService.getList(data.pageIndex+1,10) 时,我们需要“转换结果”添加一个新的属性“pageIndex”。为此,我们使用“地图”

  getData() {
    //see that initial we create "on fly" an object with properties: pageIndex,data and isCompleted
    return of({
      pageIndex:1,
      data:[],
      isCompleted:false
    }).pipe(
      expand((data: any) => {
        return this.demoService.getList(data.pageIndex,10).pipe(
            //here we use map to create "on fly" and object
            map((x:any)=>({
              pageIndex:data.pageIndex+1, //<--pageIndex the pageIndex +1
              data:[...data.data,...x.data], //<--we concatenate the data using spread operator
              isCompleted:x.isCompleted}))  //<--isCompleted the value
        )
      }),
      takeWhile((data: any) => !data.isCompleted,true), //<--a take while
            //IMPORTANT, use "true" to take account the last call also
      map(res=>res.data)  //finally is we only want the "data" 
                          //we use map to return only this property
    )
  }

我们可以做一个这样的函数:

  getData() {
    of({pageIndex:1,data:[],isCompleted:false}).pipe(
      expand((data: any) => {
        return this.demoService.getList(data.pageIndex,10).pipe(
            tap(x=>{console.log(x)}),
            map((x:any)=>({
              pageIndex:data.pageIndex+1,
              data:[...data.data,...x.data],
              isComplete:x.isComplete}))
        )
      }),
      takeWhile((data: any) => !data.isComplete,true), //<--don't forget the ",true"
    ).subscribe(res=>{
       this.data=res.data
    })
  }

请注意,在这种情况下,我们不返回 else 简单订阅函数并将变量 this.data 与 res.data 相等 - 这就是我们不需要最后一个映射的原因

更新 3Mrk Sef

最后,如果您不希望您的流发出间歇性的值,而您只想要最终的串联数据,您可以从expand 中删除数据串联,然后改用reduce

  getData() {
    of({
      pageIndex: 1,
      data: [],
      isCompleted: false
    })
      .pipe(
        expand((prevResponse: any) => this.demoService.getList(prevResponse.pageIndex, 10).pipe(
            map((nextResponse: any) => ({
              ...nextResponse,
              pageIndex: prevResponse.pageIndex + 1
            }))
          )
        ),
        takeWhile((response: any) => !response.isCompleted, true),
        // Keep concatenting each new array (data.items) until the stream
        // completes, then emit them all at once
        reduce((acc: any, data: any) => {
          return [...acc, ...data.data];
        }, [])
      )
      .subscribe(items => {
        this.data=items;
      });
  }

【讨论】:

  • 不是pageIndex++; return this.getData(pageIndex, pageSize),而是return this.getData(pageIndex + 1, pageSize)
  • @Eliseo 是的,正是我想要的,它不会发送额外的 HTTP 请求。有 810 条记录,如果我设置页面大小为 100,它发送 9 个 HTTP 调用,如果页面大小为 500,它发送 2 个 HTTP 调用:)
  • 我需要进行一些修改,但最终成功了。另一方面,如何使其无效并在 getData 方法中设置 this.data 而不是 onInit?
  • 顺便说一句,我不能投票,因为我没有足够的声誉。你能投票吗?
  • @Max,我只是更正了一些代码,更改了“pageIndex”,真的我不太喜欢之前的getList(data.pageIndex+1,10),但你需要从1开始
猜你喜欢
  • 2017-12-12
  • 1970-01-01
  • 2013-11-20
  • 1970-01-01
  • 1970-01-01
  • 2022-01-17
  • 1970-01-01
  • 2021-05-03
  • 2015-02-25
相关资源
最近更新 更多