【Question Title】: Deadlock in ThreadPool
【Posted】: 2010-09-10 01:18:47
【Question】:

I couldn't find a suitable ThreadPool implementation for Ruby, so I wrote my own (based partly on the code here: http://web.archive.org/web/20081204101031/http://snippets.dzone.com:80/posts/show/3276, but changed to use wait/signal and other mechanisms for ThreadPool shutdown). After running for a while, though (with 100 threads and around 1300 tasks processed), it dies with a deadlock on line 25, where it waits for a new job. Any ideas why this happens?

require 'thread'
begin
  require 'fastthread'
rescue LoadError
  $stderr.puts "Using the ruby-core thread implementation"
end 

class ThreadPool
  class Worker
    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        while @mutex.synchronize {@running}
          block = get_block
          if block
            block.call
            reset_block
            # Signal the ThreadPool that this worker is ready for another job
            @callback.signal
          else
            # Wait for a new job
            @mutex.synchronize {@cv.wait(@mutex)} # <=== Is this line 25?
          end
        end
      end
    end

    def name
      @thread.inspect
    end

    def get_block
      @mutex.synchronize {@block}
    end

    def set_block(block)
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
        # Signal the thread in this class, that there's a job to be done
        @cv.signal
      end
    end

    def reset_block
      @mutex.synchronize {@block = nil}
    end

    def busy?
      @mutex.synchronize {!@block.nil?}
    end

    def stop
      @mutex.synchronize {@running = false}
      # Signal the thread not to wait for a new job
      @cv.signal
      @thread.join
    end
  end

  attr_accessor :max_size

  def initialize(max_size = 10)
    @max_size = max_size
    @workers = []
    @mutex = Mutex.new
    @cv = ConditionVariable.new
  end

  def size
    @mutex.synchronize {@workers.size}
  end

  def busy?
    @mutex.synchronize {@workers.any? {|w| w.busy?}}
  end

  def shutdown
    @mutex.synchronize {@workers.each {|w| w.stop}}
  end
  alias :join :shutdown

  def process(block=nil,&blk)
    block = blk if block_given?
    while true
      @mutex.synchronize do
         worker = get_worker 
         if worker
           return worker.set_block(block)
         else
           # Wait for a free worker
           @cv.wait(@mutex)
         end
      end
    end
  end

  # Used by workers to report ready status
  def signal
    @cv.signal
  end

  private
  def get_worker
    free_worker || create_worker
  end

  def free_worker
    @workers.each {|w| return w unless w.busy?}; nil
  end

  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new(self)
    @workers << worker
    worker
  end
end

【Discussion】:

    Tags: ruby multithreading threadpool deadlock


    【Solution 1】:

    I'm slightly biased here, but I would suggest modeling this in some process language and model-checking it. Freely available tools include, for example, the mCRL2 toolset (which uses an ACP-based language), the Mobility Workbench (pi-calculus), and Spin (PROMELA).

    Otherwise, I would suggest removing every piece of code that is not essential to the problem and finding a minimal case in which the deadlock occurs. I doubt that 100 threads and 1300 tasks are essential to trigger it. With a smaller case you can probably just add some debug prints that provide enough information to solve the problem.

    【Comments】:

    • The code in question failed after only about 1300 of 180,000 tasks; unfortunately, I haven't been able to reproduce it with a smaller set...
    【Solution 2】:

    OK, the problem seems to be in your ThreadPool#signal method. What may happen is:

    1 - All your workers are busy and you try to process a new job

    2 - line 90 gets a nil worker

    3 - a worker becomes free and signals it, but the signal is lost because the ThreadPool is not waiting for it

    4 - you fall through to line 95, waiting even though there is a free worker.

    The error here is that you can signal a free worker even when nobody is listening. Your ThreadPool#signal method should instead be:

    def signal
         @mutex.synchronize { @cv.signal }
    end
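    The lost-wakeup scenario described above comes from the gap between checking a condition and calling cv.wait: a signal sent inside that gap simply disappears. Holding the same mutex while signaling closes the gap. Here is a minimal, self-contained sketch of that handshake (not the pool itself; the flag and variable names are illustrative):

```ruby
require 'thread'

mutex  = Mutex.new
cv     = ConditionVariable.new
ready  = false
result = nil

# The waiter re-checks the flag in a loop, so a signal sent before it
# reaches cv.wait is not lost: in that case it never waits at all.
waiter = Thread.new do
  mutex.synchronize do
    cv.wait(mutex) until ready
    result = :woken
  end
end

# The signaler holds the same mutex, so the signal cannot fire in the
# window between the waiter's check of `ready` and its call to cv.wait.
mutex.synchronize do
  ready = true
  cv.signal
end

waiter.join
puts result  # prints "woken"
```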
    

    The problem is the same in the Worker object. What can happen is:

    1 - The worker has just completed a job

    2 - It checks (line 17) whether a job is waiting: there is none

    3 - The thread pool sends a new job and signals it... but the signal is lost

    4 - The worker waits for a signal, even though it is marked as busy

    You should rewrite your initialize method as:

    def initialize(callback)
      @mutex = Mutex.new
      @cv = ConditionVariable.new
      @callback = callback
      @mutex.synchronize {@running = true}
      @thread = Thread.new do
        @mutex.synchronize do
          while @running
            block = get_block
            if block
              @mutex.unlock
              block.call
              @mutex.lock
              reset_block
              # Signal the ThreadPool that this worker is ready for another job
              @callback.signal
            else
              # Wait for a new job
              @cv.wait(@mutex)
            end
          end
        end
      end
    end
    

    Next, the Worker#get_block and Worker#reset_block methods should no longer be synchronized. That way, a block cannot be assigned to a worker between the test for a block and the wait for a signal.
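    The unlock/lock pair around block.call in the rewritten initialize is what lets other threads (a set_block caller, for instance) take the mutex while a long-running job executes. A tiny sketch of that pattern in isolation, assuming Ruby 2.0+ for Mutex#owned?:

```ruby
require 'thread'

mutex = Mutex.new
ran_unlocked = nil

mutex.synchronize do
  # Drop the lock for the duration of the (possibly slow) user block,
  # then re-acquire it before touching protected state again.
  mutex.unlock
  ran_unlocked = !mutex.owned?  # Mutex#owned?: does *this* thread hold the lock?
  mutex.lock
end

puts ran_unlocked  # prints "true"
```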

    【Comments】:

    • I think you're right! I'll test it right away, thanks!
    • Hmm... now I get a deadlock while waiting for the threads to finish (e.g. when calling join on the ThreadPool). I'm trying to figure out why.
    【Solution 3】:

    OK, so the main problem of the implementation is: how do you make sure no signal is lost while avoiding deadlocks?

    In my experience, this is really hard to achieve with condition variables and mutexes, but easy with semaphores. It so happens that ruby implements an object called Queue (or SizedQueue) that solves exactly this problem. Here is my suggested implementation:
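    The property this answer relies on is that Queue#pop blocks until an item is available, so a "worker became idle" event can never be lost the way a condition-variable signal can. A minimal illustration of that blocking hand-off:

```ruby
require 'thread'

q = Queue.new

# pop blocks until an item arrives, so the consumer never misses a push,
# even though it starts while the queue is still empty.
consumer = Thread.new { 3.times.map { q.pop } }

sleep 0.05                    # let the consumer block on the empty queue first
[1, 2, 3].each { |i| q << i } # each push wakes exactly one pending pop

puts consumer.value.inspect   # prints "[1, 2, 3]"
```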

    require 'thread'
    begin
      require 'fastthread'
    rescue LoadError
      $stderr.puts "Using the ruby-core thread implementation"
    end
    
    class ThreadPool
      class Worker
        def initialize(thread_queue)
          @mutex = Mutex.new
          @cv = ConditionVariable.new
          @queue = thread_queue
          @running = true
          @thread = Thread.new do
            @mutex.synchronize do
              while @running
                @cv.wait(@mutex)
                block = get_block
                if block
                  @mutex.unlock
                  block.call
                  @mutex.lock
                  reset_block
                end
                @queue << self
              end
            end
          end
        end
    
        def name
          @thread.inspect
        end
    
        def get_block
          @block
        end
    
        def set_block(block)
          @mutex.synchronize do
            raise RuntimeError, "Thread already busy." if @block
            @block = block
            # Signal the thread in this class, that there's a job to be done
            @cv.signal
          end
        end
    
        def reset_block
          @block = nil
        end
    
        def busy?
          @mutex.synchronize { !@block.nil? }
        end
    
        def stop
          @mutex.synchronize do
            @running = false
            @cv.signal
          end
          @thread.join
        end
      end
    
      attr_accessor :max_size
    
      def initialize(max_size = 10)
        @max_size = max_size
        @queue = Queue.new
        @workers = []
      end
    
      def size
        @workers.size
      end
    
      def busy?
        @queue.size < @workers.size
      end
    
      def shutdown
        @workers.each { |w| w.stop }
        @workers = []
      end
    
      alias :join :shutdown
    
      def process(block=nil,&blk)
        block = blk if block_given?
        worker = get_worker
        worker.set_block(block)
      end
    
      private
    
      def get_worker
        if !@queue.empty? or @workers.size == @max_size
          return @queue.pop
        else
          worker = Worker.new(@queue)
          @workers << worker
          worker
        end
      end
    
    end
    

    And here is a simple test:

    tp = ThreadPool.new 500
    (1..1000).each { |i| tp.process { (2..10).inject(1) { |memo,val| sleep(0.1); memo*val }; print "Computation #{i} done. Nb of tasks: #{tp.size}\n" } }
    tp.shutdown
    

    【Comments】:

    • 1. Shouldn't access to @workers be synchronized? 2. Why are the lock and unlock still needed inside the worker thread?
    • Access to the workers is always done from the same thread... so there is no need to synchronize it. As for the lock in the worker thread, it is needed to wake the thread up safely.
    • This still has a problem, and a possible deadlock: while a worker thread is adding itself to the queue, the ThreadPool can take it from the queue and assign it a task. A signal will then be sent, but if the worker thread is not yet waiting on the cv, that signal is lost.
    • Roman, no it can't! To send the signal, the thread pool needs to acquire the mutex... so it blocks until the object is waiting for the signal.
    • This isn't happy under MRI 1.9: "thread.rb:185:in `sleep': deadlock detected (fatal)" :(
    【Solution 4】:

    You may want to try the work_queue gem, which is designed to coordinate work between a producer and a pool of worker threads.

    【Comments】:

      【Solution 5】:

      The top commenter's code has helped a lot over the years. Here it is, updated for ruby 2.x and improved with thread identification. How is that an improvement? When each thread has an ID, you can compose the ThreadPool with an array that stores arbitrary information. Some ideas:

      • No array: typical ThreadPool usage. Even with the GIL, it makes threads dead-easy to code and is very useful for high-latency applications such as high-volume web crawling,
      • ThreadPool and an array sized to the number of CPUs: easy to fork processes to use all CPUs,
      • ThreadPool and an array sized to the number of resources: e.g., each array element represents one processor across a pool of instances, so if you have 10 instances with 4 CPUs each, the TP can manage work across 40 subprocesses.

      With these last two, rather than thinking of threads doing the work, think of the ThreadPool managing the subprocesses that do the work. The management task is lightweight, and when combined with subprocesses, who cares about the GIL.

      With this class you can code up a cluster-based MapReduce in about a hundred lines! This code is beautifully short, though it can be a bit of a mind-bender to fully grok. Hope it helps.

      # Usage:
      #
      #   Thread.abort_on_exception = true # help localize errors while debugging
      #   pool = ThreadPool.new(thread_pool_size)
      #   50.times {|i|
      #     pool.process { ... }
      #     or
      #     pool.process {|id| ... } # worker identifies itself as id
      #   }
      #   pool.shutdown()
      
      class ThreadPool
      
        require 'thread'
      
        class ThreadPoolWorker
      
          attr_accessor :id
      
          def initialize(thread_queue, id)
            @id = id # worker id is exposed thru tp.process {|id| ... }
            @mutex = Mutex.new
            @cv = ConditionVariable.new
            @idle_queue = thread_queue
            @running = true
            @block = nil
            @thread = Thread.new {
              @mutex.synchronize {
                while @running
                  @cv.wait(@mutex) # block until there is work to do
                  if @block
                    @mutex.unlock
                    begin
                      @block.call(@id)
                    ensure
                      @mutex.lock
                    end
                    @block = nil
                  end
                  @idle_queue << self
                end
              }
            }
          end
      
          def set_block(block)
            @mutex.synchronize {
              raise RuntimeError, "Thread is busy." if @block
              @block = block
              @cv.signal # notify thread in this class, there is work to be done
            }
          end
      
          def busy?
            @mutex.synchronize { ! @block.nil? }
          end
      
          def stop
            @mutex.synchronize {
              @running = false
              @cv.signal
            }
            @thread.join
          end
      
          def name
            @thread.inspect
          end
        end
      
      
        attr_accessor :max_size, :queue
      
        def initialize(max_size = 10)
          @process_mutex = Mutex.new
          @max_size = max_size
          @queue = Queue.new # of idle workers
          @workers = []      # array to hold workers
      
          # construct workers
          @max_size.times {|i| @workers << ThreadPoolWorker.new(@queue, i) }
      
          # queue up workers (workers in queue are idle and available to
          # work).  queue blocks if no workers are available.
          @max_size.times {|i| @queue << @workers[i] }
      
          sleep 1 # important to give threads a chance to initialize
        end
      
        def size
          @workers.size
        end
      
        def idle
          @queue.size
        end
      
        # are any threads idle
      
        def busy?
          # @queue.size < @workers.size
          @queue.size == 0 && @workers.size == @max_size
        end
      
        # block until all threads finish
      
        def shutdown
          @workers.each {|w| w.stop }
          @workers = []
        end
      
        alias :join :shutdown
      
        def process(block = nil, &blk)
          @process_mutex.synchronize {
            block = blk if block_given?
            worker = @queue.pop # assign to next worker; block until one is ready
            worker.set_block(block) # give code block to worker and tell it to start
          }
        end
      
      
      end
      

      【Comments】:
