【Posted】: 2015-06-03 21:00:07
【Question】:
I have a map/reduce streaming pipeline written in Ruby that is behaving strangely. The pipeline looks like this:
```
mapper | sort | reducer | expander | sort | splitter | uploader
```
The mapper writes to STDOUT (via puts), the reducer reads from STDIN (via ARGF.each) and writes to STDOUT (via puts), and so on.
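A minimal sketch of one such filter stage, assuming the reducer receives tab-separated `key\tvalue` lines already sorted by key (the real input format is not shown in the question). The helper name `count_keys` is hypothetical; it takes the input and output streams as arguments so it can be exercised without a real pipe.

```ruby
# Hypothetical reducer stage: input is "key\tvalue" lines sorted by key
# (the upstream `sort` stage), output is one "key\tcount" line per key.
def count_keys(input, output)
  current_key = nil
  count = 0
  input.each_line do |line|
    next if line.strip.empty? # ignore blank lines
    key = line.chomp.split("\t", 2).first
    if key == current_key
      count += 1
    else
      # A new key starts: emit the totals for the previous one.
      output.puts "#{current_key}\t#{count}" if current_key
      current_key = key
      count = 1
    end
  end
  # Emit the last key once the input is exhausted.
  output.puts "#{current_key}\t#{count}" if current_key
end
```

In a stage script this would be called as `count_keys(ARGF, $stdout)`, so it reads the files named on the command line, or STDIN when there are none, just as `ARGF.each` does.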
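It seems that by the time the uploader runs, the files the splitter is supposed to create do not exist yet, so the uploader does not upload anything.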
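One possible explanation, assuming the uploader scans the disk for the splitter's files instead of reading its own STDIN: all stages of a shell pipeline are started at the same time, and the pipe only orders stages that actually read from it. The sketch below is a minimal uploader that first reads STDIN to EOF, which on a pipe normally happens only after the splitter has exited or closed its STDOUT, and only then globs for files. The method names, the glob pattern, and the block standing in for the real upload call are hypothetical.

```ruby
# Hypothetical uploader stage: block until the upstream stage (the splitter)
# closes its end of the pipe, then look for the files it wrote.
def wait_for_upstream(input)
  # Reading to EOF returns only after every writer has closed the pipe,
  # which normally means the splitter process has finished.
  input.each_line { |_line| }
end

def upload_reports(pattern, input: $stdin)
  wait_for_upstream(input)
  Dir.glob(pattern).sort.each do |path|
    yield path # placeholder for the real upload logic
  end
end
```

In the uploader script this could be called as `upload_reports('reports/*.csv') { |path| ... }`, where `'reports/*.csv'` is a placeholder for wherever the splitter actually writes.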
Here is my Pipeline class:
```ruby
class Pipeline
  def run(context)
    raise ArgumentError, 'context is nil' unless context
    raise ArgumentError, 'context[:logger] is nil' unless context[:logger]
    current_path = File.dirname(__FILE__)
    mapper = File.join(current_path, 'mapper.rb')
    reducer = File.join(current_path, 'reducer.rb')
    expander = File.join(current_path, 'expander.rb')
    splitter = File.join(current_path, 'splitter.rb')
    uploader = File.join(current_path, 'uploader.rb')
    mapper_args = context[:order_id] == nil ? nil : " #{context[:order_id]}"
    command_line = "ruby #{mapper}#{mapper_args} | sort | ruby #{reducer} | ruby #{expander} | sort | ruby #{splitter} | ruby #{uploader}"
    context[:logger].debug command_line
    %x{#{command_line}}
  end
end
```
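For comparison, Ruby's standard library can start a chain of stages without building one shell string: `Open3.pipeline_r` connects each command's STDOUT to the next command's STDIN, yields the last stage's STDOUT together with one wait thread per stage, and each thread's `value` is that stage's `Process::Status`. A sketch; `run_stages` is a hypothetical helper name, and the first stage inherits the parent's STDIN.

```ruby
require 'open3'

# Run the given commands as a pipeline and return [last_stage_output, statuses].
# Each stage is an array such as ["ruby", "mapper.rb"]; arrays with more than
# one element are executed directly, without a shell.
def run_stages(*stages)
  Open3.pipeline_r(*stages) do |last_stdout, wait_threads|
    # Drain the final output first so the last stage never blocks on a full pipe.
    output = last_stdout.read
    # Each wait thread's value is the Process::Status of its stage.
    statuses = wait_threads.map(&:value)
    [output, statuses]
  end
end
```

For the pipeline in the question the stages might be `["ruby", mapper, *args]`, `["sort"]`, `["ruby", reducer]`, and so on, where `args` holds the optional order id; unlike the shell string, this gives an exit status for every stage.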
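If pipes are asynchronous in Ruby, I wonder whether what RubyMine does would fix this. For example, before running a Ruby script, RubyMine prepends the following to the command line: ruby -e $stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)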
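Setting `$stdout.sync = true` makes Ruby pass each write straight to the file descriptor instead of holding it in an internal buffer; it changes when output becomes visible to the next stage, not the order in which the stages of a pipeline start or finish. The one-liner can be spelled out as a small wrapper script, sketched below; the file name `sync_run.rb` and the method `run_synced` are hypothetical.

```ruby
# Hypothetical wrapper (e.g. sync_run.rb), the long form of
#   ruby -e '$stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)' script.rb ARGS...
def run_synced
  # Write through immediately instead of buffering inside Ruby.
  $stdout.sync = true
  $stderr.sync = true
  # The first argument is the script to run; the remaining ones stay in ARGV for it.
  script = ARGV.shift
  $0 = script # so the loaded script's own `__FILE__ == $0` checks still work
  load script
end

run_synced if __FILE__ == $PROGRAM_NAME && !ARGV.empty?
```

Usage would be `ruby sync_run.rb mapper.rb 123`; adding `$stdout.sync = true` at the top of each stage script has the same buffering effect without the wrapper.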
I have updated my code to use this technique, but I'd like to know whether it is correct. Or is there a better way?
```ruby
class Pipeline
  def run(context)
    raise ArgumentError, 'context is nil' unless context
    raise ArgumentError, 'context[:logger] is nil' unless context[:logger]
    current_path = File.dirname(__FILE__)
    # The -e program is single-quoted so /bin/sh does not expand $stdout/$stderr
    # as shell variables or treat ';' and '(' as shell syntax.
    ruby = %q{ruby -e '$stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift)'}
    mapper = File.join(current_path, 'mapper.rb')
    reducer = File.join(current_path, 'reducer.rb')
    expander = File.join(current_path, 'expander.rb')
    splitter = File.join(current_path, 'splitter.rb')
    uploader = File.join(current_path, 'uploader.rb')
    mapper_args = context[:order_id] == nil ? nil : " #{context[:order_id]}"
    create_reports_command_line = "#{ruby} #{mapper}#{mapper_args} | sort | #{ruby} #{reducer} | #{ruby} #{expander} | sort | #{ruby} #{splitter}"
    context[:logger].debug create_reports_command_line
    %x{#{create_reports_command_line}}
    sleep 60 # Sleep for 1 min, just in case...
    upload_reports_command_line = "#{ruby} #{uploader}"
    context[:logger].debug upload_reports_command_line
    %x{#{upload_reports_command_line}}
  end
end
```
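`%x{}`, like backticks, returns only after the child `/bin/sh` has exited, and the shell waits for the last command of a foreground pipeline (here the splitter), so the create-reports phase should already be finished when the second `%x{}` starts; the `sleep 60` should not be needed for ordering. A sketch of a helper that runs each phase in turn and checks its exit status instead of sleeping; `run_phase!` is a hypothetical name. With `/bin/sh`, a pipeline's exit status is that of its last command, so failures in earlier stages are not reported this way.

```ruby
# Run one shell command line, wait for it, and raise if it did not succeed.
def run_phase!(command_line)
  output = `#{command_line}` # blocks until the child shell exits
  status = $?                # Process::Status of that shell
  unless status.success?
    raise "command failed (exit #{status.exitstatus}): #{command_line}"
  end
  output
end
```

Inside `Pipeline#run` this would replace the two `%x{}` calls and the `sleep`: `run_phase!(create_reports_command_line)` followed by `run_phase!(upload_reports_command_line)`.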
【Discussion】: