【发布时间】:2018-04-14 13:33:39
【问题描述】:
我有批量导入的需求。文件可以包含 1000 条记录,每条记录都需要验证。用户希望被告知有多少记录无效。最初我是用 Ruby 的 Mutex 和 Redis 的 Publish/Subscribe 来做的。请注意,我有 20 个并发线程通过 Sidekiq 处理每条记录:
class Record < ActiveRecord::Base
class << self
# invalidated_records is SHARED memory for the Sidekiq worker threads
attr_accessor :invalidated_records
attr_accessor :semaphore
end
def self.batch_import
self.semaphore = Mutex.new
self.invalid_records = []
redis.subscribe_with_timeout(180, 'validation_update') do |on|
on.message do |channel, message|
if message.to_s =~ /\d+|import_.+/
self.semaphore.synchronize {
self.invalidated_records << message
}
elsif message == 'exit'
redis.unsubscribe
end
end
end
end
end
Sidekiq 将发布到 Record 对象:
Redis.current.publish 'validation_update', 'import_invalid_address'
问题是发生了一些奇怪的事情。 Record.invalidated_records 中不会填充所有无效导入。其中许多是但不是全部。我认为这是因为多个线程尝试同时更新对象,它污染了对象。我认为互斥锁可以解决这个问题。但是仍然在添加 Mutex 锁之后,Record.invalidated_records 中并没有填充所有的无效值。
最终,我使用 redis 原子递减和递增来跟踪无效导入,这就像一个魅力。但我很好奇 Ruby Mutex 和多个线程尝试更新 Record.invalidated_records 有什么问题?
【问题讨论】:
标签: ruby-on-rails ruby redis mutex sidekiq