【问题标题】:How to combine Subjects and Observables with Angular rxjs如何将 Subjects 和 Observables 与 Angular rxjs 结合起来
【发布时间】:2019-06-19 14:18:31
【问题描述】:

我有以下两个订阅:

this.service1.source1.subscribe(countries => {
  this.filteredData =  [];
  countries.forEach(country => {
    this.filteredData = this.data.filter(user => {
      return user.country == country;
    });
  });
});

this.service2.source2.subscribe(companies => {
  companies.forEach(company => {
    this.filteredData =  [];
    this.filteredData = this.data.filter(user => {
      return user.company == company;
    });
  });
});

其中 source1 和 source2 是 BehaviorSubject 并且两个事件都被 subscribe 捕获。

我想合并两个订阅,因为我想根据subscribe 返回的两个结果过滤数据

我试过forkJoinzip

zip([this.service1.source1, this.service1.source2]).subscribe(results => {
  console.log('zip results:', results);
});

forkJoin([this.service1.source1, this.service1.source2]).subscribe(results => {
  console.log('forkJoin results:', results);
});

但是对于那些(zip 和 forkJoin),我注意到我什至没有在控制台上得到任何东西,好像subscribe 在发出事件时没有被执行。

如何将两个来源的结果合并到一个订阅中?

【问题讨论】:

  • 您是否尝试过从forkJoin 中删除数组?它接受可变参数来源,但我不确定这是否会有所不同。

标签: angular rxjs observable behaviorsubject subject


【解决方案1】:

forkJoin 可能不适合您的用例,因为它在所有可观察对象都完成时发出每个最后发出的值。

zip 也可能不会给你想要的行为,因为 它会等待所有输入流都发出它们的第 n 个值,一旦满足这个条件,它就会组合所有这些第 n 个值并发出第 n 个组合值。

所以在任何一种情况下,在两个可观察对象中都有发射之前,您都不会得到发射。因为this.service1.source1this.service1.source2BehaviorSubject,使用zip 保证初始发射。但只有在两个 observables 都发射时才会发生以后的发射。

我建议使用combineLatest,因为每当任何输入流发出一个值时,它都会结合每个输入流发出的最新值。

combineLatest(this.service1.source1, this.service1.source2).subscribe(results => {
  console.log('combineLatest results:', results);
});

因为this.service1.source1this.service1.source2BehaviorSubject,所以:

Subject 的一种变体,它需要一个初始值并在订阅时发出其当前值。

保证在您订阅它以及任何可观察对象发出一个值时,您都会收到一个发射。

【讨论】:

    【解决方案2】:

    正如@ysf 上面指出的那样,如果您想在任一发射时过滤具有两个结果的本地数据,您应该选择combineLatest

    此外,我建议您在每个服务中创建一个变量作为 BehaviorSubject 的 Observable 实例。喜欢:

    // at Service1.service.ts
    export class Service1Service {
      // Do not expose Subject directly.
      private source1 = new BehaviorSubject<YourDataType[]>(YourInitialValue);
      private source2 = new BehaviorSubject<YourDataType[]>(YourInitialValue);
    
      source1$: Observable<YourDataType[]>;
      source2$: Observable<YourDataType[]>;
    
      constructor() {
        // Create an Observable instance for your sake and open it to the public.
        this.source1$ = this.source1.asObservable();
        this.source2$ = this.source2.asObservable();
      }
    }
    
    
    // at wherever you subscribe
    combineLatest(this.service1.source1$, this.service1.source2$).pipe(
      filter(([res1, res2]: YourDataType[]) => {
        // Filter your data with res1, res2
      })
    ).subscribe();
    

    【讨论】:

      【解决方案3】:

      语法错误。 forkJoinzip 都接受 Observables 作为参数。不是 Observable 的数组。从通话中删除[],你应该没问题。

      试试这个:

      zip(this.service1.source1, this.service1.source2).subscribe(results => {
        console.log('zip results:', results);
      });
      
      forkJoin(this.service1.source1, this.service1.source2).subscribe(results => {
        console.log('forkJoin results:', results);
      });
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-12-25
        相关资源
        最近更新 更多