【问题标题】:rabbitmq consumer becomes a producerrabbitmq 消费者成为生产者
【发布时间】:2013-03-25 13:09:15
【问题描述】:

我在消费者内部接收来自 RabbitMQ 的消息。我必须处理该消息并将处理后的消息发布到不同的队列中。我将如何做到这一点?

我的代码是

using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        if (!String.IsNullOrEmpty(EXCHANGE_NAME))
            channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, durable);

        if (!String.IsNullOrEmpty(QUEUE_NAME))
            channel.QueueDeclare(QUEUE_NAME, false, false, false, null);

        string data = "";
        EventingBasicConsumer consumer = new EventingBasicConsumer();
        consumer.Received += (o, e) =>
        {
            //This is the received message
            data = data + Encoding.ASCII.GetString(e.Body) + Environment.NewLine;
            string processed_data = "processed data = " + data; 
            //I want to write some code here to post the processed message to a different queue.
            //or other idea is "can I use duplex services? 

        };
        string consumerTag = channel.BasicConsume(QUEUE_NAME, true, consumer);

        channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
        channel.QueueUnbind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
    }
}

【问题讨论】:

  • 我的问题与此类似。 stackoverflow.com/questions/3972756/…
  • 我编写了一个新方法,将处理后的消息作为输入字符串传递。在该方法中,我创建了一个连接工厂、一个新模型并将消息发布到不同的队列中。这就是我采取的方法。

标签: c# .net rabbitmq message-queue


【解决方案1】:

底线是您可以共享线程之间的连接,但不能共享通道。因此,在您的示例中,您可以使用相同的连接,但是当您想要发布时需要创建一个新通道(因为 consumer.Received 事件将在不同的线程上引发):

using (IConnection connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        if (!String.IsNullOrEmpty(EXCHANGE_NAME))
            channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, durable);

        if (!String.IsNullOrEmpty(QUEUE_NAME))
            channel.QueueDeclare(QUEUE_NAME, false, false, false, null);

        string data = "";
        EventingBasicConsumer consumer = new EventingBasicConsumer();
        consumer.Received += (o, e) =>
        {
            //This is the received message
            data = data + Encoding.ASCII.GetString(e.Body) + Environment.NewLine;
            string processed_data = "processed data = " + data; 
            //I want to write some code here to post the processed message to a different queue.
            //or other idea is "can I use duplex services? 

            using (IModel channel = connection.CreateModel())
            {
                channel.Publish( ... );
            }

        };
        string consumerTag = channel.BasicConsume(QUEUE_NAME, true, consumer);

        channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
        channel.QueueUnbind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);

        // don't dispose of your channel until you've finished consuming
    }

    // don't dispose of your connection until you've finished consuming
}

确保在您想停止消费之前不要处置您的消费渠道。连接也是如此。这是一个常见的错误。

【讨论】:

  • 迈克感谢您的指点。这也正是我所关心的。不想最终陷入死锁或内存泄漏。
猜你喜欢
  • 1970-01-01
  • 2016-03-01
  • 1970-01-01
  • 1970-01-01
  • 2018-04-12
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-06-03
相关资源
最近更新 更多