【问题标题】:Pass data from first operator in a pipe into two custom operators?将管道中第一个运算符的数据传递给两个自定义运算符?
【发布时间】:2021-09-30 20:31:27
【问题描述】:

我最近接手了一个 Angular 代码库,无法访问以前的开发人员,并且正在努力理解所使用的一些 RxJS。

目前有一个两步文件上传过程,我想基本上复制它以上传可流式视频。上传过程向接收表单数据的 API 发出请求以上传文件(现在是视频)。当前代码将初始文件上传请求与mergeMap 中每个文件的表单数据相结合,并向 S3 发出请求。这对文件很有用,但我似乎无法对视频进行同样的处理。

据我了解,我需要创建一种方法将数据从 map 运算符传递给两个自定义运算符,而不是一个。我曾尝试在mergeMap 中使用forkJoin 来组合进程,但在将数据从初始map 运算符传递到这两个函数时一直遇到问题。这是我的初始上传请求和自定义运算符的代码。

  addItem(courseCode: string, unitId: string, formValue: AddItemFormValue) {
    return this.courseApi.addItem(courseCode, unitId, body).pipe(
      map(data => data.data),
      this.makeFileUploadRequests(formValue),
      // Trying to add: this.makeVideoUploadRequests(formValue)
    );
  }

  private makeFileUploadRequests<T extends { files: { name: string; params: ServerModel.FileResponse }[] }>(formValue: {
    files: UploadingFile[];
  }) {
    return mergeMap<T, Observable<ServerModel.FileUploadedResponse[] | T>>(data => {
      return Array.isArray(data.files) && data.files.length > 0 ? this.uploadFiles(data.files, formValue.files) : of(data);
    });
  }

我认为造成这种混乱的部分原因是用于自定义运算符的语法。我已经为上传视频创建了一个类似的操作符,但是如何在确保来自map 的流被传递到两者的同时运行它们?

【问题讨论】:

    标签: angular typescript amazon-s3 rxjs


    【解决方案1】:

    令人失望的是,通常情况下,答案是“视情况而定”。现在,addItem 正在返回一个 Observable。 Observable 基本上是一个可以插入的管道,它有时会发出值。 addItem 的 Observable 应该如何发出值?

    一般答案

    您要问的一般问题是,“如何将 Observable 的值传递给 2 个不同的运算符?”一般的答案是:您创建 2 个管道。

    例如:

    // Emits a number every second, which is multiplied by 10
    const foo$ = interval(1000).pipe(
      map(v => v * 10)
    )
    
    const stringified$ = foo$.pipe(
      map(v => v.toString())
    )
    
    const percentage$ = foo$.pipe(
      map(v => v / 100)
    )
    

    pipe 函数只是简单地返回另一个 Observable,这样 Observable 就可以被送入另一个管道。在您的示例中,这意味着您可以这样做:

    addItem(courseCode: string, unitId: string, formValue: AddItemFormValue) {
      const data$ = this.courseApi.addItem(courseCode, unitId, body).pipe(
        map(data => data.data)
      )
    
      const fileUpload$ = data$.pipe(this.makeFileUploadRequests(formValue))
      const videoUpload$ = data$.pipe(this.makeVideoUploadRequests(formValue))
    
      return // We're getting to this...
    }
    

    条件部分

    函数addItem 当前返回一个发出一些值的 Observable,为了保持一致,我们需要返回一个也发出一些值的 Observable。所以现在我们到了它所依赖的部分,它所依赖的是:addItem 发出的 Observable 返回的值应该是什么?

    有很多选择,但您可能会考虑以下一些:

    选项 1: data$ observable 无论如何都应该将信息传递给 both makeFileUploadRequests makeVideoUploadRequests,并且结果 Observable 应该发出任一运算符发出的所有值。

    在这种情况下,您将希望使用 mergeWith 函数返回一个新的 Observable:

    return mergeWith(fileUpload$, videoUpload$)
    

    选项 2: data$ observable 应根据某些过滤器将信息传递给 either makeFileUploadRequests makeVideoUploadRequests,并且结果 Observable 应该只发出由使用两者中的任何一个发出的值。

    这可能是最简单的,因为它是一个直接的三元:

    return unitId === someConditionOrWhatever ? fileUpload$ : videoUpload$
    

    选项 3: data$ observable 无论如何都应该将信息传递给 both makeFileUploadRequests makeVideoUploadRequests,并且结果 Observable 应该仅在 两个 其他 Observable 都完成之后发出。

    你会想要使用forkJoin,它会在传递给它的所有 Observables 完成时发出:

    return forkJoin(fileUpload$, videoUpload$)
    

    选项 4: data$ observable 无论如何都应该将信息传递给 both makeFileUploadRequests makeVideoUploadRequests,并且结果 Observable 应该仅在 两个 其他 Observable 都至少发射一次之后发射。

    你最好的朋友是combineLatest:

    return combineLatest([fileUpload$, videoUpload$])
    

    RxJS 中有几十个运算符专门用于合并 2+ 个 Observable,具体取决于您的操作方式和结果应该是什么,但希望这能给您一个好主意。

    TL;DR:您不会将它提供给单个管道中的 2 个不同的运算符,而是结束该管道并创建 2 个插入其中的新运算符。

    【讨论】:

    • 感谢您的回答!我尝试使用 forkjJoin,因为它看起来很合适,但问题是 addItem() 被调用了两次,在数据库上创建了我的对象的两个实例。相反,我使用选项 2 并使 videoUpload 函数也上传文件。
    猜你喜欢
    • 1970-01-01
    • 2017-04-21
    • 1970-01-01
    • 2019-07-10
    • 1970-01-01
    • 2020-07-09
    • 1970-01-01
    • 2022-01-05
    • 1970-01-01
    相关资源
    最近更新 更多