【问题标题】:Is Ruby pipe streaming asynchronous?Ruby 管道流是异步的吗?
【发布时间】:2015-06-03 21:00:07
【问题描述】:

我有一个map/reducepipeline 用Ruby 编写,它的行为很奇怪。管道如下所示:

mapper | sort | reducer | expander | sort | splitter | uploader

mapper 写入 STDOUT(通过 puts),reducer 从 STDIN 读取(通过 ARGF.each)并写入 STDOUT(通过 puts)等等。

似乎在执行上传程序时,拆分器应该创建的文件尚未创建。所以上传者不会上传任何东西。

这是我的管道类:

class Pipeline

  def run(context)
    raise ArgumentError, 'context is nil' unless context
    raise ArgumentError, 'context[:logger] is nil' unless context[:logger]

    current_path = File.dirname(__FILE__)
    mapper       = File.join(current_path, 'mapper.rb')
    reducer      = File.join(current_path, 'reducer.rb')
    expander     = File.join(current_path, 'expander.rb')
    splitter     = File.join(current_path, 'splitter.rb')
    uploader     = File.join(current_path, 'uploader.rb')

    mapper_args = context[:order_id] == nil ? nil : " #{context[:order_id]}"

    command_line = "ruby #{mapper}#{mapper_args} | sort | ruby #{reducer} | ruby #{expander} | sort | ruby #{splitter} | ruby #{uploader}"

    context[:logger].debug command_line

    %x{#{command_line}}
  end

end

如果管道流在 Ruby 中是异步的,我想知道 RubyMine 所做的是否可以解决此问题。例如,在运行 Ruby 脚本之前,他们会在命令行前添加如下所示:ruby -e $stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)

我已经用这种技术更新了我的代码,但是,我想知道这是否正确。或者,是否有更好的方法?

class Pipeline

  def run(context)
    raise ArgumentError, 'context is nil' unless context
    raise ArgumentError, 'context[:logger] is nil' unless context[:logger]

    current_path = File.dirname(__FILE__)
    ruby         = 'ruby -e $stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)'
    mapper       = File.join(current_path, 'mapper.rb')
    reducer      = File.join(current_path, 'reducer.rb')
    expander     = File.join(current_path, 'expander.rb')
    splitter     = File.join(current_path, 'splitter.rb')
    uploader     = File.join(current_path, 'uploader.rb')

    mapper_args = context[:order_id] == nil ? nil : " #{context[:order_id]}"

    create_reports_command_line = "#{ruby} #{mapper}#{mapper_args} | sort | #{ruby} #{reducer} | #{ruby} #{expander} | sort | #{ruby} #{splitter}"

    context[:logger].debug create_reports_command_line

    %x{#{create_reports_command_line}}

    sleep 60 # Sleep for 1 min, just in case...

    upload_reports_command_line = "#{ruby} #{uploader}"

    context[:logger].debug upload_reports_command_line

    %x{#{upload_reports_command_line}}
  end

end

【问题讨论】:

    标签: ruby streaming pipe


    【解决方案1】:

    您需要启用sync。这告诉 Ruby 缓冲输出,而是将其作为输出发送。

    将“同步模式”设置为 true 或 false。当同步模式为真时,所有输出都会立即刷新到底层操作系统,并且不会在内部缓冲。

    管道应该与 sync disabled/false 一起使用,但是在第一个管道,然后是后续管道,看到一个关闭的输入,或者它们有完整的缓冲区并刷新它们之前,你不会看到任何东西,这可能需要一段时间。

    更多信息请参见IO.sync=

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2013-07-11
      • 2014-12-05
      • 2012-01-19
      • 2016-12-20
      • 2016-06-01
      • 2023-03-29
      • 2016-12-15
      • 1970-01-01
      相关资源
      最近更新 更多