【问题标题】:How to read lines in batches from a text file in Node.js如何从 Node.js 中的文本文件中批量读取行
【发布时间】:2021-07-21 06:26:08
【问题描述】:

我正在尝试找出一种方法来打开 Node.js 流以将文本/CSV 文件作为流读取,以便:

  • 读取一批行
  • 暂停阅读
  • 进行一些异步处理:将所有行解析为对象数组,然后将它们发送到 API。
  • 继续阅读
  • 转到步骤 1

我一直在使用readline 并取得了一些成功,但由于pause() doesn't stop it right away 它并不能很好地工作,并且在前一批仍在处理时它会继续读取更多行。最终结果是并发批处理上的下游 API 调用阻塞,我被限制了。

我想在处理批处理时“关闭水龙头”,所以如果我受到限制,我可以回退,回退,然后重试直到批处理完全处理,然后我可以继续阅读。

有一个基于split 的旧包event-stream 处理行拆分,但两个项目都已存档。

这个问题可以用Transform Stream解决吗

【问题讨论】:

    标签: node.js node-streams


    【解决方案1】:

    csv 示例

    npm i csv-parser --save

    const csv = require('csv-parser')
    const fs = require('fs')
    
    let counter = 0;
    let batch = []
    
    const stream = fs.createReadStream('FILE_PATH')
    .pipe(csv())
    .on('data', (data) => {
        console.log('data', data)
    
        batch.push(data)
        counter ++
    
        if(counter > 5000) {
            stream.pause()
    
            setTimeout(() => {
                // YOUR ASYNC PROCESSING
                counter = 0;
                batch = []
                stream.resume()
            }, 5000)
        }
    })
    .on('error', (e) => {
        console.error(e)
    })
    .on('end', () => {
        console.log('end');
    
       // YOUR ASYNC PROCESSING
    });
    

    txt 示例

    npm i line-by-line --save

    const LineByLineReader = require('line-by-line')
    
    let counter = 0;
    let batch = []
    
    rl = new LineByLineReader('FILE_PATH');
    
    rl.on('line', (line) => {
        console.log('line', line)
    
        batch.push(line)
        counter ++
    
        if(counter > 5000) {
            rl.pause()
    
            setTimeout(() => {
                // YOUR ASYNC PROCESSING
                counter = 0;
                batch = []
                rl.resume()
            }, 5000)
        }
    })
    .on('error', (e) => {
        console.error(e)
    })
    .on('end', () => {
        console.log()
    
        // YOUR ASYNC PROCESSING
    })
    

    【讨论】:

      【解决方案2】:

      之前的解决方案实际上导致了一些重复的处理。我发现解决这个问题的最快方法如下,仍然使用 csv-parse:

      import parse from 'csv-parse';
      import fs from 'fs';
      
      const stream = fs.createReadStream('FILE_PATH').pipe(parse());
      
      let recordsInBatch = [];
      let recordsCounter = 0;
      const BATCH_SIZE = 1000;
      
      for await (const record of stream) {
        recordsInBatch.push(record);
        recordsCounter += 1;
        if (recordsCounter >= BATCH_SIZE) {
          stream.pause();
          await yourAsyncCall(recordsInBatch);
          recordsCounter = 0;
          recordsInBatch = [];
          stream.resume();
        }
      }
      
      await yourAsyncCall(recordsInBatch);
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2017-06-26
        • 1970-01-01
        • 2015-10-05
        • 1970-01-01
        • 1970-01-01
        • 2017-10-25
        • 1970-01-01
        相关资源
        最近更新 更多