【问题标题】:Sharing cold and hot observables共享冷热观测值
【发布时间】:2015-11-14 21:16:27
【问题描述】:

我对使用Rx.Observable.just 创建的shared 流的行为感到困惑。

例如:

var log = function(x) { console.log(x); };

var cold = Rx.Observable
  .just({ foo: 'cold' });

cold.subscribe(log); // <-- Logs three times
cold.subscribe(log);
cold.subscribe(log);

var coldShare = Rx.Observable
  .just({ foo: 'cold share' })
  .share();

coldShare.subscribe(log); // <-- Only logs once
coldShare.subscribe(log);
coldShare.subscribe(log);

两个流只发出一个事件,但未共享的一个可以订阅 3 次。这是为什么呢?

我需要“分叉”一个流但共享它的值(然后合并分叉的流)。

如何在分享流的价值的同时多次订阅它?

我意识到这可能与“冷”和“热”可观察对象的概念有关。然而:

  • Rx.Observable.just() 创建的流是冷的还是热的?
  • 应该如何确定上一个问题的答案?

【问题讨论】:

    标签: reactive-programming rxjs


    【解决方案1】:

    Rx.Observable.just() 创建的流是冷的还是热的?

    冷。

    应该如何确定上一个问题的答案?

    我猜文档是唯一的指南。

    如何在共享流的价值的同时多次订阅它?

    您正在寻找a connectable observable 的想法。举例:

    var log = function(x) { console.log(x); };
    var coldShare = Rx.Observable
      .just({ foo: 'cold share' })
      .publish();
    
    coldShare.subscribe(log); // Does nothing
    coldShare.subscribe(log); // Does nothing
    coldShare.subscribe(log); // Does nothing
    
    coldShare.connect(); // Emits one value to its three subscribers (logs three times)
    

    var log = function(x) {
      document.write(JSON.stringify(x));
      document.write("<br>");
    };
    
    var coldShare = Rx.Observable
      .just({ foo: 'cold share' })
      .publish();
    
    coldShare.subscribe(log); // <-- Only logs once
    coldShare.subscribe(log);
    coldShare.subscribe(log);
    
    coldShare.connect();
    &lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.min.js"&gt;&lt;/script&gt;

    上面的例子记录了 3 次。使用publishconnect,您实际上“暂停”了可观察对象,直到调用connect

    另见:

    【讨论】:

    • 所以,事实上我已经知道publish()connect()。但是,让我感到困惑的是,如果coldShare 是:Rx.Observable.interval().take(1).share(),那么它确实 记录了3 次,并且您不需要使用publish()connect()。这是为什么呢?
    • @PaulMurray sharepublishconnect 的特例
    • 那么,Rx.Observable.interval().take(1) 很热,Rx.Observable.just() 很冷,这是真的吗?
    • 是的,Rx.Observable.interval().take(1) 很热,因为 Rx.Observable.interval() 很热(而且因为 .take(1) 不会从热变为冷)
    • @PaulMurray 除非另有说明,否则您可能会认为 observable 是冷的。热与冷可以被认为是主动与被动(分别),大多数可观察对象是被动的,因为它们在有人订阅之前不做任何事情。 hot observables 的例子(无论是否有人订阅都会发出值)包括鼠标事件、外部 API 更新等。
    【解决方案2】:

    我不明白你的第一个问题,但关于最后一个问题,因为我也遇到了问题:

    • Observables/Observers 的 Rxjs 实现是基于观察者模式的,类似于老式的回调机制。
    • 举例来说,这里是创建 observable 的基本形式(取自https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/create.md 的文档)

      var source = Rx.Observable.create(function (observer) {
          observer.onNext(42);
          observer.onCompleted();
      
          // Note that this is optional, you do not have to return this if you require no cleanup
          return function () {
              console.log('disposed');
          };
      });
      
    • Rx.Observable.create 将一个函数(比如 factory_fn 是原始的)作为参数,该函数接受一个观察者。您的值是由您在factory_fn 正文中选择的计算生成的,并且因为您在参数中有观察者,您可以在您认为合适时处理/推送生成的值。但是factory_fn 没有被执行,它只是被注册(就像回调一样)。每当相关的 observable 上有一个 subscribe(observer) 时都会调用它(即 Rx.Observable.create(factory_fn) 返回的那个。

    • 一旦订阅完成(调用创建回调),值会根据工厂函数中的逻辑流向您的观察者,并保持这种状态,直到您的可观察对象完成或观察者取消订阅(假设您确实执行了取消值流的操作作为factory_fn的返回值)。
    • 这基本上意味着默认情况下,Rx.Observables 是冷的。
    • 在使用了相当多的库后,我得出的结论是,除非有适当的文档记录,否则要确定可观察对象的温度的唯一方法是查看源代码。或者在某处添加一个副作用,订阅两次,看看副作用是发生了两次还是只发生了一次(这就是你所做的)。那个,或者在stackoverflow上问。
    • 例如,Rx.fromEvent 产生 hot observables,从代码的最后一行 (return new EventObservable(element, eventName, selector).publish().refCount();) 可以看出。 (此处代码:https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/fromevent.js)。 publish 运算符是那些将冷可观察到热的运算符之一。它的工作原理超出了范围,所以我不会在这里详细说明。
    • Rx.DOM.fromWebSocket 不会产生热可观察对象 (https://github.com/Reactive-Extensions/RxJS-DOM/blob/master/src/dom/websocket.js)。参照。 How to buffer stream using fromWebSocket Subject
    • 我认为混淆经常来自于我们将实际来源(比如按钮点击流)和它的表示(Rx.Observable)混为一谈。不幸的是,当这种情况发生时,我们想象中的热源最终可能会被冷Rx.Observable代表。

    所以,是的,Rx.Observable.just 创建冷可观察对象。

    【讨论】:

      猜你喜欢
      • 2011-02-01
      • 1970-01-01
      • 2017-09-26
      • 1970-01-01
      • 1970-01-01
      • 2015-12-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多