【问题标题】:Guarantee observable subscription finishes before proceeding to next code保证可观察的订阅在继续下一个代码之前完成
【发布时间】:2018-12-05 10:34:57
【问题描述】:

我正在使用多个服务调用进行一些级联删除。一些后来的订阅依赖于以前的订阅来完成。如何保证订阅完成,然后再转到我的下一个代码?

// Need to make sure this code completes
data.forEach(element => {
    this.myService.delete(element.id).subscribe();
});

// Before running this code
this.myService.getAll().subscribe(res => {
        res.data.forEach(element => {
            this.myService.delete(element.id).subscribe();
        });
    }
);

【问题讨论】:

  • @JBNizet 查看帖子后,我不明白如何在我的场景中实现语法。
  • 我链接到的副本中有一个示例。但无论如何...const deletions = data.map(element => this.businessEventService.delete(element.id)); forkJoin(...deletions).pipe(switchMap(() => this.myService.getAll())).subscribe(...)
  • @JBNizet 我认为我的问题首先是我应该在我的服务范围内完成这项工作。甚至可以在组件中使用两个单独的服务调用来执行此操作,其中每个服务发出一个 http 请求并返回一个 observable?

标签: angular typescript rxjs observable


【解决方案1】:

Subscription 有一个独特的目的:处置,但你有选择:

  • 如果要一个个订阅 observables,可以使用concat

  • 如果你想同时订阅多个 observables 并合并每个 observables 的最后一个值,你可以使用forkJoin

  • 如果你想在另一个 observable 中使用 observable 的 yield 值,你可以使用 flatMap


import { forkJoin, interval, concat, of } from "rxjs";
import { first, flatMap } from "rxjs/operators";

var combinedIntervals =
    forkJoin(
        interval(1000).pipe(first()),
        interval(2500).pipe(first())
    ).pipe(
        flatMap(([a, b]) => of(`${a} and ${b}`))
    );

concat(
    combinedIntervals,
    of("after both intervals")
)
.subscribe(
    console.log.bind(console)
);


// 0 and 0
// after both intervals

对于您的具体情况,您可以选择删除操作作为可观察对象,然后 forkJoin。

var data = [];

var obsBatch1 = data.map(element => myService.delete(element.id));
var obsBatch2 =
    forkJoin(
        obsBatch1,
        elements => elements.map(
            element => myService.delete(element.id)
        )
    );

obsBatch2.subscribe();

这是rxjs@6 语法。我将rxjs@5 留作练习。

【讨论】:

  • 在我需要调用 myService.GetProducts() 的情况下,循环并在每个上调用 myService.DeleteProduct()。在继续执行其他订阅的代码之前,如何保证所有单独的删除都已完成?
  • 使用 forkJoin 函数,如副本、我的评论和我给你的链接中所述。
  • 将要删除的操作映射为 observables 和 forkJoin 它们。 `forkJoin(items.map(<delete as observable))。或者您可以更改您的 api 以接受多次删除。这将是最有效的方法。
  • @BlakeRivell forkJoin = 一个可观察对象,它从所有可观察对象中产生最后一个值。所以是的,所有的 observables 必须在forkJoin 产生之前完成,而forkJoin 只产生一次。至于forkJoin 是否会在一个或多个可观察对象完成时产生但没有产生任何值,这取决于您使用的 Rx 版本。用它来找出答案。
  • @BlakeRivell Observable 可以被认为是Task/Promise,它可以产生多个结果。同样,Task/Promise 只是一个返回单个结果的Observable。在 C# 中,您甚至可以 var one = await Observable.Return(1); (如果它产生除单个值之外的任何内容,它会抛出)。 forkJoin/CombineLatest 只是 Task.WhenAll(...).Select(x => x.Result);,而 concat/Concat 只是 `foreach (var task in tasks) { await task; /* 做一点事 */ }。除了 observable 可以产生多个结果,因此需要考虑。但如果你需要,你会想办法的。
猜你喜欢
  • 1970-01-01
  • 2021-12-26
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-03-24
相关资源
最近更新 更多