【问题标题】:Multiple subscriptions to Observable多个订阅 Observable
【发布时间】:2016-04-23 18:49:21
【问题描述】:

我创建了自己的Observable 并订阅了两个函数。我希望对序列中的每个元素都执行这两个函数,但只有最后一个是。

let observer = null
const notificationArrayStream = Rx.Observable.create(function (obs) {
  observer = obs;
  return () => {}
})

function trigger(something) {
  observer.next(something)
}

notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))

trigger('TEST')

预期输出

a: TEST
b: TEST

实际输出

b: TEST

这是 JSBin:http://jsbin.com/cahoyey/edit?js,console

这是为什么呢?如何让多个函数订阅一个Observable

【问题讨论】:

    标签: javascript rxjs


    【解决方案1】:

    主题

    在您的情况下,您可以简单地使用Subjectsubject 允许您在将其用作一组订阅者和一个源的代理时与多个观察者共享单个执行

    本质上,这是您使用主题的示例:

    const subject = new Subject();
    
    function trigger(something) {
        subject.next(something);
    }
    
    subject.subscribe((x) => console.log('a: ' + x));
    subject.subscribe((x) => console.log('b: ' + x));
    
    trigger('TEST');
    

    结果:

    a: TEST
    b: TEST
    

    陷阱:观察者来得太晚

    请注意,您订阅和广播数据的时间是相关的。如果您在订阅前发送广播,您将不会收到此广播的通知:

    function trigger(something) {
        subject.next(something);
    }
    
    trigger('TEST');
    
    subject.subscribe((x) => console.log('a: ' + x));
    subject.subscribe((x) => console.log('b: ' + x));
    

    结果:(空)


    重播主题和行为主题

    如果您想确保即使是未来的订阅者也能收到通知,您可以改用ReplaySubjectBehaviorSubject

    这是一个使用 ReplaySubject 的示例(缓存大小为 5,这意味着最多可以记住过去的 5 个值,而 BehaviorSubject 只能记住最后一个值):

    const subject = new ReplaySubject(5); // buffer size is 5
    
    function trigger(something) {
        subject.next(something);
    }
    
    trigger('TEST');
    
    subject.subscribe((x) => console.log('a: ' + x));
    subject.subscribe((x) => console.log('b: ' + x));
    

    结果:

    a: TEST
    b: TEST
    

    【讨论】:

    • 我真的希望基本的 RXJS 教程能够提及主题(即,在与几乎没有解释的事件发射器进行比较时,不仅仅是顺便提及)——如果没有它们,工作的基本用例首先会退化为不工作的用例并发症的迹象。看起来 RXJS 指南确实是为已经了解 RXJS 的人编写的(或者是在错误的假设下编写的,即寻找问题解决方案的开发人员会继续阅读 20 页的指南)。
    【解决方案2】:

    要让多个函数订阅一个 Observable,只需将它们订阅到那个 Observable,就这么简单。实际上这就是你所做的。

    但是您的代码不起作用,因为在执行notificationArrayStream.subscribe((x) => console.log('b: ' + x)) 之后,observer(x) => console.log('b: ' + x)),所以observer.next 会给您b: TEST

    所以基本上是你的可观察到的创造是错误的。在create 中,您传递了一个观察者作为参数,因此您可以传递它的值。您需要通过自己的逻辑以某种方式生成这些值,但正如您所看到的,您的逻辑是错误的。如果您想将值推送给观察者,我建议您使用主题。

    类似:

    const notificationArrayStream = Rx.Observable.create(function (obs) {
      mySubject.subscribe(obs);
      return () => {}
    })
    
    function trigger(something) {
      mySubject.next(something)
    }
    

    【讨论】:

    • 谢谢,主题是缺失的链接。供将来参考:jsbin.com/wifokoc/edit?js,console
    • "因为在 (...) 执行后观察者是(x) => console.log('b: ' + x))" 为什么?请您详细说明一下或提供参考吗?
    • 好的,找到参考here
    【解决方案3】:

    每次订阅时,您都会覆盖 var observer

    trigger 函数只引用了这一个 var,因此只有一个日志也就不足为奇了。

    如果我们将 var 设为数组,它会按预期工作: JS Bin

    let obs = [];
    
    let foo = Rx.Observable.create(function (observer) {
      obs.push(observer);
    });
    
    function trigger(sth){
    //   console.log('trigger fn');
      obs.forEach(ob => ob.next(sth));
    }
    
    foo.subscribe(function (x) {
      console.log(`a:${x}`);
    });
    foo.subscribe(function (y) {
      console.log(`b:${y}`);
    });
    
    trigger(1);
    trigger(2);
    trigger(3);
    trigger(4);
    

    如上所述,更简洁的解决方案是使用Subject

    【讨论】:

    • 这才是真正的答案。
    【解决方案4】:

    Observables 不是多播的;除非您使用任何类型的Subject。您当然可以创建主题,将Observable 输出通过管道传输到其他答案建议中。

    但是,如果您已经拥有Observalbe,使用share()Observable 转换为SubjectshareReplay(n) 会更方便,这相当于@987654328 @:

    import {share} from 'rxjs/operators';
    
    let observer = null
    
    const notificationArrayStream = new Observable(obs => {
      observer = obs;
    }).pipe(share());
    
    function trigger(something) {
      observer.next(something)
    }
    
    notificationArrayStream.subscribe((x) => console.log('a: ' + x))
    notificationArrayStream.subscribe((x) => console.log('b: ' + x))
    
    trigger('TEST')
    

    差不多了。

    【讨论】:

      【解决方案5】:

      您可以基于 ReplaySubject 构建包装类 Subscribable。它会比管理 Subject 和 Observable 更干净:

      export class Subscribable<T> {
      
          private valueSource: Subject = new ReplaySubject(1);
          public value: Observable;
          private _value: T;
      
          constructor() {
              this.value = this.valueSource.asObservable();
          }
      
          public set(val: T) {
              this.valueSource.next(val);
              this._value = val;
          }
      
          public get(): T {
              return this._value;
          }
      }
      

      用法:

      let arrayStream : Subscribable<TYPE> = new Subscribable<TYPE>();
      
      …
      public setArrayStream (value: TYPE) {
          this.set(value);
      }
      

      处理值变化:

      arrayStream.value.subscribe(res => { /*handle it*/ });
      

      原文:http://devinstance.net/articles/20170921/rxjs-subscribable

      【讨论】:

        【解决方案6】:

        除了使用 Subject 之外,还可以使用 publishReplay() + refCount() combo 来允许 observable 向多个订阅者多播:

        const notificationArrayStream = Rx.Observable.create(function (obs) {
          observer = obs;
          return () => {}
        }).pipe(publishReplay(), refCount())
        

        【讨论】:

          【解决方案7】:
          const subs = []
          
          const ob = new Observable((s) => {
            console.log('called')
            subs.push(s)
          })
          
          const trigger = (v) => {
            subs.forEach((sub) => {
              sub.next(v)
            })
          }
          
          ob.subscribe((v) => {
            console.log('ob1', v)
          })
          
          ob.subscribe((v) => {
            console.log('ob2', v)
          })
          
          trigger(1)
          
          

          把你的代码改成这样,它就可以工作了。这里的重点是,每个订阅都是通过其对应的订阅者更新的,如果你有多个订阅,你必须通知多个订阅者。在你的情况下,你刚刚通知了最后一个。

          【讨论】:

            猜你喜欢
            • 2017-03-02
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2019-12-29
            • 1970-01-01
            • 2016-06-27
            • 2019-12-03
            相关资源
            最近更新 更多