【问题标题】:Rails/Sidekiq prevent launching new job if same job is currently enqueued or running如果同一作业当前正在排队或正在运行,Rails/Sidekiq 会阻止启动新作业
【发布时间】:2022-08-15 08:20:59
【问题描述】:

我的 rails 应用程序使用 sidekiq 运行。该应用程序拥有多个帐户。 每个帐户都可以运行ImportResourceJob,它发送account_id 作为参数来识别要使用的正确帐户。 我想防止为同一帐户同时启动多个 ImportResourceJobs。 基本上,我想在启动新的 ImportResourceJob 之前检查该特定 account_id 的当前队列/运行 ImportResourceJob

我有点不确定如何做到这一点。我已经看到建议使用 sidekiq api https://github.com/mperham/sidekiq/wiki/API#scanhttps://github.com/mperham/sidekiq/wiki/API#workers 中的扫描方法的答案

workers = Sidekiq::Workers.new
workers.size # => 2
workers.each do |process_id, thread_id, work|
  # process_id is a unique identifier per Sidekiq process
  # thread_id is a unique identifier per thread
  # work is a Hash which looks like:
  # { \'queue\' => name, \'run_at\' => timestamp, \'payload\' => msg }
  # run_at is an epoch Integer.
  # payload is a Hash which looks like:
  # { \'retry\' => true,
  #   \'queue\' => \'default\',
  #   \'class\' => \'Redacted\',
  #   \'args\' => [1, 2, \'foo\'],
  #   \'jid\' => \'80b1e7e46381a20c0c567285\',
  #   \'enqueued_at\' => 1427811033.2067106 }
end

这似乎不是非常精确或可实现的(仅每 5 秒更新一次)。 如果你有很多工人,在我看来也是不可扩展的。

有一个 Jobs 表是否常见/良好的做法:

  • column account_id = 帐户 has_many Jobs
  • type = 作业类别(例如:ImportResourceJob
  • status=enqueud,running,finished,failed

处理那些事情?这个想法是在启动作业之前在 Jobs 表中创建一个条目,并将 job_id 传递给 Job。像这样的东西:

def launches_import_resource_job
  existing_running_job = Job.find_by(type: \"ImportResourceJob\", account_id: account_id, status: [\"enqueued\", \"running\"])
  return if existing_running_job

  job = Job.create(
  type: \"ImportResourceJob\",
  account_id: account_id,
  status: \"enqueued\"
  )

  ImportLmsResourcesJob.perform_later(
    account_id,
    job.id
  )
end

然后在 ImportResourcesJob 本身:

class ImportResourcesJob < ApplicationJob
  queue_as :default
  sidekiq_options retry: false

  def perform(account_id, job_id)
    job = Job.find(job_id)
    job.update(status: \"running\")
    Sync360Service.call(account_id)
    job.update(status: \"finished\")
    rescue Exception => e
      job.update(status: \"failed\")
      raise e
  end
end

解决此问题的公认/好的解决方案是什么?

  • 有一个表来跟踪排队的作业是一个可以接受的解决方案,这取决于您的架构以及数据库负载和延迟的轻微增加是否可以接受(在大多数情况下是可以接受的)。

标签: ruby-on-rails jobs sidekiq worker


【解决方案1】:

@Ankit 是正确的,因为这是一种策略将要工作,但一个单独的表不是真的必要。

1.使用自定义队列

我看到您正在使用:default 队列,我建议使用自定义队列,特别是如果您正在考虑与其他工作一起扩展。

class ImportResourcesJob < ApplicationJob
  queue_as :import_resources_job
  ...
end

2. 使用 Sidekiq 作业 ID

如果您不想使用扫描,只需在 Account 表中添加一列并保存 Sidekiq 作业 ID。无需保存状态,因为 Sidekiq 作业会更改状态,然后您的 dB 值将过时。

创建作业时将其保存到您的帐户记录中,当作业完成时将其从记录中删除。

(因为看起来你正在使用ActiveJob

class ImportResourcesJob < ApplicationJob
  queue_as :default
  sidekiq_options retry: false

  def perform(account_id)
    account = Account.find(account_id)

    account.update_column(import_resources_job_id: job.id)

    Sync360Service.call(account_id)

    rescue Exception => e
      raise e

    account.update_column(import_resources_job_id: nil)
  end
end

并防止创建工作:

def launches_import_resource_job
  return unless import_resources_job_id.nil?

  ImportLmsResourcesJob.perform_later(
    account_id,
    job.id
  )
end

如果您需要使用多个不同的作业来复制它,我会在我的表中使用 JSONB 列来使用 { #{job_name} =&gt; #{job_id} } 散列 Sidekiq 作业

关于更新工作细节的说明

在你的工作中,你会做job.update(status: "running") 之类的事情。这只会更新内存中的作业详细信息,而不是 Redis 中的。只是被警告。

此外,Sidekiq 会为您更新所有工作状态,因此无论如何都不需要这样做。

【讨论】:

    猜你喜欢
    • 2015-07-03
    • 2014-11-11
    • 2023-04-03
    • 1970-01-01
    • 2020-12-23
    • 1970-01-01
    • 1970-01-01
    • 2012-08-19
    • 2018-06-29
    相关资源
    最近更新 更多