【发布时间】:2011-09-27 09:07:11
【问题描述】:
我有兴趣了解实现基于线程的队列的最佳方式。
例如:
我有 10 个动作,我只想用 4 个线程来执行。我想创建一个所有 10 个动作线性放置的队列,并用 4 个线程启动前 4 个动作,一旦一个线程完成执行,下一个线程将开始等等 - 所以一次,线程数是4 或小于 4。
【问题讨论】:
标签: ruby multithreading synchronization queue
我有兴趣了解实现基于线程的队列的最佳方式。
例如:
我有 10 个动作,我只想用 4 个线程来执行。我想创建一个所有 10 个动作线性放置的队列,并用 4 个线程启动前 4 个动作,一旦一个线程完成执行,下一个线程将开始等等 - 所以一次,线程数是4 或小于 4。
【问题讨论】:
标签: ruby multithreading synchronization queue
在标准库的thread 中有一个Queue 类。使用它,您可以执行以下操作:
require 'thread'
queue = Queue.new
threads = []
# add work to the queue
queue << work_unit
4.times do
threads << Thread.new do
# loop until there are no more things to do
until queue.empty?
# pop with the non-blocking flag set, this raises
# an exception if the queue is empty, in which case
# work_unit will be set to nil
work_unit = queue.pop(true) rescue nil
if work_unit
# do work
end
end
# when there is no more work, the thread will stop
end
end
# wait until all threads have completed processing
threads.each { |t| t.join }
我使用非阻塞标志弹出的原因是在 until queue.empty? 和 pop 之间另一个线程可能已经弹出队列,所以除非设置了非阻塞标志,否则我们可能会永远卡在该行.
如果您使用 MRI(默认的 Ruby 解释器),请记住线程不会绝对并发。如果您的工作受 CPU 限制,您也可以运行单线程。如果你有一些阻塞 IO 的操作,你可能会得到一些并行性,但是 YMMV。或者,您可以使用允许完全并发的解释器,例如 jRuby 或 Rubinius。
【讨论】:
:END_OF_WORK work_units 而不是非阻塞弹出。此外,您关于没有 CPU 并发运行的线程的最后陈述适用于 YARV,但不适用于 JRuby。
有一些 gem 可以为您实现这种模式; parallel,peach,and mine 被称为threach(或在 jruby 下为jruby_threach)。它是#each 的直接替代品,但允许您指定运行多少线程,使用下面的 SizedQueue 来防止事情失控。
所以...
(1..10).threach(4) {|i| do_my_work(i) }
不推自己的东西;有很多好的实现可以让事情变得更容易。
如果您使用的是 JRuby,jruby_threach 是一个更好的实现——Java 只是提供了一组更丰富的线程原语和数据结构供使用。
【讨论】:
可执行的描述性示例:
require 'thread'
p tasks = [
{:file => 'task1'},
{:file => 'task2'},
{:file => 'task3'},
{:file => 'task4'},
{:file => 'task5'}
]
tasks_queue = Queue.new
tasks.each {|task| tasks_queue << task}
# run workers
workers_count = 3
workers = []
workers_count.times do |n|
workers << Thread.new(n+1) do |my_n|
while (task = tasks_queue.shift(true) rescue nil) do
delay = rand(0)
sleep delay
task[:result] = "done by worker ##{my_n} (in #{delay})"
p task
end
end
end
# wait for all threads
workers.each(&:join)
# output results
puts "all done"
p tasks
【讨论】:
您可以使用线程池。对于此类问题,这是一种相当常见的模式。
http://en.wikipedia.org/wiki/Thread_pool_pattern
Github 似乎有一些你可以尝试的实现:
https://github.com/search?type=Everything&language=Ruby&q=thread+pool
【讨论】:
Celluloid 有一个 worker pool example 可以做到这一点。
【讨论】:
我使用了一个名为 work_queue 的 gem。真的很实用。
例子:
require 'work_queue'
wq = WorkQueue.new 4, 10
(1..10).each do |number|
wq.enqueue_b("Thread#{number}") do |thread_name|
puts "Hello from the #{thread_name}"
end
end
wq.join
【讨论】: