【发布时间】:2022-08-15 08:20:59
【问题描述】:
我的 rails 应用程序使用 sidekiq 运行。该应用程序拥有多个帐户。
每个帐户都可以运行ImportResourceJob,它发送account_id 作为参数来识别要使用的正确帐户。
我想防止为同一帐户同时启动多个 ImportResourceJobs。
基本上,我想在启动新的 ImportResourceJob 之前检查该特定 account_id 的当前队列/运行 ImportResourceJob。
我有点不确定如何做到这一点。我已经看到建议使用 sidekiq api https://github.com/mperham/sidekiq/wiki/API#scan 或 https://github.com/mperham/sidekiq/wiki/API#workers 中的扫描方法的答案
workers = Sidekiq::Workers.new
workers.size # => 2
workers.each do |process_id, thread_id, work|
# process_id is a unique identifier per Sidekiq process
# thread_id is a unique identifier per thread
# work is a Hash which looks like:
# { \'queue\' => name, \'run_at\' => timestamp, \'payload\' => msg }
# run_at is an epoch Integer.
# payload is a Hash which looks like:
# { \'retry\' => true,
# \'queue\' => \'default\',
# \'class\' => \'Redacted\',
# \'args\' => [1, 2, \'foo\'],
# \'jid\' => \'80b1e7e46381a20c0c567285\',
# \'enqueued_at\' => 1427811033.2067106 }
end
这似乎不是非常精确或可实现的(仅每 5 秒更新一次)。 如果你有很多工人,在我看来也是不可扩展的。
有一个 Jobs 表是否常见/良好的做法:
- column
account_id= 帐户 has_many Jobs - 列
type= 作业类别(例如:ImportResourceJob) - 列
status=enqueud,running,finished,failed
处理那些事情?这个想法是在启动作业之前在 Jobs 表中创建一个条目,并将 job_id 传递给 Job。像这样的东西:
def launches_import_resource_job
existing_running_job = Job.find_by(type: \"ImportResourceJob\", account_id: account_id, status: [\"enqueued\", \"running\"])
return if existing_running_job
job = Job.create(
type: \"ImportResourceJob\",
account_id: account_id,
status: \"enqueued\"
)
ImportLmsResourcesJob.perform_later(
account_id,
job.id
)
end
然后在 ImportResourcesJob 本身:
class ImportResourcesJob < ApplicationJob
queue_as :default
sidekiq_options retry: false
def perform(account_id, job_id)
job = Job.find(job_id)
job.update(status: \"running\")
Sync360Service.call(account_id)
job.update(status: \"finished\")
rescue Exception => e
job.update(status: \"failed\")
raise e
end
end
解决此问题的公认/好的解决方案是什么?
-
有一个表来跟踪排队的作业是一个可以接受的解决方案,这取决于您的架构以及数据库负载和延迟的轻微增加是否可以接受(在大多数情况下是可以接受的)。
标签: ruby-on-rails jobs sidekiq worker