【问题标题】:Thread and Queue线程和队列
【发布时间】:2011-09-27 09:07:11
【问题描述】:

我有兴趣了解实现基于线程的队列的最佳方式。

例如:

我有 10 个动作,我只想用 4 个线程来执行。我想创建一个所有 10 个动作线性放置的队列,并用 4 个线程启动前 4 个动作,一旦一个线程完成执行,下一个线程将开始等等 - 所以一次,线程数是4 或小于 4。

【问题讨论】:

    标签: ruby multithreading synchronization queue


    【解决方案1】:

    在标准库的thread 中有一个Queue 类。使用它,您可以执行以下操作:

    require 'thread'
    
    queue = Queue.new
    threads = []
    
    # add work to the queue
    queue << work_unit
    
    4.times do
      threads << Thread.new do
        # loop until there are no more things to do
        until queue.empty?
          # pop with the non-blocking flag set, this raises
          # an exception if the queue is empty, in which case
          # work_unit will be set to nil
          work_unit = queue.pop(true) rescue nil
          if work_unit
            # do work
          end
        end
        # when there is no more work, the thread will stop
      end
    end
    
    # wait until all threads have completed processing
    threads.each { |t| t.join }
    

    我使用非阻塞标志弹出的原因是在 until queue.empty? 和 pop 之间另一个线程可能已经弹出队列,所以除非设置了非阻塞标志,否则我们可能会永远卡在该行.

    如果您使用 MRI(默认的 Ruby 解释器),请记住线程不会绝对并发。如果您的工作受 CPU 限制,您也可以运行单线程。如果你有一些阻塞 IO 的操作,你可能会得到一些并行性,但是 YMMV。或者,您可以使用允许完全并发的解释器,例如 jRuby 或 Rubinius。

    【讨论】:

    • 在镐中,建议使用 4 个 :END_OF_WORK work_units 而不是非阻塞弹出。此外,您关于没有 CPU 并发运行的线程的最后陈述适用于 YARV,但不适用于 JRuby。
    • @AndrewGrimm,我喜欢这个答案,因为有时您希望有一个工作队列和线程在添加新工作项时徘徊以完成工作。
    【解决方案2】:

    有一些 gem 可以为您实现这种模式; parallel,peach,and mine 被称为threach(或在 jruby 下为jruby_threach)。它是#each 的直接替代品,但允许您指定运行多少线程,使用下面的 SizedQueue 来防止事情失控。

    所以...

    (1..10).threach(4) {|i| do_my_work(i) }
    

    不推自己的东西;有很多好的实现可以让事情变得更容易。

    如果您使用的是 JRuby,jruby_threach 是一个更好的实现——Java 只是提供了一组更丰富的线程原语和数据结构供使用。

    【讨论】:

      【解决方案3】:

      可执行的描述性示例:

      require 'thread'
      
      p tasks = [
          {:file => 'task1'},
          {:file => 'task2'},
          {:file => 'task3'},
          {:file => 'task4'},
          {:file => 'task5'}
      ]
      
      tasks_queue = Queue.new
      tasks.each {|task| tasks_queue << task}
      
      # run workers
      workers_count = 3
      workers = []
      workers_count.times do |n|
          workers << Thread.new(n+1) do |my_n|
              while (task = tasks_queue.shift(true) rescue nil) do
                  delay = rand(0)
                  sleep delay
                  task[:result] = "done by worker ##{my_n} (in #{delay})"
                  p task
              end
          end
      end
      
      # wait for all threads
      workers.each(&:join)
      
      # output results
      puts "all done"
      p tasks
      

      【讨论】:

        【解决方案4】:

        您可以使用线程池。对于此类问题,这是一种相当常见的模式。
        http://en.wikipedia.org/wiki/Thread_pool_pattern

        Github 似乎有一些你可以尝试的实现:
        https://github.com/search?type=Everything&language=Ruby&q=thread+pool

        【讨论】:

          【解决方案5】:

          Celluloid 有一个 worker pool example 可以做到这一点。

          【讨论】:

            【解决方案6】:

            我使用了一个名为 work_queue 的 gem。真的很实用。

            例子:

            require 'work_queue'
            wq = WorkQueue.new 4, 10
            (1..10).each do |number|
                wq.enqueue_b("Thread#{number}") do |thread_name|  
                    puts "Hello from the #{thread_name}"
                end
            end
            wq.join
            

            【讨论】:

              猜你喜欢
              • 2010-10-22
              • 1970-01-01
              • 2013-11-15
              • 1970-01-01
              • 1970-01-01
              • 2013-06-18
              • 2013-02-16
              • 2010-10-28
              相关资源
              最近更新 更多