【问题标题】:Work with two separate redis instances with sidekiq?使用 sidekiq 处理两个单独的 redis 实例?
【发布时间】:2013-01-22 09:31:18
【问题描述】:

下午好,

我有两个独立但相关的应用程序。它们都应该有自己的后台队列(阅读:单独的 Sidekiq 和 Redis 进程)。但是,我希望偶尔能够将作业从app1 推送到app2 的队列中。

从简单的队列/推送角度来看,如果app1 没有现有的 Sidekiq/Redis 堆栈,则很容易做到这一点:

# In a process, far far away

# Configure client 
Sidekiq.configure_client do |config|
  config.redis = { :url => 'redis://redis.example.com:7372/12', :namespace => 'mynamespace' }
end

# Push jobs without class definition 
Sidekiq::Client.push('class' => 'Example::Workers::Trace', 'args' => ['hello!'])

# Push jobs overriding default's 
Sidekiq::Client.push('queue' => 'example', 'retry' => 3, 'class' =>     'Example::Workers::Trace', 'args' => ['hello!'])

但是,考虑到我已经从 app1 调用了 Sidekiq.configure_clientSidekiq.configure_server,可能在这之间需要发生一些事情。

显然,我可以直接从 Sidekiq 内部获取序列化和规范化代码,然后手动推送到 app2 的 redis 队列,但这似乎是一个脆弱的解决方案。我希望能够使用Client.push 功能。

我想我理想的解决方案是这样的:

SidekiqTWO.configure_client { remote connection..... } SidekiqTWO::Client.push(job....)

甚至:

$redis_remote = remote_connection.....

Sidekiq::Client.push(job, $redis_remote)

显然有点滑稽,但这是我的理想用例。

谢谢!

【问题讨论】:

  • 我为此提供 200 点赏金——我对此更感兴趣,因为允许一个 sidekiq 客户端将消息“循环”到两个不同的 redis 实例中高可用性和故障转移。

标签: ruby-on-rails redis queue sidekiq


【解决方案1】:

所以有一件事是According to the FAQ,“Sidekiq 消息格式非常简单且稳定:它只是 JSON 格式的哈希。”强调我的——我不认为将 JSON 发送到 sidekiq 太脆弱了。尤其是当您希望围绕将作业发送到哪个 Redis 实例进行细粒度控制时,例如在 OP 的情况下,我可能只需要编写一个小包装器,让我可以指示 Redis 实例以及正在排队的作业。

对于 Kevin Bedell 将作业循环到 Redis 实例的更一般情况,我想您想要控制使用哪个 Redis 实例——您只想排队并自动管理分发。它看起来像only one person has requested this so far,而they came up with a solution 使用了Redis::Distributed

datastore_config = YAML.load(ERB.new(File.read(File.join(Rails.root, "config", "redis.yml"))).result)

datastore_config = datastore_config["defaults"].merge(datastore_config[::Rails.env])

if datastore_config[:host].is_a?(Array)
  if datastore_config[:host].length == 1
    datastore_config[:host] = datastore_config[:host].first
  else
    datastore_config = datastore_config[:host].map do |host|
      host_has_port = host =~ /:\d+\z/

      if host_has_port
        "redis://#{host}/#{datastore_config[:db] || 0}"
      else
        "redis://#{host}:#{datastore_config[:port] || 6379}/#{datastore_config[:db] || 0}"
      end
    end
  end
end

Sidekiq.configure_server do |config|
  config.redis = ::ConnectionPool.new(:size => Sidekiq.options[:concurrency] + 2, :timeout => 2) do
    redis = if datastore_config.is_a? Array
      Redis::Distributed.new(datastore_config)
    else
      Redis.new(datastore_config)
    end

    Redis::Namespace.new('resque', :redis => redis)
  end
end

在寻求高可用性和故障转移时要考虑的另一件事是获得 Sidekiq Pro,其中包括可靠性功能:“Sidekiq Pro 客户端可以承受 Redis 的短暂中断。它会在出错时在本地排队作业并尝试一旦连接恢复,就可以交付这些工作。”由于 sidekiq 无论如何都是用于后台进程,因此如果 Redis 实例出现故障,那么短暂的延迟不会影响您的应用程序。如果您的两个 Redis 实例中的一个出现故障并且您正在使用循环,除非您使用此功能,否则您仍然会丢失一些工作。

【讨论】:

  • 感谢您精心研究的答案!
【解决方案2】:

正如 carols10cents 所说,它非常简单,但因为我总是喜欢封装该功能并能够在其他项目中重用它,所以我从 blog from Hotel Tonight 更新了一个想法。以下解决方案改进了在 Rails 4.1 和 Spring 预加载器中无法使用的 Hotel Tonight。

目前我将以下文件添加到lib/remote_sidekiq/

remote_sidekiq.rb

class RemoteSidekiq
  class_attribute :redis_pool
end

remote_sidekiq_worker.rb

require 'sidekiq'
require 'sidekiq/client'

module RemoteSidekiqWorker
  def client
    pool = RemoteSidekiq.redis_pool || Thread.current[:sidekiq_via_pool] || Sidekiq.redis_pool
    Sidekiq::Client.new(pool)
  end

  def push(worker_name, attrs = [], queue_name = "default")
    client.push('args' => attrs, 'class' => worker_name, 'queue' => queue_name)
  end
end

你需要创建一个初始化器来设置 redis_pool

config/initializers/remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL")
namespace = 'primary'

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

Aleks 编辑:

在从来没有版本的 sidekiq 中,而不是行:

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

使用线条:

redis_remote_options = {
  namespace: "yournamespace",
  url: ENV.fetch("REDISCLOUD_URL")
}

RemoteSidekiq.redis_pool = Sidekiq::RedisConnection.create(redis_remote_options)

然后,您可以在任何需要的地方简单地使用include RemoteSidekiqWorker 模块。大功告成!

**** 适用于更大的环境 ****

添加 RemoteWorker 模型会带来额外的好处:

  1. 您可以在任何地方重用 RemoteWorker,包括有权访问目标 sidekiq 工作人员的系统。这对调用者是透明的。要在目标 sidekiq 系统中使用“RemoteWorkers”表单,只需不要使用初始化程序,因为它将默认使用本地 Sidekiq 客户端。
  2. 使用 RemoteWorkers 确保始终发送正确的参数(代码 = 文档)
  3. 通过创建更复杂的 Sidekiq 架构进行扩展对调用者来说是透明的。

这是一个 RemoteWorker 示例

class RemoteTraceWorker
  include RemoteSidekiqWorker
  include ActiveModel::Model

  attr_accessor :message

  validates :message, presence: true

  def perform_async
    if valid?
      push(worker_name, worker_args)
    else
      raise ActiveModel::StrictValidationFailed, errors.full_messages
    end
  end

  private

  def worker_name
    :TraceWorker.to_s
  end

  def worker_args
    [message]
  end
end

【讨论】:

  • 美丽答案
【解决方案3】:

我遇到了这个问题并遇到了一些问题,因为我使用的是 ActiveJob,这使得从队列中读取消息的方式变得复杂。

基于 ARO 的回答,您仍然需要 redis_pool 设置:

remote_sidekiq.rb

class RemoteSidekiq
  class_attribute :redis_pool
end

config/initializers/remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL")
namespace = 'primary'

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

现在,我们将创建一个 ActiveJob 适配器来对请求进行排队,而不是 worker:

lib/active_job/queue_adapter/remote_sidekiq_adapter.rb

require 'sidekiq'

module ActiveJob
  module QueueAdapters
    class RemoteSidekiqAdapter
      def enqueue(job)
        #Sidekiq::Client does not support symbols as keys
        job.provider_job_id = client.push \
          "class"   => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end

      def enqueue_at(job, timestamp)
        job.provider_job_id = client.push \
          "class"   => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ],
          "at"      => timestamp
      end

      def client
        @client ||= ::Sidekiq::Client.new(RemoteSidekiq.redis_pool)
      end
    end
  end
end

我现在可以使用适配器对事件进行排队:

require 'active_job/queue_adapters/remote_sidekiq_adapter'

class RemoteJob < ActiveJob::Base
  self.queue_adapter = :remote_sidekiq

  queue_as :default

  def perform(_event_name, _data)
    fail "
      This job should not run here; intended to hook into
      ActiveJob and run in another system
    "
  end
end

我现在可以使用普通的 ActiveJob api 对作业进行排队。无论应用程序从队列中读取此内容,都需要有一个匹配的RemoteJob 可用于执行操作。

【讨论】:

    猜你喜欢
    • 2020-12-29
    • 1970-01-01
    • 2018-06-28
    • 2015-05-27
    • 2013-12-15
    • 2022-12-17
    • 2012-10-24
    • 1970-01-01
    • 2021-12-21
    相关资源
    最近更新 更多