【问题标题】:RabbitMQ + Sneakers: one worker or 100?RabbitMQ + Sneakers:一个工人还是 100 个?
【发布时间】:2018-10-31 12:51:45
【问题描述】:

假设我需要为 100 个用户进行复杂的计算。我当前的配置如下所示:

制作人

class Producer
  class << self
    def publish(target, options = {})
      connection = Bunny.new(some_params).start
      channel    = connection.create_channel
      exchange   = channel.fanout("#{target}_exchange", durable: true)

      exchange.publish(options.to_json)
    end
  end
end

MassComplexCalculations 工作者

module UsersWorkers
  class MassComplexCalculations
    include Sneakers::Worker

    from_queue "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_queue",
               exchange: "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_exchange"

    def work(options)
      parsed_options = JSON.parse(options)

      ActiveRecord::Base.connection_pool.with_connection do
        User.where(id: parsed_options['ids']).each do |user|
          ::Services::Users::ComplexCalculations.call(user)
        end
      end
      ack!
    end
  end
end

运行工人

Producer.publish("#{ENV['RAILS_ENV']}.users.mass_complex_calculations", ids: User.limit(100).ids)

我不太了解 AMQP 如何分配资源来执行任务以及我如何提供帮助。对吗,在单独的工作人员中运行每个计算会更好吗?例如:

更改 MassComplexCalculations 工作人员

module UsersWorkers
  class MassComplexCalculations
    include Sneakers::Worker

    from_queue "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_queue",
               exchange: "#{ENV['RAILS_ENV']}.users.mass_complex_calculations_exchange"

    def work(options)
      parsed_options = JSON.parse(options)

      ActiveRecord::Base.connection_pool.with_connection do
        parsed_options['ids'].each do |id|
          Producer.publish("#{ENV['RAILS_ENV']}.users.personal_complex_calculations", id: id)
        end
      end
      ack!
    end
  end
end

新的 PersonalComplexCalculations 工作者

module UsersWorkers
  class PersonalComplexCalculations
    include Sneakers::Worker

    from_queue "#{ENV['RAILS_ENV']}.users.personal_complex_calculations_queue",
               exchange: "#{ENV['RAILS_ENV']}.users.personal_complex_calculations_exchange"

    def work(options)
      parsed_options = JSON.parse(options)
      user           = User.find(parsed_options['id'])

      ActiveRecord::Base.connection_pool.with_connection do
        ::Services::Users::ComplexCalculations.call(user)
      end
      ack!
    end
  end
end

在我的理解中,可能有两种选择:

  1. 第一个实现可能运行速度较慢,因为它会按顺序为每个用户调用服务,而在第二个选项中,我们将有 100 个同时工作的工作人员,他们将并行工作
  2. 没有区别

那么哪种方法更好呢?或者甚至其中一个是完全错误的?

提前致谢。

【问题讨论】:

    标签: ruby-on-rails ruby rabbitmq amqp sneakers


    【解决方案1】:

    你的假设都不成立。您不能保证有 100 个并行工作人员,因为运动鞋有一个您不一定要覆盖的默认线程池大小:

    https://github.com/jondot/sneakers/blob/master/lib/sneakers/worker.rb#L20

    如果你没有配置至少 100 个连接的 ActiveRecord 连接池,你的代码也会因为这里的资源不足而阻塞和等待。

    在 GENERAL 中,并行执行此类任务可能会更快大部分时间 - 但这并不能保证。

    【讨论】:

      猜你喜欢
      • 2023-03-16
      • 2023-03-08
      • 2014-12-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-14
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多