【问题标题】:Error handling with node.js streamsnode.js 流的错误处理
【发布时间】:2014-03-13 08:00:30
【问题描述】:

处理流错误的正确方法是什么?我已经知道您可以收听一个“错误”事件,但我想了解有关任意复杂情况的更多详细信息。

对于初学者,当你想做一个简单的管道链时你会怎么做:

input.pipe(transformA).pipe(transformB).pipe(transformC)...

您如何正确地创建其中一种转换以便正确处理错误?

更多相关问题:

  • 当错误发生时,'end' 事件会发生什么?它永远不会被解雇吗?它有时会被解雇吗?它是否取决于转换/流?这里的标准是什么?
  • 是否有通过管道传播错误的机制?
  • 域是否有效地解决了这个问题?例子会很好。
  • “错误”事件产生的错误是否有堆栈跟踪?有时?绝不?有没有办法从他们那里得到一个?

【问题讨论】:

  • 这不是小事。 Promise 框架让它变得更简单
  • 不幸的是,promise/futures 并不能真正帮助您处理流...

标签: node.js stream


【解决方案1】:
const http = require('http');
const fs = require('fs');
const server = http.createServer();

server.on('request',(req,res)=>{
    const readableStream = fs.createReadStream(__dirname+'/README.md');
    const writeableStream = fs.createWriteStream(__dirname+'/assets/test.txt');
    readableStream
    .on('error',()=>{
        res.end("File not found")
    })
    .pipe(writeableStream)
    .on('error',(error)=>{
        console.log(error)
        res.end("Something went to wrong!")
    })
    .on('finish',()=>{
        res.end("Done!")
    })
})

server.listen(8000,()=>{
    console.log("Server is running in 8000 port")
})

【讨论】:

  • 我很确定这不能正确捕获管道到 writeableStream 时发生的错误。
  • @BT ,我已经根据您的意见编辑了我的帖子,我相信现在我们可以在管道传输到 writeableStream 期间遇到一些错误时处理该错误。
【解决方案2】:

使用multipipe 包将多个流组合成一个双工流。并在一处处理错误。

const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC) 


// centralized error handling
stream.on('error', fn)

【讨论】:

    【解决方案3】:

    变换

    转换流既可读又可写,因此是非常好的“中间”流。因此,它们有时被称为through 流。它们在这种方式上类似于双工流,只是它们提供了一个很好的接口来操作数据,而不仅仅是发送数据。转换流的目的是在数据通过流传输时对其进行操作。例如,您可能想要进行一些异步调用,或者派生几个字段,重新映射一些东西等。



    有关如何创建转换流,请参阅 herehere。你所要做的就是:

    1. 包含流模块
    2. 实例化(或继承自)Transform 类
    3. 实现一个_transform 方法,该方法采用(chunk, encoding, callback)

    块是你的数据。如果您在objectMode = true 工作,大多数时候您无需担心编码问题。当您完成处理块时调用回调。然后将该块推送到下一个流。

    如果你想要一个很好的帮助模块,让你能够非常轻松地通过流进行操作,我建议through2

    关于错误处理,请继续阅读。

    管道

    在管道链中,处理错误确实很重要。根据this thread .pipe() 不是为了转发错误而构建的。所以像......

    var a = createStream();
    a.pipe(b).pipe(c).on('error', function(e){handleError(e)});
    

    ... 只会侦听流 c 上的错误。如果在a 上发出错误事件,则不会传递下去,实际上会抛出。要正确执行此操作:

    var a = createStream();
    a.on('error', function(e){handleError(e)})
    .pipe(b)
    .on('error', function(e){handleError(e)})
    .pipe(c)
    .on('error', function(e){handleError(e)});
    

    现在,虽然第二种方法更冗长,但您至少可以保留错误发生位置的上下文。这通常是一件好事。

    我觉得有一个库很有帮助,但如果您只想在目的地捕获错误并且您不太关心它发生在哪里,那就是event-stream

    结束

    当触发错误事件时,不会触发结束事件(显式)。发出错误事件将结束流。

    根据我的经验,域名大部分时间都运行良好。如果您有未处理的错误事件(即在没有侦听器的情况下在流上发出错误),服务器可能会崩溃。现在,正如上面的文章所指出的,您可以将流包装在应该正确捕获所有错误的域中。

    var d = domain.create();
     d.on('error', handleAllErrors);
     d.run(function() {
         fs.createReadStream(tarball)
           .pipe(gzip.Gunzip())
           .pipe(tar.Extract({ path: targetPath }))
           .on('close', cb);
     });
    

    域的美妙之处在于它们将保留堆栈跟踪。尽管事件流在这方面也做得很好。

    如需进一步阅读,请查看stream-handbook。相当深入,但超级有用,并提供了许多有用模块的链接。

    【讨论】:

    • 这是非常棒的信息,谢谢!您能否补充一下您为什么要创建转换流以及它与我的问题相关的原因?
    • 当然 - 虽然我认为这与你问的有关; )
    • isaccs 在 Google Groups-nodejs 上发帖:groups.google.com/d/msg/nodejs/lJYT9hZxFu0/L59CFbqWGyYJ(不是 grokbase)
    • 这个答案写得很完美。我将调查域建议——它似乎是我正在寻找的那种解决方案。
    • 请注意,您不需要将.on('error') 处理程序包装在匿名函数中,即a.on('error', function(e){handleError(e)}) 可以只是a.on('error', handleError)
    【解决方案4】:

    尝试 catch 不会捕获流中发生的错误,因为它们是在调用代码退出后抛出的。你可以参考文档:

    https://nodejs.org/dist/latest-v10.x/docs/api/errors.html

    【讨论】:

    • 谢谢,但这根本不能回答问题。
    • 给我一份 40 页的文件没有帮助。你认为我应该在那个巨大的页面中提到什么?另外,你读过我的问题吗?我的问题不是“尝试与流一起工作吗?”我已经很清楚 try-catch 不适用于异步错误,例如来自流处理管道的错误。
    【解决方案5】:

    .on("error", handler) 仅处理 Stream 错误,但如果您使用自定义 Transform 流,.on("error", handler) 不会捕获 _transform 函数内部发生的错误。所以人们可以做这样的事情来控制应用程序流:-

    _transform 函数中的this 关键字指的是Stream 本身,它是一个EventEmitter。所以你可以像下面这样使用try catch 来捕获错误,然后将它们传递给自定义事件处理程序。

    // CustomTransform.js
    CustomTransformStream.prototype._transform = function (data, enc, done) {
      var stream = this
      try {
        // Do your transform code
      } catch (e) {
        // Now based on the error type, with an if or switch statement
        stream.emit("CTError1", e)
        stream.emit("CTError2", e)
      }
      done()
    }
    
    // StreamImplementation.js
    someReadStream
      .pipe(CustomTransformStream)
      .on("CTError1", function (e) { console.log(e) })
      .on("CTError2", function (e) { /*Lets do something else*/ })
      .pipe(someWriteStream)
    

    这样,您可以将逻辑和错误处理程序分开。此外,您可以选择只处理一些错误而忽略其他错误。

    更新
    替代方案:RXJS Observable

    【讨论】:

      【解决方案6】:

      如果您使用的是节点 >= v10.0.0,则可以使用 stream.pipelinestream.finished

      例如:

      const { pipeline, finished } = require('stream');
      
      pipeline(
        input, 
        transformA, 
        transformB, 
        transformC, 
        (err) => {
          if (err) {
            console.error('Pipeline failed', err);
          } else {
            console.log('Pipeline succeeded');
          }
      });
      
      
      finished(input, (err) => {
        if (err) {
          console.error('Stream failed', err);
        } else {
          console.log('Stream is done reading');
        }
      });
      

      更多讨论请参见github PR

      【讨论】:

      • 既然pipeline 已经有回调,为什么还要使用finished
      • 您可能希望在管道和各个流之间以不同方式处理错误。
      • pipeline 可以用于双工流吗?如果它用于双工流,我们是否需要创建 2 个管道?我有 2 个 TCP 套接字需要相互连接,创建 2 个管道有意义吗?
      【解决方案7】:

      通过创建转换流机制并使用参数调用其回调done 以传播错误来使用 Node.js 模式:

      var transformStream1 = new stream.Transform(/*{objectMode: true}*/);
      
      transformStream1.prototype._transform = function (chunk, encoding, done) {
        //var stream = this;
      
        try {
          // Do your transform code
          /* ... */
        } catch (error) {
          // nodejs style for propagating an error
          return done(error);
        }
      
        // Here, everything went well
        done();
      }
      
      // Let's use the transform stream, assuming `someReadStream`
      // and `someWriteStream` have been defined before
      someReadStream
        .pipe(transformStream1)
        .on('error', function (error) {
          console.error('Error in transformStream1:');
          console.error(error);
          process.exit(-1);
         })
        .pipe(someWriteStream)
        .on('close', function () {
          console.log('OK.');
          process.exit();
        })
        .on('error', function (error) {
          console.error(error);
          process.exit(-1);
         });
      

      【讨论】:

      • 嗯,你是说如果所有的流处理器都是这样构建的,错误就会传播?
      • 谢谢。对于流转换器,这似乎是正确的方法。错误通过 stram.pipeline 传播,可以很容易地包装在 Promise 中。
      【解决方案8】:

      可以使用一个简单的函数将整个链中的错误传播到最右边的流:

      function safePipe (readable, transforms) {
          while (transforms.length > 0) {
              var new_readable = transforms.shift();
              readable.on("error", function(e) { new_readable.emit("error", e); });
              readable.pipe(new_readable);
              readable = new_readable;
          }
          return readable;
      }
      

      可以这样使用:

      safePipe(readable, [ transform1, transform2, ... ]);
      

      【讨论】:

        【解决方案9】:

        域已弃用。你不需要它们。

        对于这个问题,transform 或 writable 之间的区别并不那么重要。

        mshell_lauren 的回答很好,但作为替代方案,您也可以在您认为可能出错的每个流上显式侦听错误事件。如果您愿意,可以重用处理函数。

        var a = createReadableStream()
        var b = anotherTypeOfStream()
        var c = createWriteStream()
        
        a.on('error', handler)
        b.on('error', handler)
        c.on('error', handler)
        
        a.pipe(b).pipe(c)
        
        function handler (err) { console.log(err) }
        

        如果其中一个流触发其错误事件,这样做可以防止臭名昭著的未捕获异常

        【讨论】:

        • 哈哈,处理 3 个不同的错误事件很有趣,并祈祷编写 3 个不同的流媒体库的人正确地实现了错误处理
        • @Alex Mills 1) 处理 3 个事件的问题是什么,为什么它们“不同”,当它们的类型相同时 -- error,还不如解决每个事件都是不同的事实; 2) 除了原生 Node.js 功能之外,上面还写了哪些流媒体库? 3) 为什么他们如何在内部处理事件很重要,因为这显然允许任何人在已经存在的东西之上附加额外的错误处理程序?
        猜你喜欢
        • 1970-01-01
        • 2013-12-02
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2017-03-02
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多