【问题标题】:Flushing a queue using AMQP, Rabbit, and Ruby使用 AMQP、Rabbit 和 Ruby 刷新队列
【发布时间】:2011-02-08 04:38:03
【问题描述】:

我正在用 Ruby 开发一个系统,该系统将利用 RabbitMQ 将消息发送到队列,因为它会完成一些工作。我正在使用:

我在这个 gem 上看到的大多数示例都在 EM.add_periodic_timer 块中进行了发布调用。这不适用于我怀疑的绝大多数用例,当然也不适用于我的用例。我需要在完成一些工作时发布一条消息,因此将发布语句放在 add_periodic_timer 块中是不够的。

所以,我正在尝试弄清楚如何将一些消息发布到队列中,然后“刷新”它,以便我发布的所有消息随后都会传递给我的订阅者。

为了让您了解我的意思,请考虑以下发布商代码:

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

MESSAGES = ["hello","goodbye","test"]

AMQP.start do

  queue = MQ.queue('testq')
  messages_published = 0

  while (messages_published < 50)

    if (rand() < 0.4)
      message = MESSAGES[rand(MESSAGES.size)]
      puts "#{Time.now.to_s}: Publishing: #{message}"
      queue.publish(message)
      messages_published += 1
    end

    sleep(0.1)

  end

  AMQP.stop do
    EM.stop
  end

end

因此,此代码只是循环,在循环的每次迭代中以 40% 的概率发布一条消息,然后休眠 0.1 秒。它会一直执行此操作,直到发布了 50 条消息,然后停止 AMQP。当然,这只是概念验证。

现在,我的订阅者代码:

#!/usr/bin/ruby

require 'rubygems'
require 'mq'

AMQP.start do

  queue = MQ.queue('testq') 
  queue.subscribe do |header, msg|
    puts "#{Time.now.to_s}: Received #{msg}"
  end

end

所以,我们只需订阅队列,并且对于收到的每条消息,我们都会将其打印出来。

很好,只是订阅者只在发布者调用 AMQP.stop 时才收到全部 50 条消息。

这是我的出版商的输出。为简洁起见,已在中间截断:

$ ruby publisher.rb 
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:42 -0400: Publishing: hello
2010-04-14 21:45:42 -0400: Publishing: test
2010-04-14 21:45:43 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: test
2010-04-14 21:45:44 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: goodbye
2010-04-14 21:45:45 -0400: Publishing: test
2010-04-14 21:45:45 -0400: Publishing: test
.
.
.
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: test
2010-04-14 21:45:55 -0400: Publishing: goodbye

接下来,我的订阅者的输出:

$ ruby consumer.rb
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received hello
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received goodbye
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
.
.
.
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received test
2010-04-14 21:45:56 -0400: Received goodbye

如果您注意到输出中的时间戳,则订阅者仅在发布者停止 AMQP 并退出后才会收到所有消息。

那么,作为 AMQP 新手,我怎样才能让我的消息立即传递?我尝试将 AMQP.start 和 AMQP.stop 放在发布者的 while 循环的主体中,但是只有第一条消息被传递 - 虽然奇怪的是,如果我打开日志记录,服务器没有报告错误消息并且消息确实会发送到队列,但不会被订阅者接收。

建议将不胜感激。感谢阅读。

【问题讨论】:

    标签: ruby rabbitmq amqp


    【解决方案1】:

    任何想了解此问题的人,请参阅http://groups.google.com/group/ruby-amqp/browse_thread/thread/311965bcf9697ece

    我通过在我的发布者代码中添加一个额外的线程解决了这个问题:

    !/usr/bin/ruby
    
    require 'rubygems'
    require 'mq'
    
    MESSAGES = ["hello","goodbye","test"]
    
    AMQP.start do
    
      Thread.new do
          queue = MQ.queue('testq')
          messages_published = 0
    
          while (messages_published < 50)
    
            if (rand() < 0.4)
              message = MESSAGES[rand(MESSAGES.size)]
              puts "#{Time.now.to_s}: Publishing: #{message}"
              queue.publish(message)
              messages_published += 1
            end
    
            sleep(0.1)
    
          end
    
          AMQP.stop do
            EM.stop
          end
    
      end
    
    end
    

    【讨论】: