【问题标题】:Nodejs: merge join mulitple filesNodejs:合并加入多个文件
【发布时间】:2021-03-28 09:05:04
【问题描述】:

考虑这种情况:

我有 2 个 csv 文件,每个文件都经过排序并包含 id 归档。 我需要使用id 字段加入行。因为文件已经按id 排序,所以我想执行合并连接(https://en.wikipedia.org/wiki/Sort-merge_join)。

为此,我需要一种方法来加载两个文件的某些部分,对其进行处理并从一个或两个文件中迭代地再次加载更多内容。 (文件很大,不适合内存,所以只能使用流式处理)。

问题是Node API,用什么?由于https://github.com/nodejs/node/issues/33463readline 将无法工作。还有其他想法吗?

【问题讨论】:

    标签: javascript node.js stream


    【解决方案1】:

    我最近不得不做一些类似的事情,并决定使用node-line-reader 模块,它的界面比内置的readline 更简单。然后我创建了一个小递归函数,通过比较每个提供的文件的每个 csv 条目的id 来确定下一个从哪个文件读取。之后,相应的行被写出到目标文件,并再次调用该方法,直到处理完所有文件的所有行。这是我结束的整个课程:

    const fs = require('fs');
    const LineReader = require('node-line-reader').LineReader;
    
    class OrderedCsvFileMerger {
    
        constructor(files, targetFile) {
            this.lineBuffer = [];
            this.initReaders(files);
            this.initWriter(targetFile);
        }
    
        initReaders(files) {
            this.readers = files.map(file => new LineReader(file));
        }
    
        initWriter(targetFile) {
            this.writer = fs.createWriteStream(targetFile);
        }
    
        async mergeFiles() {
            // initially read first line from all files
            for (const reader of this.readers) {
                this.lineBuffer.push(await this.nextLine(reader));
            }
            return this.merge();
        }
    
        async nextLine(reader) {
            return new Promise((resolve, reject) => {
                reader.nextLine(function (err, line) {
                    if (err) reject(err);
                    resolve(line);
                });
            })
        }
    
        async merge() {
            if (this.allLinesProcessed()) {
                return;
            }
            let currentBufferIndex = -1;
            let minRowId = Number.MAX_VALUE;
            for (let i = 0; i < this.lineBuffer.length; i++) {
                const currentRowId = parseInt(this.lineBuffer[i]); // implement parsing logic if your lines do not start
                // with an integer id
                if (currentRowId < minRowId) {
                    minRowId = currentRowId;
                    currentBufferIndex = i;
                }
            }
            const line = this.lineBuffer[currentBufferIndex];
            this.writer.write(line + "\n");
    
            this.lineBuffer[currentBufferIndex] = await this.nextLine(this.readers[currentBufferIndex]);
            return this.merge();
        }
    
        allLinesProcessed() {
            return this.lineBuffer.every(l => !l);
        }
    }
    
    (async () => {
        const input = ['./path/to/csv1.csv', './path/to/csv2.csv'];
        const target = './path/to/target.csv';
        const merger = new OrderedCsvFileMerger(files, output);
        await merger.mergeFiles();
        console.log("Files were merged successfully!")
    })().catch(err => {
        console.log(err);
    });
    

    【讨论】:

    • 对此有何反馈?
    猜你喜欢
    • 2018-04-23
    • 1970-01-01
    • 2014-05-07
    • 1970-01-01
    • 1970-01-01
    • 2012-10-21
    • 2014-10-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多