【问题标题】:NodeJS & RxJS: convert observables to nodeJs streamsNode JS 和 RxJS:将 observables 转换为 Node Js 流
【发布时间】:2020-10-16 11:13:29
【问题描述】:

我想将发送数据的 RxJs Observable 转换为 nodeJS 流,以便我可以通过 nodeJS 应用程序端点以块的形式流式传输这些数据。

这可能吗?

nodeJS 路由器中的伪代码(显然不起作用):

out.post('/data', [
    body().isArray(),
], (req: Request, res: Response, next: NextFunction) => {
    const errors = validationResult(req);
    if (!errors.isEmpty()) {
        next(new RequestValidationFailedError(errors.array()[0]));
    }

    const observable = service.getData(req.context); 
    req.pipe(observable);
});

【问题讨论】:

  • 什么是service.getData?是http调用还是别的什么?
  • 它是对不同服务器的多个 http 调用的组合,这些调用合并为一个 observable。
  • 所以返回的观察者只会发出一个值或错误。对吗?
  • 它实际上是在轮询几个端点以获取刷新的数据 :) 但是是的,它会返回一个合并的值......它将每 x 次更新一次

标签: node.js rxjs


【解决方案1】:

简单地使用观察者的nextcomplete 函数怎么样?

service.getData(req.context).subsribe({
  next: data => res.write(data),
  complete: () => res.end()
})

更新:管理断开连接的客户端

如果我们想在客户端断开连接的情况下终止 Obserable 流,如果 http.Request 发出事件 "close",我们可以取消订阅 Observable。这种情况下的逻辑如下所示

const subscription = service.getData(req.context).subsribe({
  next: data => res.write(data),
  complete: () => res.end()
})

req.on("close", function() {
  // request closed unexpectedly
  subscription.unsubscribe();
});

【讨论】:

  • 一旦第一个可用,它就不会返回响应。仅当您断开连接时,它才会返回最终响应。
  • 根据http.ServerResponsewrite 方法http.ServerResponsewrite 方法如果多次调用("第一次调用response.write(),它将将缓冲的header信息和body的第一个chunk发送给客户端,第二次调用response.write()时,Node.js假设数据会被流式传输,并单独发送新的数据,即响应被缓冲直到正文的第一块。”)。但也许我错过了什么。
  • 您可以管理删除连接取消订阅的客户端。我已更新我的答案以反映这一点。
  • 如果我理解Node documentation 没错,HttpResponse 的write 方法实际上将数据流式传输到客户端(“第一次调用 response.write() 时,它会发送缓存的头信息和body的第一个块给客户端。第二次调用response.write()时,Node.js假设数据将被流式传输,并单独发送新数据。也就是说,响应被缓冲了到正文的第一块。”)。
  • 带有 Observables 的 Websocket 效果很好。你可以look at this article,其中描述了 Websockets+Observables 的实现
猜你喜欢
  • 1970-01-01
  • 2020-09-23
  • 2013-08-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-11-05
  • 2017-12-09
  • 1970-01-01
相关资源
最近更新 更多