【问题标题】:RxJS: How to have one Observer process multiple Observables?RxJS:如何让一个 Observer 处理多个 Observable?
【发布时间】:2015-12-14 20:10:20
【问题描述】:

我正在使用一个调用我实现的函数的框架。我希望将此函数的参数转换为 Observable,并通过一系列观察者发送。我以为我可以为此使用主题,但它的行为与我预期的不同。

为了澄清,我有类似以下代码的内容。我认为下面的 Option 1 会起作用,但到目前为止我还是选择了 Option 2,这似乎一点也不习惯。

var eventSubject = new Rx.Subject();
var resultSource = eventSubject.map(processEvent);
var subscription = resultSource.subscribe(
    function(event) {
        console.log("got event", event);
    },
    function(e) {
        log.error(e);
    },
    function() {
        console.log('eventSubject onCompleted');
    }
);

// The framework calls this method
function onEvent(eventArray) {

   var eventSource = Rx.Observable.from(eventArray);

   // Option 1: I thought this would work, but it doesn't
   // eventSource.subscribe(eventSubject);

   // Option 2: This does work, but its obviously clunky
  eventSource.subscribe(
      function(event) {
          log.debug("sending to subject");
          eventSubject.onNext(event);
      },
      function(e) {
          log.error(e);
      },
      function() {
          console.log('eventSource onCompleted');
      }
  );
}

【问题讨论】:

  • onEvent handler 是你自己注册的吗?
  • 无论如何,我能想到的使用Subject.create(observer, observable) 并不会真正导致不那么笨重的东西,因为您将通过的观察者将与您传递给@987654328 的观察者完全相同@,那我们看看其他人的提议吧。
  • @user3743222 - onEvent 是我编写的一个函数,我的框架(环回)知道该函数存在并从它自己的代码中调用它。我不能使用 fromEvent() 或 fromEventPattern() 方法,因为它们与框架注册处理程序的方法不匹配。
  • 好的,我的想法是编写你自己的fromLoopbackEvent,但这不太可能导致更少的代码。您可以查看Rx.DOM.fromWebSocket 的实现,以获取使用Subject.create(observer, observable) 表单的示例。或文档:github.com/Reactive-Extensions/RxJS/blob/master/doc/api/…。关键是您需要指定观察者,可能没有默认观察者。所以按照你的方式去做,或者其他方式是一样的,但让我们看看。
  • Rx.Observable.prototype.subscribe 需要一个观察者或三个函数,并且主体不是观察者。因此,您也可以尝试使用eventSource.subscribe(eventSubject.asObserver()) 将主题转换为适当的观察者。参见github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/…。默认行为是一个onNext 方法,它发送到链接到主题的可观察对象。

标签: javascript reactive-programming rxjs


【解决方案1】:

正如 Brandon 已经解释的那样,将 eventSubject 订阅到另一个 observable 意味着将 eventSubjects onNext、onError 和 onComplete 订阅到 observables onNext、onError 和 onComplete。从您的示例来看,您似乎只想订阅 onNext。

一旦第一个 eventSource 完成/错误,您的主题就完成/错误 - 您的 eventSubject 正确地忽略了后续 eventSoures 对其触发的任何进一步的 onNext/onError。

有多种方法可以只订阅任何 eventSource 的 onNext:

  1. 仅手动订阅 onNext。

    resultSource = eventSubject
      .map(processEvent);
    
    eventSource.subscribe(
        function(event) {
            eventSubject.onNext(event);
        },
        function(error) {
            // don't subscribe to onError
        },
        function() {
            // don't subscribe to onComplete
        }
    );
    
  2. 使用只为您处理订阅事件源 onNext/onError 的操作符。这是布兰登的建议。 请记住,这也订阅了 eventSources onError,在您的示例中您似乎并不想要它。

    resultSource = eventSubject
      .mergeAll()
      .map(processEvent);
    
    eventSubject.onNext(eventSource);
    
  3. 使用不为 eventSources onError/onComplete 调用 eventSubjects onError/onComplete 的观察者。您可以简单地将 eventSubjects onComplete 覆盖为肮脏的 hack,但最好创建一个新的观察者。

    resultSource = eventSubject
      .map(processEvent);
    
    var eventObserver = Rx.Observer.create(
      function (event) {
        eventSubject.onNext(event);
      }
    );
    
    eventSubject.subscribe(eventObserver);
    

【讨论】:

    【解决方案2】:

    只是当您将整个 Subject 订阅到一个 observable 时,您最终会将该 observable 的 onComplete 事件订阅到该主题。这意味着当 observable 完成时,它将完成您的主题。所以当你得到下一组事件时,它们什么都不做,因为主题已经完成了。

    一个解决方案正是您所做的。只需将onNext 事件订阅到主题即可。

    第二种解决方案,可以说更像“rx like”,是将传入的数据视为可观察的可观察数据,并使用mergeAll 展平这个可观察的流:

    var eventSubject = new Rx.Subject(); // observes observable sequences
    var resultSource = eventSubject
      .mergeAll() // subscribe to each inner observable as it arrives
      .map(processEvent);
    var subscription = resultSource.subscribe(
        function(event) {
            console.log("got event", event);
        },
        function(e) {
            log.error(e);
        },
        function() {
            console.log('eventSubject onCompleted');
        }
    );
    
    // The framework calls this method
    function onEvent(eventArray) {
    
       var eventSource = Rx.Observable.from(eventArray);
       // send a new inner observable sequence
       // into the subject
       eventSubject.onNext(eventSource);
    }
    

    更新:Here is a running example

    【讨论】:

    • 愚蠢的问题:是否可以将 onNext、onError、onComplete 组合变成一个对象(可能是一个观察者)?这样整个事情就可以移动到另一个文件以实现模块化?我想做类似resultSource.subscribe(mySink)(更流畅)的事情,现在我必须做mySink(resultSource),mySink 在内部进行订阅
    • .merge() 的这种用法似乎与文档不匹配:github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/…
    • 你的意思是.mergeAll()?请参阅文档:github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/…
    • 是的,我猜我链接到的合并不支持不提供 maxConcurrent 参数。
    • 对不起,我不能让它工作。我很快就会尝试创建一个 jsbin
    猜你喜欢
    • 2016-07-06
    • 1970-01-01
    • 1970-01-01
    • 2020-02-05
    • 1970-01-01
    • 1970-01-01
    • 2019-01-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多