【问题标题】:Wait until message producer receives confirmation of work having finished by consumer等到消息生产者收到消费者已完成工作的确认
【发布时间】:2018-02-27 13:30:01
【问题描述】:

使用 Ruby 的“bunny”RabbitMQ 客户端,我希望我的生产者(一些 Ruby 代码)向消费者(使用“sneakers”gem 的工作人员)发送消息,并且我希望我的生产者不执行它的另一行Ruby 代码,直到生产者收到消费者收到我的消息并对其进行处理的确认。

在我的消费者中,我正在做一些工作,然后调用运动鞋的ack! 方法以确认收到消息并且工作已完成。

在我的制作人中,我在我的Bunny::Channel 实例上调用confirm_select 以将其放入确认码中,在publish-ing 我的消息之后,我在频道上调用wait_for_confirms 据说要等到我所有的消息已被消费者ack!-ed。 (我已经尝试实现我在兔子文档here 中找到的内容。)

但是,我的生产者似乎并没有等待消费者致电ack!。我同时登录了我的生产者和消费者,发现我的生产者似乎认为消息在消费者实际确认之前已被确认。

如何让 RabbitMQ 生产者等到消费者在 Ruby 中完成工作?

Ruby 2.3.3、RabbitMQ 3.6.12、Erlang 17.3。

这是我的锁文件:

GEM
  specs:
    amq-protocol (2.2.0)
    bunny (2.7.0)
      amq-protocol (>= 2.2.0)
    concurrent-ruby (1.0.5)
    serverengine (1.5.11)
      sigdump (~> 0.2.2)
    sigdump (0.2.4)
    sneakers (2.6.0)
      bunny (~> 2.7.0)
      concurrent-ruby (~> 1.0)
      serverengine (~> 1.5.11)
      thor
    thor (0.20.0)

PLATFORMS
  ruby

DEPENDENCIES
  bunny
  sneakers

BUNDLED WITH
   1.14.6

这是我的消费者/工人 (consumer_worker.rb):

class ConsumerWorker
  include Sneakers::Worker

  from_queue 'do-work-here',
             exchange: 'do-work-here',
             exchange_type: :direct,
             durable: true,
             prefetch: 1,
             arguments: {
               :'x-dead-letter-exchange' => 'do-work-here-retry'
             },
             timeout_job_after: 5,
             retry_timeout: 60000,
             ack: true

  def work(msg)
    open('ruby-debug.log', 'a') do |f|
      f.puts "message received: #{msg}"
    end
    sleep 1
    open('ruby-debug.log', 'a') do |f|
      f.puts "acknowledging message at: #{Time.now.to_i}"
    end
    ack!
    open('ruby-debug.log', 'a') do |f|
      f.puts "acknowledged message at: #{Time.now.to_i}"
    end
  end
end

在一个终端选项卡中,我正在运行这个工作人员:

bundle exec sneakers work ConsumerWorker --require consumer_worker.rb

这是我的发布者 (publisher.rb):

require 'bunny'
connection = Bunny.new('amqp://guest:guest@localhost:5672').tap(&:start)
channel = connection.create_channel
channel.confirm_select
queue = channel.queue('do-work-here',
                      {arguments: {:'x-dead-letter-exchange' => 'do-work-here-retry'},
                       durable: true})
queue.publish('hello world', persistent: true)
channel.wait_for_confirms
open('ruby-debug.log', 'a') do |f|
  f.puts "messages confirmed at: #{Time.now.to_i}"
end

当我在另一个选项卡中运行以下命令时:

ruby ./publisher.rb

然后我的日志文件 (./ruby-debug.log) 包含以下几行:

message received: hello world
messages confirmed at: 1505774819
acknowledging message at: 1505774820
acknowledged message at: 1505774820

我想要的是事件的顺序是这样的:

message received
acknowledging message
acknowledged message
messages confirmed

我该如何解决?

【问题讨论】:

标签: ruby rabbitmq bunny


【解决方案1】:

Publisher 确认仅涵盖发布者到 RabbitMQ 的通信。出版商不了解消费者。

请求/响应模式的示例请参见教程 6: https://www.rabbitmq.com/getstarted.html.

ConditionVariable 是一种常用的并发原语,用于延迟进一步的操作,直到事件发生,并具有可选的超时时间。

发布者确认和消费者确认记录在http://www.rabbitmq.com/confirms.htm

【讨论】:

  • 知道如何使用 Sneakers 执行 RPC 吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-01-29
  • 2013-02-22
  • 1970-01-01
  • 1970-01-01
  • 2014-10-16
  • 1970-01-01
相关资源
最近更新 更多