【问题标题】:How to buffer stream using fromWebSocket Subject如何使用 fromWebSocket Subject 缓冲流
【发布时间】:2015-11-03 09:22:18
【问题描述】:

这个RxJava buffer example(带有marble chart!)完美地描述了期望的结果:

在突发期间收集缓冲区中的项目,并在每次突发结束时发出它们,方法是使用 debounce 运算符向缓冲区运算符发出缓冲区关闭指示符

编辑:审查了How to create a RxJS buffer that groups elements in NodeJS but that does not rely on forever running interval?,我的问题似乎与使用主题而不是直接可观察对象有关。

使用套接字流生成窗口关闭事件(如下)导致打开 2 个套接字并且没有事件流出:

ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver);
var closer = ws.flatMapFirst(Rx.Observable.timer(250));
ws.buffer(closer)
    .subscribe(function(e) { console.log(e, 'socket messages');});

【问题讨论】:

  • 问题是什么?
  • 让我们从“如何在 bufferClosingSelector 中引用源 observable 本身”开始。我没有找到关于这个主题的例子。
  • 这很有帮助 - 我的问题原来是更深一层。查看修改。
  • 这应该允许区分正在发生的事情。我的直觉告诉我这里有一个热与冷的问题,所以尝试将.share()添加到ws。希望那时只创建一个套接字。然后点击 ws 查看发出的内容,例如 wsRx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).tap(*log_fn*).share()

标签: javascript reactive-programming rxjs


【解决方案1】:

在这里总结发现问题:

  • Rx.DOM.fromWebSocket 返回一个 Rx.subject,它环绕着 websocket。该主题由一个观察者和一个可观察对象组成(通过new Rx.Subject(observer, observable)。据我了解,该观察者允许通过其onNext 方法写入套接字,而可观察对象允许从套接字读取。
  • 您总是读到主题是热源,但显然在这里这只意味着观察者将立即将其值推送到主题,这里将其推送到套接字。在正常情况下(new Rx.Subject()),默认的observable和observable是这样的,observable会监听observer,因此默认的observable是hot的。然而,在这里,observable 是一个冷源,然后任何订阅都将重新执行回调,创建另一个 websocket。因此创建了两个套接字。
  • 这不会发生,例如Rx.dom.fromEvent,因为创建的(冷的)observable 是共享的(通过publish().refCount())。
  • 因此,通过在此处执行相同操作,可以解决重复问题。这意味着在这种特殊情况下,在您的代码中使用 ws = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share();share 作为 publish().refCount() 的别名。
  • 我想知道 Rx.DOM.fromWebSocket 的行为是否应该被报告为错误

两种方法的代码:

【讨论】:

  • 您能否在此处包含实际的解决方法,例如source = Rx.DOM.fromWebSocket(wsURI, null, wsOpenObserver, wsCloseObserver).share()
【解决方案2】:

您可以像 RxJava 版本一样将Observable 直接传递给buffer 运算符:

source.buffer(source.debounce(150))

有效。见here

您展示的使用选择器方法的替代语法将在每次缓冲区关闭时调用该方法,然后订阅它产生的 Observable。

RxJava 示例中的 debounce 也是发出缓冲区运算符的结果,它默认发出累积的结果。

【讨论】:

    猜你喜欢
    • 2013-05-12
    • 2013-05-12
    • 1970-01-01
    • 2014-01-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-12-06
    相关资源
    最近更新 更多