【问题标题】:Mutex locks in Ruby do not work with Redis?Ruby 中的互斥锁不能与 Redis 一起使用?
【发布时间】:2018-04-14 13:33:39
【问题描述】:

我有批量导入的需求。文件可以包含 1000 条记录,每条记录都需要验证。用户希望被告知有多少记录无效。最初我是用 Ruby 的 Mutex 和 Redis 的 Publish/Subscribe 来做的。请注意,我有 20 个并发线程通过 Sidekiq 处理每条记录:

class Record < ActiveRecord::Base
  class << self
    # invalidated_records is SHARED memory for the Sidekiq worker threads
    attr_accessor :invalidated_records
    attr_accessor :semaphore
  end

  def self.batch_import
  self.semaphore = Mutex.new  
  self.invalid_records = []    
  redis.subscribe_with_timeout(180, 'validation_update') do |on|
    on.message do |channel, message|
      if message.to_s =~ /\d+|import_.+/
        self.semaphore.synchronize {
          self.invalidated_records << message
        }  
      elsif message == 'exit'
        redis.unsubscribe
      end
    end
  end
  end
end

Sidekiq 将发布到 Record 对象:

Redis.current.publish 'validation_update', 'import_invalid_address'

问题是发生了一些奇怪的事情。 Record.invalidated_records 中不会填充所有无效导入。其中许多是但不是全部。我认为这是因为多个线程尝试同时更新对象,它污染了对象。我认为互斥锁可以解决这个问题。但是仍然在添加 Mutex 锁之后,Record.invalidated_records 中并没有填充所有的无效值。

最终,我使用 redis 原子递减和递增来跟踪无效导入,这就像一个魅力。但我很好奇 Ruby Mutex 和多个线程尝试更新 Record.invalidated_records 有什么问题?

【问题讨论】:

    标签: ruby-on-rails ruby redis mutex sidekiq


    【解决方案1】:

    我没有使用互斥锁,但我认为线程看到信号量被锁定并跳过保存 https://apidock.com/ruby/ConditionVariable 等待互斥锁解锁,然后保存数据

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2012-08-06
      • 2010-09-16
      • 1970-01-01
      • 2013-07-24
      • 2020-04-29
      • 2016-05-19
      • 2021-03-09
      相关资源
      最近更新 更多