【问题标题】:Get return value from subcribe on Observable从 Observable 上的订阅获取返回值
【发布时间】:2016-10-23 01:41:46
【问题描述】:

使用 RxJS 5.0.0-rc.1,我试图通过使用 yield.next() 交换数据,以类似于 how generators/iterators work 的方式传达我的 ObserverObservable。其目的是掌握对.subscribe 的调用返回的内容,并据此修改/更新我的可观察流中的以下值。

我不完全确定这是否可能。不过,我发现您可以捕获.subscribe 回调中抛出的异常。下面的 sn-ps 打印出"Boom!":

var source = Observable.create((observer) => {
  try {
    observer.next(42);
  } catch (e) {
    // This will catch the Error
    // thrown on the subscriber
    console.log(e.message);
  }
  observer.complete();
});

source.subscribe(() => {
  throw new Error('Boom!');
});

那么,如果订阅者不是抛出,而是返回一个值呢? Observable 有没有办法检索它?也许我以错误的方式接近这个。如果是这样,在这种情况下,什么是“被动”的做事方式?

非常感谢。


编辑

我想出的一种可能方法是为流中的每个项目提供一个回调函数。比如:

var source = Observable.create((observer) => {
  // This will print "{ success: true }"
  observer.next({ value: 42, reply: console.log });
  observer.complete();
});

source.subscribe(({ value, reply }) => {
  console.log('Got', value);
  return reply({ success: true });
});

还有其他想法吗?


编辑 2

由于我最初的问题对我想要实现的目标带来了一些困惑,我将描述我的真实世界场景。我正在编写一个模块的 API,用于通过队列管理消息(很像简化的内存中 AMQP-RPC 机制),我认为 RxJS 会很合适。

它的工作方式与您预期的一样:Publisher 将消息推送到队列,然后将其传递到 Consumer。在术语中,Consumer 可以回复Publisher,如果有兴趣可以收听该回复。

在理想情况下,API 应如下所示:

Consumer().consume('some.pattern')
  .subscribe(function(msg) {
    // Do something with `msg`
    console.log(msg.foo);
    return { ok: true };
  });

Publisher().publish('some.pattern', { foo: 42 })
// (optional) `.subscribe()` to get reply from Consumer

该示例将打印42

回复Publisher 的逻辑位于Consumer 函数中。但实际响应来自 .subscribe() 回调。这引出了我最初的问题:我应该如何从流的创建者那里获取返回值?

Consumer#consume()想象成:

/**
 * Returns an async handler that gets invoked every time
 * a new message matching the pattern of this consumer
 * arrives.
 */
function waitOnMessage(observer) {
  return function(msg) {
    observer.next(msg);
    // Conceptually, I'd like the returned
    // object from `.subscribe()` to be available
    // in this scope, somehow.
    // That would allow me to go like: 
    // `sendToQueue(pubQueue, response);`
  }
}

return Observable.create((observer) => {
  queue.consume(waitOnMessage(observer));
});

还有意义吗?

【问题讨论】:

    标签: javascript reactive-programming rxjs observer-pattern rxjs5


    【解决方案1】:

    生成器和可观察对象之间确实有相似之处。如您所见here,observables(值的异步序列)是 iterables(值的同步序列)的异步版本。

    现在,生成器是一个返回Iterable 的函数。然而,Rxjs Observable 包含了一个生成器——也就是一个生产者(你通过调用 subscribe 来执行/启动)和生成的异步值序列(你通过传递一个 Observer 对象来观察)。 subscribe 调用返回一个Disposable,它允许您停止接收值(断开连接)。因此,虽然生成器和可观察对象是双重概念,但使用它们的 API 不同。

    默认情况下,您无法使用 rxjs 可观察 API 进行双向通信。您可能可以通过主题构建自己的反向通道来做到这一点(请注意,您必须有一个初始值才能启动循环)。

    var backChannel = Rx.Subject();
    backChannel.startWith(initialValue).concatMap(generateValue)
      .subscribe(function observer(value){
      // Do whatever
      // pass a value through the backChannel
      backChannel.next(someValue)
    })
    // generateValue is a function which takes a value from the back channel 
    // and returns a promise with the next value to be consumed by the observer.
    

    你可以考虑用 :

    function twoWayObsFactory (yield, initialValue) {
      var backChannel = Rx.BehaviorSubject(initialValue);
      var next = backChannel.next.bind(backChannel);
      return {
        subscribe : function (observer) {
          var disposable = backChannel.concatMap(yield)
            .subscribe(function(x) {
               observer(next, x);
            });
          return {
            dispose : function (){disposable.dispose(); backChannel.dispose();}
          }
        }
      }
    }
    
    // Note that the observer is now taking an additional parameter in its signature
    // for instance
    // observer = function (next, yieldedValue) {
    //              doSomething(yieldedValue);
    //              next(anotherValue);
    //            }
    // Note also that `next` is synchronous, as such you should avoir sequences
    // of back-and-forth communication that is too long. If your `yield` function
    // would be synchronous, you might run into stack overflow errors.
    // All the same, the `next` function call should be the last line, so order of
    // execution in your program is the same independently of the synchronicity of
    // the `yield` function
    

    否则,您描述的行为似乎是异步生成器的行为。我从来没有使用过这样的,但由于这是对某些未来版本的 javascript 的提议,我认为你可以 已经开始使用 Babel 进行尝试(参见https://github.com/tc39/proposal-async-iteration)。

    编辑

    如果您正在寻找一种环回机制(不太通用的方法,但非常适合您的用例,如果您想做的事情足够简单),expand 运算符可以提供帮助。要了解它的行为,请查看doc,以及关于 SO 的以下答案,以获取具体上下文中的使用示例:

    基本上expand 允许您在下游发出一个值并同时将该值反馈到您的生产者中。

    【讨论】:

    • 我喜欢你的想法。请给我看一个它的用法示例好吗?也许,使用您的twoWayObsFactory 重新实现我对原始问题的简单示例代码?
    • 我只是在想,你检查过expand 运算符(Rxjs v4)吗?它允许你有一个反馈循环。顺便说一句,您的简单代码并不是您描述的问题的一个很好的例子。对于我所看到的,在该示例中,您没有将任何值发送回您的 observable。
    • 正在发送一个值。实际上是一个完整的对象:{ success: true }。 observable 除了打印它之外什么都不做 - 但是,将 reply 更改为您想要的任何内容,您可以以任何方式操纵响应。我说的是我的第二个例子,在 EDIT 下。
    • 我认为您对术语感到困惑。 reply 是一个回调,即一个函数,您不会将任何值从观察者发送到可观察对象(在您的示例中为 source)。这与生成器的nextyield API 没有任何关系。 next 确实将一个值传递给生成器,该生成器可以使用或不使用它来生成下一个值。因此,您的示例中没有双向数据流。如果您只想这样做,请不要订阅,只需映射,obs.map(value => doSomething).map(value => doSomethingElse).subscribe(function(){doNothing})
    • 好吧,那么您应该为后台频道使用一个主题并将您的发布者订阅到该主题。您的发布者既是制作者又是听众,而这正是主题的本质,因此您的发布者也应该是主题。也看看这里:github.com/Reactive-Extensions/RxJS/blob/master/doc/howdoi/…。不能再花太多时间在这个抱歉了。
    猜你喜欢
    • 1970-01-01
    • 2017-02-17
    • 2019-03-09
    • 1970-01-01
    • 2017-03-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多