【问题标题】:Modeling WebSocket streams with RxJS使用 RxJS 对 WebSocket 流进行建模
【发布时间】:2016-11-18 13:07:30
【问题描述】:

使用 RxJS 对 WebSocket 流进行建模的方法有哪些。

我看到的明显的东西是套接字流,它发出消息流。

如果我创建一个套接字流,我如何创建它们的消息流并仍然保留发送这些消息的人?

套接字流是我的第一步:

const socket$ = Observable.create(({complete, next}) => {

  const server = new WebSocketServer({server: someHttpServer})

  server.on('connection', next)

  return () => {
    server.close()
    complete()
  }

})

但是消息流有点困难,因为我需要从中获取消息的套接字。

这是我第一次尝试建模:

const message$ = socket$.flatMap(socket => Observable.create(({complete, next}) => {

  socket.on('message', next)
  socket.on('close', complete)

  return () => socket.close()

})).share()

从所有套接字流式传输所有套接字消息的 observable。但是如果我订阅它,我就没有套接字了,这使得它是单向的。

我想要

socket$ -> message$ -> server-processing -> socket$

但响应、广播、多播和单播有多种用例。

【问题讨论】:

    标签: javascript websocket rxjs


    【解决方案1】:

    我发现flatMap 采用第二个函数,它接收flatMap 的参数值和flatMap 的(扁平化)返回值。该函数可以返回一个新值,供以后的所有运算符使用。

    const socketMessage$ = socket$.flatMap(
    
      socket => Observable.create(({complete, next}) => {
    
        socket.on('message', next)
        socket.on('close', complete)
    
        return () => socket.disconnect()
    
      }),
    
      (socket, message) => ({socket, message})
    
    ).share()
    

    【讨论】:

      【解决方案2】:

      不知道这是否是您要查找的内容(still preserve who sent these messages 是什么意思),但是有一个用于将 websocket 包装在 observables 中的库。在这里查看文档:https://github.com/Reactive-Extensions/RxJS-DOM/blob/master/doc/operators/fromwebsocket.md

      【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-07-02
      • 1970-01-01
      • 1970-01-01
      • 2017-09-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多