【问题标题】:Creating a Node.js stream from two piped streams从两个管道流创建 Node.js 流
【发布时间】:2013-07-02 12:57:49
【问题描述】:

如果可能的话,我想通过管道将两个 Node.js 流合并为一个。我正在使用Transform 流。

换句话说,我希望我的库返回myStream 供人们使用。例如,他们可以写:

process.stdin.pipe(myStream).pipe(process.stdout);

在内部,我正在使用第三方vendorStream,它可以做一些工作,插入到myInternalStream 中包含的我自己的逻辑中。所以上面的内容将转化为:

process.stdin.pipe(vendorStream).pipe(myInternalStream).pipe(process.stdout);

我可以这样做吗?我试过var myStream = vendorStream.pipe(myInternalStream),但这显然不起作用。

为了与bash 进行类比,假设我想编写一个程序来检查字母h 是否出现在某个流的最后一行(tail -n 1 | grep h)中,我可以创建一个shell 脚本:

# myscript.sh
tail -n 1 | grep h

然后如果人们这样做:

$ printf "abc\ndef\nghi" | . myscript.sh

它只是工作。

这是我目前所拥有的:

// Combine a pipe of two streams into one stream

var util = require('util')
  , Transform = require('stream').Transform;

var chunks1 = [];
var stream1 = new Transform();
var soFar = '';
stream1._transform = function(chunk, encoding, done) {
  chunks1.push(chunk.toString());
  var pieces = (soFar + chunk).split('\n');
  soFar = pieces.pop();
  for (var i = 0; i < pieces.length; i++) {
    var piece = pieces[i];
    this.push(piece);
  }
  return done();
};

var chunks2 = [];
var count = 0;
var stream2 = new Transform();
stream2._transform = function(chunk, encoding, done) {
  chunks2.push(chunk.toString());
  count = count + 1;
  this.push(count + ' ' + chunk.toString() + '\n');
  done();
};

var stdin = process.stdin;
var stdout = process.stdout;

process.on('exit', function () {
    console.error('chunks1: ' + JSON.stringify(chunks1));
    console.error('chunks2: ' + JSON.stringify(chunks2));
});
process.stdout.on('error', process.exit);


// stdin.pipe(stream1).pipe(stream2).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

// Best working solution I could find
var stream3 = function(src) {
  return src.pipe(stream1).pipe(stream2);
};
stream3(stdin).pipe(stdout);

// $ (printf "abc\nd"; sleep 1; printf "ef\nghi\n") | node streams-combine.js
// Outputs:
// 1 abc
// 2 def
// 3 ghi
// chunks1: ["abc\nd","ef\nghi\n"]
// chunks2: ["abc","def","ghi"]

这有可能吗?如果不清楚我想要做什么,请告诉我。

谢谢!

【问题讨论】:

    标签: node.js node.js-stream


    【解决方案1】:

    您可以观察要传送到您的流的内容,然后unpipe 并将其传送到您感兴趣的流中:

    var PassThrough = require('stream').PassThrough;
    
    var stream3 = new PassThrough();
    
    // When a source stream is piped to us, undo that pipe, and save
    // off the source stream piped into our internally managed streams.
    stream3.on('pipe', function(source) {
      source.unpipe(this);
      this.transformStream = source.pipe(stream1).pipe(stream2);
    });
    
    // When we're piped to another stream, instead pipe our internal
    // transform stream to that destination.
    stream3.pipe = function(destination, options) {
      return this.transformStream.pipe(destination, options);
    };
    
    stdin.pipe(stream3).pipe(stdout);
    

    您可以将此功能提取到您自己的可构造流类中:

    var util = require('util');
    var PassThrough = require('stream').PassThrough;
    
    var StreamCombiner = function() {
      this.streams = Array.prototype.slice.apply(arguments);
    
      this.on('pipe', function(source) {
        source.unpipe(this);
        for(i in this.streams) {
          source = source.pipe(this.streams[i]);
        }
        this.transformStream = source;
      });
    };
    
    util.inherits(StreamCombiner, PassThrough);
    
    StreamCombiner.prototype.pipe = function(dest, options) {
      return this.transformStream.pipe(dest, options);
    };
    
    var stream3 = new StreamCombiner(stream1, stream2);
    stdin.pipe(stream3).pipe(stdout);
    

    【讨论】:

    • 非常感谢@brandon,这太棒了!更新了我的要点gist.github.com/nicolashery/5910969
    • 这太棒了。我正在考虑做类似的事情,但我只是没有信心我没有错过一些会使我的解决方案出错的微妙之处。感谢您的信任
    • FWIW,要使此解决方案正常工作,您需要先将 stream3 传输到源(在本例中为标准输入),然后再将其传输到标准输出。所以,没有 stream3.pipe(stdout); stream3.write(数据);但这是一个很大的帮助!谢谢!
    • 原来stream3是一个转换流,所以它没有写方法,反正。
    • 有没有提供该功能的库?
    【解决方案2】:

    一种选择可能是使用multipipe,它允许您将多个转换链接在一起,包装为单个转换流:

    // my-stream.js
    var multipipe = require('multipipe');
    
    module.exports = function createMyStream() {
      return multipipe(vendorStream, myInternalStream);
    };
    

    那么你可以这样做:

    var createMyStream = require('./my-stream');
    
    var myStream = createMyStream();
    
    process.stdin.pipe(myStream).pipe(process.stdout);
    

    澄清:这使得标准输入通过vendorStream,然后是myInternalStream,最后是stdout。

    【讨论】:

    猜你喜欢
    • 2015-09-23
    • 2014-10-09
    • 2023-04-06
    • 1970-01-01
    • 1970-01-01
    • 2015-02-10
    • 2016-06-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多