【问题标题】:Avoid re-fetching data while streaming in Expressjs避免在 Expressjs 中流式传输时重新获取数据
【发布时间】:2020-10-02 02:38:24
【问题描述】:

我刚刚开始在 Expressjs 中处理流数据。

不完全确定,但我认为请求将再次开始执行处理程序。例如,这是我的处理程序:

import getDataAsync from "./somewhere";

function handler(req, res) {

    console.log('requesting', req.path);

    getDataAsync()
        .then(data => {
            let stream = renderContent(data);
            stream.pipe(res);
        })
        .catch(err => {
            res.end();
        })
}

我发现,它继续打印出console.log('requesting', req.path)(我认为会重新执行getDataAsync)。

我的问题是:

真的会重新执行getDataAsync吗? 如果是这样,你的方法是什么?

谢谢大家!

【问题讨论】:

    标签: node.js express stream


    【解决方案1】:

    Node JS 是非阻塞的,因此如果您再次使用此处理程序向端点发出请求,那么它将执行。处理程序将调用 getDataAsync(),然后将处理程序从调用堆栈中删除。对每个请求重复该过程。

    如果您希望处理程序在再次调用它之前等待流结束,您可以这样做:

    import getDataAsync from "./somewhere";
    
    let streamComplete = true;
    
    function handler(req, res) {
        
        if(!streamComplete) {
          res.end();
        }
      
        console.log('requesting', req.path);
    
        getDataAsync()
            .then(data => {
                streamComplete = false;
                let stream = renderContent(data);
                stream.pipe(res);
                stream.on('end', () => streamComplete = true);
            })
            .catch(err => {
                res.end();
            })
    }
    

    【讨论】:

      【解决方案2】:

      我确实需要在我的一个项目中解决这个问题。节点或实际上任何其他环境/语言都会有同样的问题,一旦您开始将数据流式传输到一个客户端,就很难将其流式传输到另一个客户端。这是因为一旦你这样做了:

      inputStream.pipe(outputStream);
      

      ...输入数据将被推送到输出并从内存中删除。因此,如果您只是再次通过管道输入输入流,您将丢失一些初始数据部分。

      我想出的解决方案是编写一个Transform 流,将数据保存在内存中,之后您可以重用它。这样的流将拥有所有原始块,同时当它赶上第一个请求时,它将继续直接推送块。我将解决方案打包为 npm 模块并发布,现在您可以使用它了。

      这就是你如何使用它:

      const {ReReadable} = require("rereadable-stream");
      // We'll use this for caching - you can use a Map if you have more streams
      let cachedStream;
      
      // This function will get the stream and 
      const getCachedStream = () => 
          (cachedStream || (cachedStream = 
              getDataAsync()
              .then(
                  data => renderContent(data).pipe(new ReReadable())
              ))
          )
          .then(readable => readable.rewind())
      

      这样的函数会调用一次getDataAsync,然后会将数据推送到可回退的流中,但每次执行该函数时,流都会以rewound 开头。

      你可以read a bit more about the rereadable-stream module here

      请注意 - 请记住,您现在会将所有这些数据保存在内存中,因此如果那里有更多块并控制您的内存使用情况,请小心清理它。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-12-20
        • 1970-01-01
        • 1970-01-01
        • 2021-03-19
        • 2014-05-27
        • 1970-01-01
        相关资源
        最近更新 更多