【问题标题】:RabbitMQ/Sneakers - Limit specific queue to only one worker at a time?RabbitMQ/Sneakers - 一次将特定队列限制为一个工作人员?
【发布时间】:2023-03-16 09:13:01
【问题描述】:

我有一个使用 RabbitMQ 和 Sneakers gem 的用例,我有多个 Worker 运行以响应我项目中的几十个队列。因此,worker 很可能同时处理来自同一个队列的消息。

但是,特别是对于一个队列 - 我们称之为 :one_at_a_time - 我只希望一个工作人员能够在任何给定时间处理来自队列的消息。

我想这样做的原因是因为工人被设计为执行以下操作:

  1. 通过传入的 ID 查找 AR 对象
  2. 检查是否设置了属性 - 比如说 :worked。
    1. 如果true,则ack! 消息。
    2. 如果false,向用户发送电子邮件,然后将:worked 设置为true。

这样设计是为了防止我不小心向用户发送两次电子邮件,以防两个消息以相同的对象 ID 快速连续创建。如果在任何给定时间只有一个工作人员听过这个队列,这个设计就可以正常工作,因为第一次运行将经过步骤 1 -> 2 -> 2,然后下一次运行将经过步骤 1 -> 2 -> 1 并且不会向用户发送电子邮件。但在测试中,我发现有可能出现竞争条件,即两个工作人员会同时从:one_at_a_time 队列中拉出一条消息,通过设置:worked 的检查,然后都发送一封电子邮件。

考虑到这一切,有没有办法可以限制收听队列的工作人员数量?谢谢。

【问题讨论】:

  • 您找到解决方案了吗?遇到完全相同的问题,很想知道您是如何解决的。

标签: ruby-on-rails ruby rabbitmq sneakers


【解决方案1】:

如需进一步参考,Argus9 的请求可按以下步骤存档:

1) 您可以控制员工的选择:

class YourWorker
include Sneakers::Worker
from_queue "your_queue",
           :env => nil,
           :ack => true,
           :workers => 1, #Number of per-cpu processes to run
           :prefetch => 1, #This param will define that single message will be fetched per time
           :threads => 1, #This will define that you have single thread running
           :heartbeat => 2,
           :share_threads => true,
           :timeout_job_after => 3600,
           :exchange => 'your_exchange'

def work(args={})
 #... your steps here
end 
end

2) 您需要注意您在sneakers.rb 中指定的初始参数(在worker 初始化时由Sneakers::Runner 使用),因此请确保其中包含正确的参数,例如:

Sneakers.configure  :amqp => url,
                :daemonize => true,
                :ack => true,
                :prefetch => 1,
                :threads => 1,
                :start_worker_delay => 0.1,  
                :workers => 1,               
                :exchange => "your_exchange",
                :exchange_type => :direct,
                :log => "log/sneakers.log"
Sneakers.logger.level = Logger::DEBUG

您还可以使用 RabbitMQ API 构建一些额外的控件,这将使您能够检查诸如是否已经有一些消息正在处理?...等等,使用 bunny 归档不是那么容易等等。 使用非常简单的代码,例如:

    def queue_info
    queues_infos = {}    
    rabbitmqctl_url = "http://127.0.0.1:15672"
    rabbitmqctl_user = "your_user"
    rabbitmqctl_password = "your_password"
    uri = URI.parse("#{rabbitmqctl_url}/api/queues")
    request = Net::HTTP::Get.new(uri)
    request.basic_auth(rabbitmqctl_user, rabbitmqctl_password)
    req_options = { use_ssl: uri.scheme == 'https' }
    response = Net::HTTP.start(uri.hostname, uri.port, req_options)  do |http|
      http.request(request)
    end
    queue_details = JSON.parse(response.body)
    queue_details.each do |queue|
      queues_infos[queue['name'].to_s] = {  name: queue['name'],
                                            msg_total: queue['messages'],
                                            msg_ready: queue['messages_ready'],
                                            msg_unacknowlged: queue['messages_unacknowledged'],
                                            state: queue['state'],
                                            consumers: queue['consumers'] }
    end
    return queues_infos
end

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-04
    • 2014-12-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多