【问题标题】:RabbitMQ and EventMachine::deferRabbitMQ 和 EventMachine::defer
【发布时间】:2011-11-25 16:25:14
【问题描述】:

我正在尝试了解 RabbitMQ。我希望一个队列监听,当它收到一条消息时,我希望它回复一个匿名队列,该队列通过 reply_to 标头指定,包含多条消息。

到目前为止,我有以下任务,它既是 reply_to 消息的消费者也是订阅者:

desc "start_consumer", "start the test consumer"
def start_consumer
  puts "Running #{AMQP::VERSION} version of the gem"

      AMQP.start(:host => "localhost", :user => "guest", :password => "guest", :vhost => "/", 
                        :logging => true, :port => 5672) do |connection|

        channel = AMQP::Channel.new(connection)

        requests_queue = channel.queue("one", :exclusive => true, :auto_delete => true)

        Signal.trap("INT") do
          connection.close do
            EM.stop{exit}
          end
        end

        channel.prefetch(1)

        requests_queue.subscribe(:ack => true) do |header, body|
          puts "received in server #{body.inspect}"

          (0..5).each do |n|
            header.ack

            reply = {:reply => "Respone #{n}", :is_last => (n == 5)}

            AMQP::Exchange.default.publish(
                                            MultiJson.encode(reply), 
                                            :routing_key => header.reply_to,
                                            :correlation_id => header.correlation_id
                                          )


            sleep(2)
          end
        end

        puts " [x] Awaiting RPC requests"
      end          
    end

我的电话号码是:

  def publish(urlSearch, routing_key)
    EM.run do

      corr_id = rand(10_000_000).to_s

      requests ||= Hash.new

      connection = AMQP.connect(:host => "localhost")

      callback_queue = AMQP::Channel.new(connection).queue("", :exclusive => true)             

      callback_queue.subscribe do |header, body|

        reply = MultiJson.decode(body)               

        if reply[:is_last.to_s]  
          connection.close do
            EM.stop{exit}
          end
        end
      end                          

      callback_queue.append_callback(:declare) do
        AMQP::Exchange.default.publish(MultiJson.encode(urlSearch), :routing_key => routing_key, :reply_to => callback_queue.name, :correlation_id => corr_id)
      end

    end

问题是直到迭代中的所有消息都发布后才会发送消息。

即在(0..5)循环中的迭代完成之后,所有的消息都会被发布。

有人告诉我,使用 EventMachine::defer 可能是一种选择,但我不确定如何将其应用于循环。

任何人都可以提出如何解决这个问题的指针吗? EventMachine::defer 是一个不错的选择吗?如果是,在使用 AMQP.start 时我该怎么做?

【问题讨论】:

    标签: ruby rabbitmq eventmachine


    【解决方案1】:

    使用 EM 时要记住的是反应器本身是单线程的。因此,它在发送任何内容之前一直等到循环完成的原因是因为循环正在占用该单个线程。 (你真的,真的,真的不想在 EM 应用程序中使用 sleep。你阻塞了整个反应堆,什么都不会发生。)

    如果您希望发布消息,或者其他任何其他 EM 操作发生(触发计时器,接收其他数据等),您必须让反应器循环。

    您可以使用诸如 EM::Iterator 之类的东西来为您安排工作,例如:

    EM::Iterator.new(0..5).each do |n, iter|
      reply = {:reply => "Response #{n}", :is_last => (n == 5)}
      AMQP::Exchange.default.publish(MultiJson.encode(reply), 
                                     :routing_key => header.reply_to,
                                     :correlation_id => header.correlation_id)
      iter.next
    end
    header.ack
    

    【讨论】:

    • 我最终使用了 EM.defer gist.github.com/1480745。我正在使用 JRuby,也许我应该考虑使用 Java 线程。
    猜你喜欢
    • 2011-04-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-10
    • 1970-01-01
    • 2012-08-15
    相关资源
    最近更新 更多