【问题标题】:How can i execute asynchronous code when an RxJS observable complete?当 RxJS observable 完成时,我如何执行异步代码?
【发布时间】:2021-04-13 12:40:51
【问题描述】:

我想在 observable 完成时执行代码。在我的代码中,我执行这个:

  compact(): Observable<FileManifest> {
    return this.loadIndex().pipe(
      mergeMap((index) => index.walk()),
      map((entry) => entry.manifest),
      notUndefined(),
      writeAllMessages(this.newPath, ProtoFileManifest),
      finalize(async () => {
        await Promise.all([
          promises.rm(this.journalPath, { force: true }),
          promises.rm(this.manifestPath, { force: true }),
        ]);
        await promises.rename(this.newPath, this.manifestPath);
      }),
    );
  }

问题在于 finalize 方法是为同步代码创建的。当我像上面那样执行异步代码时,代码将独立于订阅执行。

我希望在处理可观察资源时执行此操作,但我希望当我订阅时,我总是收到事件。

如何将异步代码放入 finalize 方法中?

谢谢 乌尔里希

【问题讨论】:

  • 我建议将所有 Promise 转换为 observables。 mergeMap 也接受承诺,因此您可以将在 finalize 中使用的承诺放入 mergeMap
  • 在完成另一个 observable 后运行 observable 将是 concat* 的工作。

标签: rxjs rxjs6


【解决方案1】:

一种方法是创建三个可观察对象,而不是尝试全部完成 在一个。每个都将构成您想要的顺序异步链中的一个链接 制作。

为了使基于 Promise 的可观察对象中的副作用变得惰性,我们使用 defer。 请注意,延迟回调的返回值可以是可观察的,也可以是 “ObservableInput”,这是 RxJS 所称的值,它知道如何转换 变成可观察的。这个值可以是(除其他外)一个承诺。

({
  compact(): Observable<FileManifest> {
    const writeToTempManifest$ = this.loadIndex().pipe(
      mergeMap((index) => index.walk()),
      map((entry) => entry.manifest),
      notUndefined(),
      writeAllMessages(this.newPath, ProtoFileManifest)
    );

    const removeOldManifest$ = defer(() =>
      Promise.all([
        promises.rm(this.journalPath, { force: true }),
        promises.rm(this.manifestPath, { force: true }),
      ])
    );

    const renameNewManifest$ = defer(() =>
      promises.rename(this.newPath, this.manifestPath)
    );

    return from([
      writeToTempManifest$,
      removeOldManifest$,
      renameNewManifest$,
    ]).pipe(concatAll());
  },
});

请注意,这些可观察对象中的每一个都可能发出一些东西(尽管我不熟悉 API)。第一个发出 writeAllMessages 运算符所做的任何事情,而第二个和第三个发出各自承诺的解析值。对于第二个,这是来自Promise.all 的双元素数组。

如果你想抑制 observable 发出的值,同时在它完成之前保持打开状态,你可以创建一个操作符来做这件事:

const silence = pipe(concatMapTo(EMPTY));

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-09
    • 2015-08-08
    • 1970-01-01
    相关资源
    最近更新 更多