【问题标题】:Zipping two readable streams does not seem to produce anything压缩两个可读流似乎不会产生任何东西
【发布时间】:2019-08-06 13:13:13
【问题描述】:

我有以下功能:

const _ = require('highland');

module.exports =
    (numbers /* Readable */, words /* Readable */, repeated_words /* Writable */) => {
        const numberStream = _(numbers);
        const wordStream = _(words);
        numberStream
            .zip(wordStream)
            .flatMap((numberWordPair) => {
                const result = [];
                for (let i = 0; i < numberWordPair[0]; i++) {
                    result.push(numberWordPair[1]);
                }
                return _(result);
            })
            .pipe(repeated_words);
    };

流参数是自动注入的,我 100% 确定流注入有效(其他流函数有效)。

当我用像_(numbers).each(xs =&gt; {console.log(xs)}) 这样简单的东西替换这个有点复杂的转换时,我可以看到正在记录的数据。

但是,在这里,Highland.js 肯定缺少一些东西,因为根本没有产生任何东西。

我使用的是 Highland.js 的 2.13.5 版本。

我错过了什么?

【问题讨论】:

    标签: node.js node-streams highland.js


    【解决方案1】:

    看起来一切正常,因此错误一定出在其他地方。 作为证明,我能够运行这个小程序:

    const {TextDecoder} = require('util');
    const repeater = require('./lib/repeater');
    const {PassThrough, Readable, Transform} = require('stream');
    
    class ConstantSource extends Readable {
    
        constructor(options, constant) {
            super(options);
            this.constant = constant;
        }
    
        _read(size) {
            for (let i = 0; i < size; i++) {
                this.push(this.constant);
            }
        }
    }
    
    class StdinSource extends Readable {
    
        constructor(options) {
            super(options);
            this.decoder = new TextDecoder('utf8');
            process.stdin.on('data', (chunk) => {
                this.push(this.decoder.decode(chunk).trim());
            });
        }
    
        _read(size) {
    
        }
    }
    
    class StdoutSink extends Transform {
    
        constructor(options) {
            super(options);
            this.pipe(process.stdout);
        }
    
        _transform(chunk, _, callback) {
            callback(null, '\x1b[33m' + chunk + '\x1b[0m\n');
        }
    }
    
    const main = () => {
        if (process.argv.length < 3) {
            console.error('Please specify a number.');
            process.exit(1);
        }
    
        const options = {objectMode: true};
        const inputStream1 = new ConstantSource(options, process.argv[2]);
        const inputStream2 = new StdinSource(options);
        const outputStream = new StdoutSink(options);
    
        repeater(inputStream1, inputStream2, outputStream);
    }
    
    main();
    

    【讨论】:

      猜你喜欢
      • 2019-01-25
      • 1970-01-01
      • 1970-01-01
      • 2021-06-24
      • 1970-01-01
      • 2021-03-25
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多