【问题标题】:Reading a file in real-time using Node.js使用 Node.js 实时读取文件
【发布时间】:2012-06-28 19:31:53
【问题描述】:

我需要找出使用 node.js 实时读取写入文件的数据的最佳方法。麻烦的是,Node 是一艘快速移动的船,这使得找到解决问题的最佳方法变得困难。

我想做的事
我有一个 java 进程正在做某事,然后将它所做的事情的结果写入文本文件。它通常需要 5 分钟到 5 小时的时间来运行,并且一直在写入数据,并且可以达到相当高的吞吐率(大约 1000 行/秒)。

我想实时读取此文件,然后使用节点聚合数据并将其写入套接字,以便在客户端上绘制图形。

客户端、图形、套接字和聚合逻辑都已完成,但我对读取文件的最佳方法感到困惑。

我已经尝试过(或至少玩过)
FIFO - 我可以告诉我的 Java 进程写入 fifo 并使用节点读取它,这实际上是如何我们目前使用 Perl 实现了这一点,但由于其他所有内容都在节点中运行,因此将代码移植过来是有意义的。

Unix Sockets - 同上。

fs.watchFile - 这能满足我们的需要吗?

fs.createReadStream - 这比 watchFile 好吗?

fs & tail -f - 看起来像个黑客。

实际上,我的问题是什么
我倾向于使用 Unix 套接字,这似乎是最快的选择。但是node是否有更好的内置功能来实时从fs读取文件?

【问题讨论】:

    标签: javascript node.js real-time fifo unix-socket


    【解决方案1】:

    我从@hasanyasin 那里得到了答案,并将其包装成一个模块化的承诺。基本思想是传递一个文件和一个处理函数,该函数对从文件中读取的字符串化缓冲区执行某些操作。如果处理函数返回 true,则文件将停止读取。您还可以设置一个超时,如果处理程序没有足够快地返回 true,则会终止读取。

    如果由于超时而调用resolve(),则promizer将返回true,否则将返回false。

    用法示例见底部。

    // https://stackoverflow.com/a/11233045
    
    var fs = require('fs');
    var Promise = require('promise');
    
    class liveReaderPromiseMe {
        constructor(file, buffStringHandler, opts) {
            /*
                var opts = {
                    starting_position: 0,
                    byte_size: 256,
                    check_for_bytes_every_ms: 3000,
                    no_handler_resolution_timeout_ms: null
                };
            */
    
            if (file == null) {
                throw new Error("file arg must be present");
            } else {
                this.file = file;
            }
    
            if (buffStringHandler == null) {
                throw new Error("buffStringHandler arg must be present");
            } else {
                this.buffStringHandler = buffStringHandler;
            }
    
            if (opts == null) {
                opts = {};
            }
    
            if (opts.starting_position == null) {
                this.current_position = 0;
            } else {
                this.current_position = opts.starting_position;
            }
    
            if (opts.byte_size == null) {
                this.byte_size = 256;
            } else {
                this.byte_size = opts.byte_size;
            }
    
            if (opts.check_for_bytes_every_ms == null) {
                this.check_for_bytes_every_ms = 3000;
            } else {
                this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms;
            }
    
            if (opts.no_handler_resolution_timeout_ms == null) {
                this.no_handler_resolution_timeout_ms = null;
            } else {
                this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms;
            }
        }
    
    
        startHandlerTimeout() {
            if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) {
                var that = this;
                this._handlerTimer = setTimeout(
                    function() {
                        that._is_handler_timed_out = true;
                    },
                    this.no_handler_resolution_timeout_ms
                );
            }
        }
    
        clearHandlerTimeout() {
            if (this._handlerTimer != null) {
                clearTimeout(this._handlerTimer);
                this._handlerTimer = null;
            }
            this._is_handler_timed_out = false;
        }
    
        isHandlerTimedOut() {
            return !!this._is_handler_timed_out;
        }
    
    
        fsReadCallback(err, bytecount, buff) {
            try {
                if (err) {
                    throw err;
                } else {
                    this.current_position += bytecount;
                    var buff_str = buff.toString('utf-8', 0, bytecount);
    
                    var that = this;
    
                    Promise.resolve().then(function() {
                        return that.buffStringHandler(buff_str);
                    }).then(function(is_handler_resolved) {
                        if (is_handler_resolved) {
                            that.resolve(false);
                        } else {
                            process.nextTick(that.doReading.bind(that));
                        }
                    }).catch(function(err) {
                        that.reject(err);
                    });
                }
            } catch(err) {
                this.reject(err);
            }
        }
    
        fsRead(bytecount) {
            fs.read(
                this.file,
                new Buffer(bytecount),
                0,
                bytecount,
                this.current_position,
                this.fsReadCallback.bind(this)
            );
        }
    
        doReading() {
            if (this.isHandlerTimedOut()) {
                return this.resolve(true);
            } 
    
            var max_next_bytes = fs.fstatSync(this.file).size - this.current_position;
            if (max_next_bytes) {
                this.fsRead( (this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size );
            } else {
                setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms);
            }
        }
    
    
        promiser() {
            var that = this;
            return new Promise(function(resolve, reject) {
                that.resolve = resolve;
                that.reject = reject;
                that.doReading();
                that.startHandlerTimeout();
            }).then(function(was_resolved_by_timeout) {
                that.clearHandlerTimeout();
                return was_resolved_by_timeout;
            });
        }
    }
    
    
    module.exports = function(file, buffStringHandler, opts) {
        try {
            var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts);
            return live_reader.promiser();
        } catch(err) {
            return Promise.reject(err);
        }
    };
    

    然后像这样使用上面的代码:

    var fs = require('fs');
    var path = require('path');
    var Promise = require('promise');
    var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser');
    
    var ending_str = '_THIS_IS_THE_END_';
    var test_path = path.join('E:/tmp/test.txt');
    
    var s_list = [];
    var buffStringHandler = function(s) {
        s_list.push(s);
        var tmp = s_list.join('');
        if (-1 !== tmp.indexOf(ending_str)) {
            // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms
            // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true
            return true;
            // you can also return a promise:
            //  return Promise.resolve().then(function() { return true; } );
        }
    };
    
    var appender = fs.openSync(test_path, 'a');
    try {
        var reader = fs.openSync(test_path, 'r');
        try {
            var options = {
                starting_position: 0,
                byte_size: 256,
                check_for_bytes_every_ms: 3000,
                no_handler_resolution_timeout_ms: 10000,
            };
    
            liveReadAppendingFilePromiser(reader, buffStringHandler, options)
            .then(function(did_reader_time_out) {
                console.log('reader timed out: ', did_reader_time_out);
                console.log(s_list.join(''));
            }).catch(function(err) {
                console.error('bad stuff: ', err);
            }).then(function() {
                fs.closeSync(appender);
                fs.closeSync(reader);
            });
    
            fs.write(appender, '\ncheck it out, I am a string');
            fs.write(appender, '\nwho killed kenny');
            //fs.write(appender, ending_str);
        } catch(err) {
            fs.closeSync(reader);
            console.log('err1');
            throw err;
        }
    } catch(err) {
        fs.closeSync(appender);
            console.log('err2');
        throw err;
    }
    

    【讨论】:

      【解决方案2】:

      如果您想将文件作为数据的持久存储以防止在系统崩溃或正在运行的进程网络中的一个成员死亡的情况下丢失流,您仍然可以继续写入文件并从中读取。

      如果您不需要此文件作为 Java 进程生成的结果的持久存储,那么使用 Unix 套接字在易用性和性能方面都会好得多。

      fs.watchFile() 不是您所需要的,因为它适用于文件系统报告的文件统计信息,并且由于您想读取已经写入的文件,所以这不是您想要的。

      简短更新:我很遗憾地意识到,虽然我在上一段中指责 fs.watchFile() 使用文件统计信息,但我自己在下面的示例代码中做了同样的事情!虽然我已经警告读者“小心!”因为我只用了几分钟就写好了,甚至没有测试好;不过,如果底层系统支持,使用fs.watch() 代替watchFilefstatSync 可以做得更好。

      为了从文件中读取/写入,我刚刚在下面写了一些有趣的东西:

      test-fs-writer.js:[您不需要这个,因为您在 Java 进程中编写文件]

      var fs = require('fs'),
          lineno=0;
      
      var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'});
      
      stream.on('open', function() {
          console.log('Stream opened, will start writing in 2 secs');
          setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000);
      });
      

      test-fs-reader.js:[注意,这只是演示,检查错误对象!]

      var fs = require('fs'),
          bite_size = 256,
          readbytes = 0,
          file;
      
      fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); });
      
      function readsome() {
          var stats = fs.fstatSync(file); // yes sometimes async does not make sense!
          if(stats.size<readbytes+1) {
              console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!');
              setTimeout(readsome, 3000);
          }
          else {
              fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome);
          }
      }
      
      function processsome(err, bytecount, buff) {
          console.log('Read', bytecount, 'and will process it now.');
      
          // Here we will process our incoming data:
              // Do whatever you need. Just be careful about not using beyond the bytecount in buff.
              console.log(buff.toString('utf-8', 0, bytecount));
      
          // So we continue reading from where we left:
          readbytes+=bytecount;
          process.nextTick(readsome);
      }
      

      您可以安全地避免使用nextTick,而是直接调用readsome()。由于我们仍在此处进行同步,因此在任何意义上都没有必要。我只是喜欢它。 :p

      编辑Oliver Lloyd

      以上面的例子为例,但将其扩展为读取 CSV 数据给出:

      var lastLineFeed,
          lineArray;
      function processsome(err, bytecount, buff) {
          lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n');
      
          if(lastLineFeed > -1){
      
              // Split the buffer by line
              lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n');
      
              // Then split each line by comma
              for(i=0;i<lineArray.length;i++){
                  // Add read rows to an array for use elsewhere
                  valueArray.push(lineArray[i].split(','));
              }   
      
              // Set a new position to read from
              readbytes+=lastLineFeed+1;
          } else {
              // No complete lines were read
              readbytes+=bytecount;
          }
          process.nextTick(readFile);
      }
      

      【讨论】:

      • 这是一个很好的例子,它直接解决了我的问题。虽然一次只处理一条线,但需要增强,但可以说这是一件好事; node 缺少现有的 fs 接口意味着它是完全可定制的,所以即使我必须编写额外的代码,我也可以实现我所需要的。
      • 这在作为节点运行时绝对有效 但我怎样才能将此代码放入 app.js 并在 html 页面中获取结果?
      【解决方案3】:

      这个模块是@hasanyasin 建议的原则的实现:

      https://github.com/felixge/node-growing-file

      【讨论】:

      • 谢谢,这看起来在这里会很好用,而且 felixge 的其他项目也很可靠,所以我很高兴尝试这个模块。
      【解决方案4】:

      为什么你认为tail -f 是个黑客?

      虽然我发现了一个很好的例子,但我会做类似的事情。 使用 node.js 和 WebSocket 的实时在线活动监控示例:
      http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

      为了让这个答案完整,我给你写了一个示例代码,它可以在 0.8.0 下运行 - (http 服务器可能是一个 hack)。

      一个子进程通过tail运行产生,由于一个子进程是一个带有三个流的EventEmitter(我们在我们的例子中使用stdout),你可以添加一个带有on的监听器

      文件名:tailServer.js

      用法:node tailServer /var/log/filename.log

      var http = require("http");
      var filename = process.argv[2];
      
      
      if (!filename)
          return console.log("Usage: node tailServer filename");
      
      var spawn = require('child_process').spawn;
      var tail = spawn('tail', ['-f', filename]);
      
      http.createServer(function (request, response) {
          console.log('request starting...');
      
          response.writeHead(200, {'Content-Type': 'text/plain' });
      
          tail.stdout.on('data', function (data) {
            response.write('' + data);                
          });
      }).listen(8088);
      
      console.log('Server running at http://127.0.0.1:8088/');
      

      【讨论】:

      • 我对 tail -f 的担忧是它要求在写入文件之前读取进程处于活动状态,如果不是数据丢失的话。我的用例是读取可能在数据写入后很长时间发生。 +1 更新到 0.8,虽然这是一个很好的解决方案,可以从同一源控制写入和读取。
      • watchFile 也是事件驱动的,但根据文档并不稳定。上面的示例通过轮询高级代码来处理文件更改。对我来说,这看起来像一个黑客。但是,只要它对您有用,这样做就很好。否则你可以touch该文件如果它不存在并且你不会丢失任何数据并且你可以用wc -l message.text | awk '{print $1}'来计算文件行并将它交给tail -f -n
      • 我不认为,这段代码可以在windows机器上运行
      • 我没有在 Windows 上测试过。虽然 tail 应该在 Win10 上工作,否则 WSL。但现在有更好的解决方案。也没有可用且维护良好的依赖尾 npm。
      猜你喜欢
      • 2021-01-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-28
      • 2016-04-15
      • 2019-05-30
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多