【问题标题】:AMQP MultitaskingAMQP 多任务处理
【发布时间】:2013-03-08 17:58:26
【问题描述】:

我有一个有趣的情况需要完成。我需要有一个 EventMachine 循环,它在 AMQP 队列中等待消息,然后中断该循环,以便定期将消息发送到单独的 AMQP 队列。我是 EventMachine 的新手,这就是我目前所拥有的,除了 EventMachine 循环没有发送必要的消息。

现在我已经制作了两个 proc:

    listen_loop = Proc.new {
        AMQP.start(connection_config) do |connection|
            AMQP::Channel.new(connection) do |channel|
                channel.queue("queue1", :exclusive => false, :durable => true) do |requests_queue|
                    requests_queue.once_declared do
                        consumer = AMQP::Consumer.new(channel, requests_queue).consume
                        consumer.on_delivery do |metadata, payload|
                            puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
                            response = "responding"
                            channel.default_exchange.publish(response,
                                :routing_key    => metadata.reply_to,
                                :correlation_id => metadata.message_id,
                                :mandatory      => true)
                            metadata.ack
                        end
                    end
                end
            end
        end
        Signal.trap("INT")  { AMQP.stop { EM.stop } }
        Signal.trap("TERM") { AMQP.stop { EM.stop } }
    }

    send_message = Proc.new {
        AMQP.start(connection_config) do |connection|
            channel = AMQP::Channel.new(connection)
            queue   = channel.queue('queue2')

            channel.default_exchange.publish("hello world", :routing_key => queue.name)
            EM.add_timer(0.5) do
                connection.close do
                    EM.stop{ exit }
                end
            end
        end
    }

然后我有我的 EventMachine 循环:

    EM.run do 
        EM.add_periodic_timer(5) { send_message.call }
        listen_loop.call
    end

我能够在监听循环中接收消息,但我无法定期发送任何消息。

【问题讨论】:

    标签: ruby rabbitmq amqp eventmachine


    【解决方案1】:

    弄清楚我做错了什么。消息循环无法打开到 RabbitMQ 服务器的新连接,因为它已经连接。将所有内容整合到一个 EventMachine 循环中并重用连接并且它可以工作。

    对于那些好奇的人来说,它看起来像这样:

    EM.run do
    
        AMQP.start(connection_config) do |connection|
            channel = AMQP::Channel.new(connection)
    
            EM.add_periodic_timer(5) { channel.default_exchange.publish("foo", :routing_key => 'queue2') }
    
            queue = channel.queue("queue1", :exclusive => false, :durable => true)
            channel.prefetch(1)
            queue.subscribe(:ack => true) do |metadata, payload|
                puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..."
                response = "bar"
                channel.default_exchange.publish(response,
                    :routing_key    => metadata.reply_to,
                    :correlation_id => metadata.message_id,
                    :mandatory      => true)
                metadata.ack
            end
        end
        Signal.trap("INT")  { AMQP.stop { EM.stop } }
        Signal.trap("TERM") { AMQP.stop { EM.stop } }
    
    end
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-01-26
      • 1970-01-01
      相关资源
      最近更新 更多