您可以使用thread pool。这是一个非常基本的使用Queue:
queue = Queue.new
pool = Array.new(5) do
Thread.new do
loop do
line = queue.pop
break if line == :stop
# do something with line, e.g.
# id, number = line.split('|')
# Utils.req(...)
end
end
end
它创建 5 个线程并将它们存储在一个数组中以供以后参考。每个线程运行一个循环,调用Queue#pop 从队列中取出一行。如果队列为空,pop 将暂停线程,直到数据可用。所以一开始,线程只会坐在那里等待工作的到来:
queue.num_waiting #=> 5
pool.map(&:status) #=> ["sleep", "sleep", "sleep", "sleep", "sleep"]
一旦线程检索到一行,它将处理它(待实现)并获取一个新行(如果没有,则再次进入休眠状态)。如果该行恰好是符号:stop,则线程将中断循环并终止。 (非常务实的做法)
为了填充队列,我们可以在主线程中使用一个简单的循环:
file.each_line { |line| queue << line }
之后,我们将 5 个:stop 符号推送到队列末尾,然后等待线程完成:
pool.each { queue << :stop }
pool.each(&:join)
此时,没有线程在等待队列——它们都正常终止:
queue.num_waiting #=> 0
pool.map(&:status) #=> [false, false, false, false, false]
请注意,Queue 不限于字符串或符号。您可以将任何 Ruby 对象推送给它。因此,您可以推送预处理数据,而不是推送原始行:
file.each_line do |line|
id, number = line.split('|')
queue << [id, number]
end
这样,您的线程不必知道日志格式。他们可以在id 和number 上工作:
loop do
value = queue.pop
break if value == :stop
id, number = value
# do something with id and number
end
您当然可以使用各种并发 gem 之一,而不是编写自己的线程池。