【问题标题】:Quickly adding multiple items (1000/sec) to a sidekiq queue?快速将多个项目(1000/秒)添加到 sidekiq 队列?
【发布时间】:2014-01-07 06:09:06
【问题描述】:

我意识到 sidekiq 有一个 push_bulk 选项,但我目前受到 redis 延迟的限制,因此通过 push_bulk 传递多个项目仍然不够快(只有大约 50/s)。

我尝试像这样增加 redis 连接的数量:

redis_conn = proc {
  Redis.new({ :url => Rails.configuration.redis.url })
}

Sidekiq.configure_client do |config|
  Sidekiq.configure_client do |config|
    config.redis = ConnectionPool.new(size: 50, &redis_conn)
  end
  config.client_middleware do |chain|
    chain.add Sidekiq::Status::ClientMiddleware
  end
end

然后启动单独的线程 (Thread.new) 以在各种对象上实际执行异步。有趣的是,任何不是第一个线程的线程都不会被扔到 sidekiq 队列中,就好像它们被完全忽略了一样。

有人知道更好的方法吗?

编辑:这是我尝试的 push_bulk 方法,实际上速度较慢:

  user_ids = User.need_scraping.pluck(:id)
  bar = ProgressBar.new(user_ids.count)
  user_ids.in_groups_of(10000, false).each do |user_id_group|
    Sidekiq::Client.push_bulk(
      'args'  => user_id_group.map{ |user_id| [user_id] },
      'class' => ScrapeUser,
      'queue' => 'scrape_user',
      'retry' => true
    )
  end

谢谢!

【问题讨论】:

    标签: ruby-on-rails redis sidekiq


    【解决方案1】:

    @Winfield 的回答是正确的,他对延迟的看法是完全正确的。但是,正确的语法其实是这样的:

    User.need_scraping.select('id').find_in_batches do |user_group|
      Sidekiq::Client.push_bulk({ 'class' => UserWorker, 'args' => user_group.map {|user| [user.id] } })
    end
    

    也许它在最近的 Sidekiq 中发生了变化(我懒得检查),但现在这是正确的语法。

    【讨论】:

      【解决方案2】:

      您确实想使用push_bulk。您受限于将元素写入支持 sidekiq 的 redis 队列的延迟/往返时间。

      您正在使用多个线程/连接来克服缓慢的网络,而实际上您应该删除额外的网络往返。

      假设您尝试将 20k UserWorker 工作排入队列,这些工作需要 user_id

      您可以通过以下方式将单个作业排入队列:

      UserWorker.perform_async(user_id)
      

      ...映射到:

      Sidekiq::Client.push('class' => UserWorker, 'args' => [user_id] )
      

      所以 20k user_ids 的 push_bulk 版本是:

      # This example takes 20k user_ids in an array, chunks them into groups of 1000 ids,
      # and batch sends them to redis as a group.
      
      User.need_scraping.select('id').find_in_batches do |user_group|
      
        sidekiq_items = user_group.map {|user| { 'class' => UserWorker, 'args' => [user.id] } }
        Sidekiq::Client.push_bulk(sidekiq_items)
      end
      

      这会将 20k 个 redis 调用转换为 20 个 redis 调用,平均往返时间为 5 毫秒(乐观),即 1 秒与 100 秒。您的里程可能会有所不同。

      编辑: 评论者似乎对 Sidekiq/Redis 客户端批量排队数据的行为感到困惑。

      Sidekiq::Client.push_bulk() 方法需要将一组作业排入队列。它将这些转换为 Sidekiq 作业负载哈希,然后调用 SideKiq::Client.raw_push() 将这些负载传递给 redis。见来源:https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/client.rb#L158

      SideKiq::Client.raw_push() 获取 Sidekiq 哈希负载列表,将它们转换为 JSON,然后执行结合两个 redis 命令的 redis MULTI 命令。首先,它将目标队列添加到活动队列列表 (redis SADD),然后将所有作业负载推送到目标队列 redis 列表对象 (redis LPUSH)。这是一个单一的 redis 命令,在一个单一的 redis 原子组中一起执行。

      如果这仍然很慢,您可能还有其他问题(网络速度慢、redis 服务器过载等)。

      【讨论】:

      • 这正是我正在尝试的,它实际上比单独循环遍历每条记录要慢。我在上面添加了我的代码。
      • 如果该代码很慢,那是因为您在做一些更根本性的错误,例如为您的数据库中的每个用户记录加载用户模型。尝试将批量大小降低到 100-1000。代码已更新。 user_ids = User.need_scraping.select('id').find_in_batches 做 |user_ids|
      • 查看sidekiq的来源时,似乎实际上仍然将每个项目单独发送到redis。我没有看到 push_bulk 的性能提升:/ 相关:github.com/mperham/sidekiq/issues/1410
      • 您误认为 Sidekiq 的 bulk_push 不会批处理作业。在解释的答案中添加更多细节。
      • 小细节:该方法被称为push_bulk,而不是bulk_push。 :) 另外,查看源代码,我不认为每个工作都应该重复 'class' => UserWorker,只有 args,像这样:Sidekiq::Client.push_bulk('class' => UserWorker, 'args' => user_group.map(&:id))。相关代码在这里:github.com/mperham/sidekiq/blob/master/lib/sidekiq/…
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2013-11-14
      • 1970-01-01
      • 1970-01-01
      • 2015-08-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多