【Question title】: Fast-csv read several files synchronously
【Posted】: 2018-07-26 12:59:34
【Question】:

I am trying to read several files synchronously with fast-csv. It should work like this:

read file 1
execute something while reading
read file 2
execute something while reading (this must run after the first file has been fully processed, which is why I need to do this synchronously)
...

Here is my code, simplified:

const csv = require('fast-csv');
const PROCEDURES = [
    { "name": "p1", "file": "p1.csv" },
    { "name": "p2", "file": "p2.csv" },
];

const launchProcedure = (name, file) => {

    try {   
        const fs = require("fs");
        const stream = fs.createReadStream(file, {
            encoding: 'utf8'
        });
        console.log('launching parsing...');

        stream.once('readable', () => {
            // ignore first line
            let chunk;
            while (null !== (chunk = stream.read(1))) {
                if (chunk == '\n') {
                    break;
                }
            }

            // CSV parsing
            const csvStream = csv.fromStream(stream, {
                renameHeaders: false,
                headers: true,
                delimiter: ',',
                rowDelimiter: '\n',
                quoteHeaders: false,
                quoteColumns: false
            }).on("data", data => {
                console.log('procedure execution...');
                // I execute a procedure...

            }).on("error", error => {
                logger.error(error);
            }).on("end", data => {
                logger.info(data);
            });
         });

    }
    catch (e) {
        logger.info(e);
    }
}

PROCEDURES.forEach(procedure => {
    launchProcedure(procedure.name, procedure.file);
});

The output is:

launching parsing...
launching parsing...
procedure execution...
procedure execution...

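The problem comes from stream.once, but I use it to skip the first line. I have tried to promisify my function and use async/await... (I had a similar problem when executing my procedure, and I solved it by using csvStream.pause() and csvStream.resume().)

Any ideas?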

【Comments】:

    Tags: javascript node.js csv parsing asynchronous


    【Solution 1】:

    Hey, I found a solution in the meantime, before getting your answers (sorry for posting it late)!

    const launchProcedure = (name, file, callback) => {
    
        try {   
            const fs = require("fs");
            const stream = fs.createReadStream(file, {
                encoding: 'utf8'
            });
            console.log('launching parsing...');
    
            stream.once('readable', () => {
                // ignore first line
                let chunk;
                while (null !== (chunk = stream.read(1))) {
                    if (chunk == '\n') {
                        break;
                    }
                }
    
                // CSV parsing
                const csvStream = csv.fromStream(stream, {
                    renameHeaders: false,
                    headers: true,
                    delimiter: ',',
                    rowDelimiter: '\n',
                    quoteHeaders: false,
                    quoteColumns: false
                }).on("data", data => {
                    console.log('procedure execution...');
                    // I execute a procedure...
    
                }).on("error", error => {
                    logger.error(error);
                }).on("end", data => {
                    logger.info(data);
                    callback();
                });
             });
    
        }
        catch (e) {
            logger.info(e);
        }
    }
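    // `async` here is the async utility library: const async = require('async');
    async.eachSeries(PROCEDURES, (procedure, callback) => {
        // launchProcedure takes (name, file, callback)
        launchProcedure(procedure.name, procedure.file, callback);
    }, error => {
        if (error) {
            logger.error(error);
        }
        else {
            logger.info("done");
        }
    });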
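
To make the idea behind running the files in series more concrete, here is a minimal sketch of the same pattern without the async package. The names eachSeries, fakeLaunch and events are made up for illustration; fakeLaunch only stands in for launchProcedure and finishes asynchronously, much like the parser's "end" event would.

```javascript
// Hypothetical helper: runs `worker(item, done)` for each item, one at a time.
const eachSeries = (items, worker, finished) => {
    const step = (index) => {
        if (index >= items.length) {
            return finished(null);
        }
        worker(items[index], (error) => {
            if (error) {
                return finished(error); // stop at the first failure
            }
            step(index + 1); // only now start the next item
        });
    };
    step(0);
};

// Stand-in for launchProcedure (assumption): completes on a later tick.
const events = [];
const fakeLaunch = (file, done) => {
    events.push(`start ${file}`);
    setImmediate(() => {
        events.push(`end ${file}`);
        done();
    });
};

eachSeries(['p1.csv', 'p2.csv'], fakeLaunch, (error) => {
    events.push(error ? 'failed' : 'done');
    console.log(events);
});
```

The next file is only started from inside the previous file's completion callback, which is what keeps the two runs from interleaving.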
    

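    【Comments】:

      【Solution 2】:

      Using promises seems like a good way to solve this. Note, however, that when you create a promise with new Promise(executor), the executor is executed immediately. So you need to delay creating each promise until the previous one has resolved.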
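
A small sketch of that point, not tied to fast-csv: task and log are hypothetical names, and task only stands in for a promisified launchProcedure.

```javascript
const log = [];

// Hypothetical stand-in for a promisified launchProcedure.
const task = (label) => new Promise((resolve) => {
    log.push(`executor ${label}`); // runs synchronously, inside `new Promise`
    resolve(label);
});

// Eager: both executors have already run when this statement finishes.
const eager = [task('p1'), task('p2')];
log.push('after eager creation');

// Lazy: wrapping the call in a function postpones the executor
// until the function is actually invoked.
const lazy = () => task('p3');
log.push('after lazy wrapper');
lazy();

console.log(log);
// [ 'executor p1', 'executor p2', 'after eager creation',
//   'after lazy wrapper', 'executor p3' ]
```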

      To "promisify" the launchProcedure function, you need to return a new Promise at the start of the function:

      const launchProcedure = (name, file) => {
          return new Promise((resolve, reject) => {
      

      Then you need to call resolve (on success) or reject (on failure) once parsing has finished.
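
As a rough sketch of that wiring, independent of fast-csv: waitForEnd is a hypothetical helper, and plain EventEmitter objects from Node's events module stand in for the parser stream.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: settles when the emitter reports "end" or "error".
const waitForEnd = (emitter) => new Promise((resolve, reject) => {
    emitter.once('end', resolve);
    emitter.once('error', reject);
});

// Stand-ins for the CSV parser stream (assumption), used only to drive the example.
const parser = new EventEmitter();
const finished = waitForEnd(parser).then(() => 'parsed');

const failing = new EventEmitter();
const failed = waitForEnd(failing).catch((error) => `failed: ${error.message}`);

parser.emit('end');
failing.emit('error', new Error('bad row'));

Promise.all([finished, failed]).then((results) => console.log(results));
```

A promise settles only once, so if both events fired, the first one would win and the later call would be ignored.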

      Finally, we need to chain the promises:

      let promise = launchProcedure(PROCEDURES[0].name, PROCEDURES[0].file);
      for (let i = 1; i < PROCEDURES.length; i++) {
          promise = promise.then(() => launchProcedure(PROCEDURES[i].name, PROCEDURES[i].file));
      }
      

      Note that I used a lambda function inside then to delay the creation of each promise. (By the way, there are nicer ways to chain promises.)
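
One such alternative, sketched under assumptions, is to build the chain with Array.prototype.reduce. Here fakeLaunch and trace are hypothetical stand-ins; in real code the reducer would call launchProcedure(procedure.name, procedure.file) instead.

```javascript
// Stand-in for launchProcedure (assumption): resolves on a later tick.
const trace = [];
const fakeLaunch = ({ name }) => new Promise((resolve) => {
    trace.push(`start ${name}`);
    setImmediate(() => {
        trace.push(`end ${name}`);
        resolve();
    });
});

const PROCEDURES = [
    { name: 'p1', file: 'p1.csv' },
    { name: 'p2', file: 'p2.csv' },
];

// Each step is wrapped in a function, so its promise is only created
// after the previous one has resolved.
const allDone = PROCEDURES.reduce(
    (chain, procedure) => chain.then(() => fakeLaunch(procedure)),
    Promise.resolve()
);

allDone.then(() => console.log(trace));
```

Starting from Promise.resolve() also means an empty PROCEDURES array is handled without a special case, whereas the loop version reads PROCEDURES[0] unconditionally.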

      The final code looks like this:

      const csv = require('fast-csv');
      const PROCEDURES = [
          { "name": "p1", "file": "p1.csv" },
          { "name": "p2", "file": "p2.csv" },
      ];
      
      const launchProcedure = (name, file) => {
          return new Promise((resolve, reject) => {
              try {   
                  const fs = require("fs");
                  const stream = fs.createReadStream(file, {
                      encoding: 'utf8'
                  });
                  console.log('launching parsing...');
      
                  stream.once('readable', () => {
                      // ignore first line
                      let chunk;
                      while (null !== (chunk = stream.read(1))) {
                          if (chunk == '\n') {
                              break;
                          }
                      }
      
                      // CSV parsing
                      const csvStream = csv.fromStream(stream, {
                          renameHeaders: false,
                          headers: true,
                          delimiter: ',',
                          rowDelimiter: '\n',
                          quoteHeaders: false,
                          quoteColumns: false
                      }).on("data", data => {
                          console.log('procedure execution...');
                          // I execute a procedure...
      
                      }).on("error", error => {
                          logger.error(error);
                          reject(error);
                      }).on("end", data => {
                          logger.info(data);
                          resolve();
                      });
                  });
              }
              catch (e) {
                  logger.info(e);
                  reject(e);
              }
          });
      }
      
      let promise = launchProcedure(PROCEDURES[0].name, PROCEDURES[0].file);
      for (let i = 1; i < PROCEDURES.length; i++) {
          promise = promise.then(() => launchProcedure(PROCEDURES[i].name, PROCEDURES[i].file));
      }
      promise
          .then(() => { console.log('all files parsed'); })
          .catch(error => { logger.error(error); });
      

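      【Comments】:

        【Solution 3】:

        The problem here is that launchProcedure must be async in order to use await. Another problem is that combining async/await with Array.forEach is not the best choice, because forEach does not wait for the promises returned by its callback (see here).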
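
To illustrate the difference, here is a small self-contained sketch. The names sleep, fakeLaunch, withForEach and withForOf are made up for the example, and fakeLaunch merely simulates an asynchronous launchProcedure with a timer.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Stand-in for an async launchProcedure (assumption).
const trace = [];
const fakeLaunch = async (name) => {
    trace.push(`start ${name}`);
    await sleep(5);
    trace.push(`end ${name}`);
};

// forEach starts every callback right away and ignores the returned promises.
const withForEach = () => {
    ['a', 'b'].forEach(async (name) => {
        await fakeLaunch(name);
    });
    trace.push('forEach returned');
};

// for...of inside an async function really waits between iterations.
const withForOf = async () => {
    for (const name of ['c', 'd']) {
        await fakeLaunch(name);
    }
    trace.push('for-of finished');
};

const demo = (async () => {
    withForEach();
    await sleep(20); // give the forEach callbacks time to finish
    await withForOf();
    return trace;
})();

demo.then((result) => console.log(result));
```

With forEach, both "start" entries are recorded before either "end"; with for...of, each item starts only after the previous one has ended.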

        You can use a "for-of" loop and await in the loop body:

        const csv = require('fast-csv');
        const fs = require('fs');
        const PROCEDURES = [
            { "name": "p1", "file": "p1.csv" },
            { "name": "p2", "file": "p2.csv" },
        ];
        
        const launchProcedure = async (name, file) => {
        
            try {   
                // only require this once in the file (you require each time `launchProcedure` is getting called)
                // const fs = require("fs"); 
                const stream = fs.createReadStream(file, {
                    encoding: 'utf8'
                });
        
                console.log('launching parsing...');
        
                // wait for the readable (or error) event
                const ready = await new Promise((resolve, reject) => {
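                    // `once` removes each listener after its first event, so no 'readable' listener is left attached when the stream is handed to the parser
                    stream.once('readable', resolve);
                    stream.once('error', reject);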
                })
                .then(() => true)
                .catch(() => false);
        
                console.log('file is ready: ', ready)
        
                if (!ready) {
                    throw new Error(`Unable to read file (file-name: "${file}")`);
                }
        
                // ignore first line
                let chunk;
                while (null !== (chunk = stream.read(1))) {
                    if (chunk == '\n') {
                        break;
                    }
                }
        
                // CSV parsing: wait for "end" (or "error") so that the caller's
                // `await` only returns once this file has been fully processed
                await new Promise((resolve, reject) => {
                    csv.fromStream(stream, {
                        renameHeaders: false,
                        headers: true,
                        delimiter: ',',
                        rowDelimiter: '\n',
                        quoteHeaders: false,
                        quoteColumns: false
                    }).on("data", data => {
                        console.log('procedure execution...');
                        // I execute a procedure...

                    }).on("error", error => {
                        logger.error(error);
                        reject(error);
                    }).on("end", data => {
                        logger.info(data);
                        resolve();
                    });
                });

                console.log(`Done reading file (file-name: "${file}")`);
            }
            catch (e) {
                logger.info(e);
            }
        }
        
        
        // Wrap your iteration over the `PROCEDURES` array into an async function (this makes `await` available inside the function)
        // Then use "for-of" here instead of for each to have full async support.
        const runProcedures = async (procedures) => {
                for (const procedure of procedures) {
                await launchProcedure(procedure.name, procedure.file);
            }
        }
        
        runProcedures(PROCEDURES);
        

        Output:

        launching parsing...
        file is ready:  true
        Done reading file (file-name: "p1.csv")
        launching parsing...
        file is ready:  true
        Done reading file (file-name: "p2.csv")
        

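        【Comments】:

        • Shouldn't there be a "procedure execution..." line for every row of the file after "file is ready: true"?
        • @Curse No, I don't get any. That may be because my csv file does not match your CSV options. I did not check, tbh.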