【问题标题】:Processing nested streams处理嵌套流
【发布时间】:2019-04-05 00:23:08
【问题描述】:

我正在尝试通过加入 2 个 csv 输入流来生成输出文件,对于 csv 1 中的每条记录,我想为 csv 2 中的每条记录生成一个输出。

我在浏览任何类似解决方案的堆栈溢出时遇到了高地,并遇到了:

Nested stream operations in Highland.js

我已尝试根据自己的问题对此进行调整,目前为止:

    const debug = require('debug')('csvparse');
    const csv = require('fast-csv');
    const fs = require('fs');
    const args = process.argv;
    const h = require('highland');

    const typestream = h(fs.createReadStream(args[2]).pipe(csv({ headers: true, ignoreEmpty: true })));
    const postcodestream = h(fs.createReadStream(args[3]).pipe(csv({ headers: true, ignoreEmpty: true })));

    const pipeline = typestream.flatMap((type) => {
        debug(type);

        return postcodestream.flatMap((postcode) => {
            debug(postcode);

            return h([`${type.type}-${postcode.postcode}\n`]);
        });
    });

    pipeline.pipe(process.stdout);

使用以下示例输入 csv1:

type,
STREET,
ROAD,

csv2:

postcode,
3456
3446
1234

我期望输出

STREET-3456
STREET-3446
STREET-1234
ROAD-3456
ROAD-3446
ROAD-1234

但我刚刚得到:

STREET-3456
STREET-3446
STREET-1234

我可以从调试语句中看到我退出 ROAD 一次,然后它就停止了。

【问题讨论】:

    标签: node.js highland.js


    【解决方案1】:

    好的,我发现了我的问题,基本上我应该一直使用 through 进行 csv 解析而不是包装管道,而且我还应该在初始 flatMap 中创建 fs.createReadStream 而不是从变量中引用它(如流将在初始迭代后完成)。

    代码现在是:

    #!/usr/bin/node
    const debug = require('debug')('csvparse');
    const csv = require('fast-csv');
    const fs = require('fs');
    const args = process.argv;
    const h = require('highland');
    
    const pipeline = h(fs.createReadStream(args[2]))
        .through(csv({ headers: true, ignoreEmpty: true }))
        .flatMap((type) => {
            return h(fs.createReadStream(args[3]))
                .through(csv({ headers: true, ignoreEmpty: true }))
                .map((postcode) => {
                    return `${type.type}-${postcode.postcode}\n`;
                });
        });
    
    pipeline.pipe(process.stdout);
    

    【讨论】:

      猜你喜欢
      • 2015-09-13
      • 2019-03-10
      • 2020-05-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多