【问题标题】:React Actions using RxJS for parallel async callsReact Actions 使用 RxJS 进行并行异步调用
【发布时间】:2021-06-08 10:36:36
【问题描述】:

我有一个现有的操作,它使用 ID 列表作为查询字符串参数调用 api,并获取传递的 ID 的缩略图响应。在某些情况下,ID 计数可以是 150-200 条记录。因此,我正在使用 RxJs 的 forkJoinsubscribe 方法使这个 API 调用基于批处理并并行运行。我在实施方面面临以下 3 个问题。

  1. 我能够在下一个订阅方法中获得基于批处理的响应。但是,在此函数中调用的操作以及响应并未被调用。

  2. maxParallelQueries 似乎没有按预期工作。所有的 fetchThumbnailObservables() 都是一次执行的,而不是在设置为常量的 3 个批次中/

  3. 如何处理 observable 的返回,以便 mergeMap 不会给出语法错误。 Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction, index: number) => ObservableInput<any>'. Type 'void' is not assignable to type 'ObservableInput<any>'.ts(2345)

这是我当前的代码的样子。任何帮助都会很好。

const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
    action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
        mergeMap( ({ payload }) => {
            
            const assetIds = Object.keys(payload);
            const meta = payload;

            const batchSize = 10;
            const maxParallelQueries = 3;
            
            const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];

            for (let count = 0; count < assetIds.length; count += batchSize) {
                fetchThumbnailObservables.push(AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize)));
            }

             forkJoin(fetchThumbnailObservables)
            .pipe(mergeMap(fetchThumbnailObservable => fetchThumbnailObservable, maxParallelQueries)) // Issue 1 :  maxParallelQueries limit is not working
            .subscribe({ 
                complete: () => { }, 
                error: () => { },
                next: (response) => { 
                     // Issue 2 :  below action dont seems to be getting invoked
                    actions.AssetActions.receivedThumbnailsAction(response, meta) 
                },     
             });
    
            // Issue 3 :  How to handle return of observable 
            // Added below code for prevennting error raised by mergeMap( ({ payload }) => {
            return new Observable<any>();
        }),
        catchError((error) => of(actions.Common.errorOccured({ error })))
    );
    ```

【问题讨论】:

    标签: reactjs rxjs subscription redux-observable


    【解决方案1】:
    1. 我能够在下一个订阅方法中获得基于批处理的响应。但是,在此函数中调用的操作以及响应并未被调用。

    你不能在 redux-observable 中强制调用一个动作。相反,您需要将它们排队。 actions.AssetActions.receivedThumbnailsAction(response, meta) 将返回一个形状为{type: "MY_ACTION", payload: ...} 的普通对象,而不是调度一个动作。您宁愿返回包裹在 mergeMap 内的 observable 中的对象,以便将下一个操作添加到队列中。 (见下面的解决方案)

    1. maxParallelQueries 似乎没有按预期工作。所有的 fetchThumbnailObservables() 都是一次执行的,而不是设置为常量的 3 个批次/

    第一个猜测:AssetDataService.fetchThumbnailData(assetIds.slice(count, count + batchSize)) 的返回类型是什么?如果它是Promise 类型,那么maxParallelQueries 无效,因为promise 是急切的,并且在创建时立即启动,因此在mergeMap 有机会控制请求之前。因此,请确保 fetchThumbnailData 返回一个 observable。如果您需要将 Promise 转换为 Observable,请使用 defer 而不是 from,以确保不会在 fetchThumbnailData 内过早触发 Promise。

    第二个猜测:forkJoin 正在触发所有请求,因此mergeMap 甚至没有机会限制并行查询的数量。我很惊讶打字稿没有在这里警告您mergeMap(fetchThumbnailObservable =&gt; fetchThumbnailObservable, ...) 中的返回类型不正确。我下面的解决方案将forJoin 替换为from,并且应该启用您的maxParallelQueries

    1. 如何处理 observable 的返回,以便 mergeMap 不会给出语法错误。 '({ payload }: IAction) => void' 类型的参数不可分配给 '(value: IAction, index: number) => ObservableInput' 类型的参数。类型 'void' 不可分配给类型 'ObservableInput'.ts(2345)

    这不是语法错误。在史诗中,如果没有后续操作,您可以选择返回一个空的 observable return empty(),或者通过返回 IAction 类型的 observable 来调度另一个操作。这是我(未经测试)尝试满足您的要求:

    const getThumbnails: Epic<IAction, IAction, IStoreState> = (action$, state$) =>
      action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
        mergeMap( ({ payload }) => {
                
          const assetIds = Object.keys(payload);
          const meta = payload;
    
          const batchSize = 10;
          const maxParallelQueries = 3;
                
          const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
    
          for (let count = 0; count < assetIds.length; count += batchSize) {
            fetchThumbnailObservables.push(
              AssetDataService.fetchThumbnailData(
                 assetIds.slice(count, count + batchSize)
              )
            );
          }
    
          return from(fetchThumbnailObservables)
            .pipe(
              mergeMap(
                fetchThumbnailObservable => fetchThumbnailObservable,
                maxParallelQueries
              ),
              map((response) =>
                // dispatch action for each response
                actions.AssetActions.receivedThumbnailsAction(response, meta)
              ),
              // catch error early so epic continue on error
              catchError((error) => of(actions.Common.errorOccured({ error })))
            );
        })
      );
    

    请注意,我将catchError 移动到更靠近请求的位置(在内部 forkJoin Observable 内),以便在发生错误时史诗不会终止。 Error Handling - redux observable 之所以如此,是因为当catchError 处于活动状态时,它会将失败的可观察对象替换为catchError 返回的可观察对象。而且我假设您不想在出现错误的情况下替换整个史诗?

    还有一件事:redux-observable 为您处理订阅,因此无需在应用程序的任何地方调用 .subscribe.unsubscribe。你只需要关心 observables 的生命周期。这里最重要的是要知道 observable 何时开始、完成时会发生什么、何时完成或 observable 抛出时会发生什么。

    【讨论】:

    • 非常感谢。这就像魅力一样。非常感谢对此的详细解释:)
    • 我很乐意为您提供帮助。 :-)
    【解决方案2】:

    我不知道 Epics,但我想我可以给你一些关于如何解决这个问题的提示。

    让我们从这样一个事实开始,即您有一个 id 数组 assetIds,并且您希望以可控的并发级别为每个 id 进行 http 调用 maxParallelQueries

    在这种情况下,from 函数和 mergeMap 运算符是你的朋友。

    你需要做的是

    // from function transform an array of values, in this case, to a stream of values
    from(assetIds).pipe(
      mergeMap(id => {
        // do whatever you need to do and eventually return an Observable
        return AssetDataService.fetchThumbnailData(id)
      }, maxParallelQueries) // the second parameter of mergeMap is the concurrency
    )
    

    通过这种方式,您应该能够调用 http 调用,保持您已建立的并发限制

    【讨论】:

    • 感谢您的快速帮助。我们如何获取每个 observable 的响应并以响应作为输入数据调用其他操作(在我的情况下为 actions.AssetActions.receivedThumbnailsAction(response, meta))?
    • 如果我理解你的意图,你可以再做一个合并图。第二个 mergemap 将接收来自第一个的发射,您可以使用它来返回您的第二个 observable。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-11-03
    • 1970-01-01
    • 1970-01-01
    • 2017-03-17
    • 2020-08-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多