【问题标题】:How to unsubscribe from an Observable returned by forkJoin?如何取消订阅 forkJoin 返回的 Observable?
【发布时间】:2024-01-10 17:49:01
【问题描述】:

在我的 Angular2-typescript 应用程序中,我使用 forkJoin 仅在进行所有并行 HTTP 调用后才返回 Observable。

问题:订阅回调无限期地执行

这是我的代码:

http.service

import {Http} from "@angular/http";

constructor (private _http: HTTP) {}

makeGetRequest(....) {
    return this._http.get(URL)
           .map (res => res.json)
           .toPromise();

my.service

import {Observable} from "rxjs/Observable";
import {HttpService} from "http.service"

constructor (private _httpService: HttpService) {}

myMethod(): Observable<any[]> {
 return Observable.forkJoin(
            this._httpService.makeGetRequest(
                URL1
            ),
            this._httpService.makeGetRequest(
                URL2
            )
        )
}

my.component

import MyService from "my.service";
import Subscription from "rxjs";

constructor (private _service: MyService) {}

mySub: Subscription;

ngOnInit() {
    this.mySub = this._service.myMethod.subscribe(data => {
         data.forEach(console.log(data));
         this.mySub.unsubscribe();
     }
}

我尝试了什么(同样的问题):

  • 在 Http.service 中返回 Observable 而不是 Promise
  • 在 my.component 中使用 .first().subscribe() 而不是 subscribe()
  • 把这个.mySub.unsubscribe();在 ngOnInit 结束时而不是在订阅回调内部(也使用 setTimeout(() => ....))

【问题讨论】:

  • 订阅回调无限期执行是什么意思?
  • 这个问题没有意义。 forkJoin 加入来自 completed 可观察对象的结果,不应取消订阅,因为它只有一个值。如果您在无限期执行方面遇到问题,请提供一个MCVE 可以复制您的问题
  • @estus 你是对的。显然发生这种情况是因为订阅被实例化了两次,这导致了问题。随意回答问题,我会接受的;)
  • 当然。好在很容易解决。

标签: angular rxjs fork-join


【解决方案1】:

正如forkJoin reference 所说,它

并行运行所有可观察序列并收集它们的最后一个元素。

这意味着操作符从已完成的可观察对象中获取值并返回具有单个值的已完成可观察对象。无需退订。

【讨论】:

  • 如果我们想要两个 forkjoins 怎么办?我遇到了麻烦,因为我在我的应用程序中多次调用 .forkjoin() - 我认为 rxjs 不喜欢这样:(
  • 你到底是什么意思?您可以使用多个 forkJoin。它们只是返回可以以任何方式组合的 observables。只要您对他们的期望是正确的,就不应该有任何问题。如果您有特定案例,请考虑提出一个新问题来解释它并包含*.com/help/mcve
  • 在一个组件中,我做了一个 forkJoin 以获得某些结果。我将其中一些结果推送到一个数组中,然后使用该数组执行 forkJoin。但是即使没有发生错误,第二个 forkJoin 也不会返回数据。如果我取出最初的 forkJoin,那么它工作正常。我们的 forkjoin 实际上是一个 observable 本身,我想当我第二次调用它时,它仍然已经设置为原始值?
  • 经过进一步测试,似乎是时间问题。我的第二个 forkjoin 只要我在它周围放置一个 setTimeout 并在 3 秒时停止它就会运行(还没有尝试更多/更少的时间)。这让我感到惊讶,因为由于我的代码,我假设我的第一个 forkjoin 在第二个触发之前完成..
【解决方案2】:

您可以取消它。假设observables 是您准备触发的一组http 请求(通过httpClient 执行)。

this.forkJoinSubscription = forkJoin(observables).subscribe(responses => {
    . . . do something
});

this.forkJoinSubscription.unsubscribe();

您可能会在您的网络标签中注意到这些请求已被取消。

【讨论】:

    最近更新 更多