【问题标题】:forkJoin doesn't work with AngularFire2 valueChangesforkJoin 不适用于 AngularFire2 valueChanges
【发布时间】:2025-12-04 06:55:02
【问题描述】:

请帮助我解决我正在苦苦挣扎的问题。

我有一组 Firebase 对象键

const keys = ['-Kx9pqoMWlJLbKLQcAkP', '-Kx9pqoOYlDHTJ64Was5']

我要做的是使用forkJoin 将所有这些 Firebase 对象放在一个流中。这是我所拥有的:

const obj1 = this.fbService.getObj(keys[0]);
const obj2 = this.fbService.getObj(keys[1]);

forkJoin([obj1, obj2])
    .subscribe(res => {
        console.log(res);  // <-- this never happens
    };

fbService 方法是:

getObj(key): Observable<MyObj> {
  return this.fb.object(`/path/to/obj/${key}`).valueChanges();
}

我假设这个getObj 方法不适用于forkJoin,可能是因为valueChanges,我是否正确使用它?

但是:

  • getObj 可以很好地获取单个 Firebase 对象,例如:

    this.fbService.getObj(keys[0])
        .subsribe(res => console.log(res))// <-- works
    
  • forkJoin 可以很好地处理简单的 HTTP 请求,例如

    const r1 = this.http.get('https://swapi.co/api/people/1');
    forkJoin([r1])
        .subscribe(res => {
            console.log(res);  // <-- works
        };
    

那么,我做错了什么? 我的目标是从键数组中获取对象数组:

['-Kx9pqoMWlJLbKLQcAkP', '-Kx9pqoOYlDHTJ64Was5'] => [{prop:'val'},{prop:'val2'}]

【问题讨论】:

  • 你试过this.fbService.getObj(keys[1])也可以吗?因为如果forkJoin 中的多个可观察对象中的任何一个失败,则整个流都会失败。
  • 是的,我确实尝试过。 this.fbService.getObj(keys[1]) 有效,但是当它在 forkJoin 内部时,即使是单独的,它也不起作用

标签: angular firebase rxjs angularfire2 rxjs5


【解决方案1】:

forkJoin 运算符要求所有源 Observable 至少发出一项完成

我对 firebase 了解不多,但我怀疑 valueChanges 永远不会完成,这就是为什么 forkJoin 永远不会发出任何东西。解决此问题的一种方法是使用take(1) 始终完成链。

forkJoin(obj1.take(1), obj2.take(1)).subscribe(res => console.log(res);

也许在您的情况下,最好使用 zip() 运算符,它只要求所有源 Observables 发出相同数量的项目。但是请确保您取消订阅它,因为它在源 Observables 完成之前不会自行完成。

【讨论】:

  • 我刚刚查看了文档。是的,它们永远不会完成。我猜 OP 或者必须使用 .combineLatest()
  • 这是一个很好的观点,伙计们。但是,当我单独使用它时,为什么 valueChanges 会起作用? this.fb.object(...).valueChanges().subscribe(WORKS)
  • 因为第一个参数是为每个项目调用的next 处理程序。如果你想检查 Observable 是否完成,你可以使用例如。这个:.subscribe(undefined, undefined, () =&gt; console.log('complete'))
  • .combineLatest() 在这种情况下会很好,但请注意它会在任何源 Observable 的每次发射中发射,因此您可能会重复获得相同的值。
  • @Matiishyn 这意味着流永远不会完成。当您订阅 observables 时,您只是对流“做出反应”。如果您想知道流是否完成,可以将代码放入 complete() 处理程序。你意识到他们永远不会被处决。
【解决方案2】:

@martin 已提供正确答案,但代码示例需要更新为新的 RxJS 语法:

forkJoin(obj1.pipe(take(1)), obj2.pipe(take(1))).subscribe(res => console.log(res));

【讨论】:

    【解决方案3】:

    尝试使用startWith运算符

    getObj(key): Observable<MyObj> {
      return this.fb.object(`/path/to/obj/${key}`)
         .valueChanges.pipe(starWith(value)); // value can be default obj to trigger stream
    }
    

    可能是因为valueChanges 在您手动触发流之前不会开始发射,所以forkJoin 在所有源 Observable 至少发射一次之前不会触发;所以我们让它们都立即发出一次它们的默认值。

    【讨论】: