【问题标题】:RabbitMQ consumer as a windows serviceRabbitMQ 消费者作为 Windows 服务
【发布时间】:2015-11-13 22:08:41
【问题描述】:

我有一个 rabbitmq 消费者应用程序在 .net 中实现“发布/订阅模式”,它作为控制台应用程序完美运行,但是当我将其部署为 Windows 服务时,它似乎没有将数据保存到 mongodb 中。

    protected override void OnStart(string[] args)
    {
        try
        {
             var connectionString = "mongodb://localhost";
            var client = new MongoClient(connectionString);
            var factory = new ConnectionFactory() { HostName = "localhost" };            
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: "test", type: "fanout");
                    var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queue: queueName,                                       exchange: "logs", routingKey: "");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        BsonDocument document = BsonDocument.Parse(message);
                        var database = client.GetDatabase("test");
                        var collection = database.GetCollection<BsonDocument>("test_collection");
                        collection.InsertOneAsync(document);
                    };
                    channel.BasicConsume(queue: queueName,                                       noAck: true,consumer: consumer);

                }
            }
        }
        catch (Exception ex)
        {
            throw;
        }
    }

我有什么遗漏的吗?

【问题讨论】:

  • 你检查日志了吗?
  • 您无需等待 InsertOneAsync 的结果...任何事情都可能发生,您永远不会知道...使用 collection.InsertOneAsync(document).GetAwaiter().GetResult() ;
  • @Gabriele 我确实尝试登录以查看是否确实收到了消息。但看起来不像。
  • @CraigWilson 一开始甚至没有任何消息可以转储到 mongoDB 中。另外,为了验证我检查了 mongoDB 的日志。没有从该服务转储给它的数据

标签: .net windows-services rabbitmq mongodb-.net-driver


【解决方案1】:

在 OnStart() 中进行繁忙等待是个坏主意,因为操作系统会期待它的返回。在这里阅读:https://msdn.microsoft.com/en-us/library/zt39148a%28v=vs.110%29.aspx

编辑:上面代码的问题是你在 using 语句中有你的连接和通道。这样做的全部意义在于一旦超出范围就将它们处理掉。因此,在这种情况下,即使您正在添加一个事件处理程序,您也会在退出范围并处理通道等之后不久。要解决此问题,请将连接、通道和使用者从“OnStart”方法中拉出并使它们成为类(可能是私有的)成员。即使您退出方法并且您的事件应该继续侦听,这也应该让它们保持打开状态。

【讨论】:

    【解决方案2】:

    今天我们需要将 RabbitMQ 消费者作为 Windows 服务,并在方法 OnStart 中使用 Timer 解决。

    private Timer _timer;
    
    protected override void OnStart(string[] args)
    
    {
         _timer = new Timer();
         _timer.Interval = 5000; 
         _timer.Elapsed += new ElapsedEventHandler(this.OnTimer);
         _timer.Start();
    }
    
    public void OnTimer(object sender, System.Timers.ElapsedEventArgs args)
    {
         _timer.Enabled = false;
    
         ...
    }
    

    非常感谢您的帮助,并希望对这个解决方案也有帮助

    【讨论】:

      【解决方案3】:

      下面的答案可以帮助我解决这个问题。如上所述,您不应在 OnStart 方法中使用不返回的 using 语句。所以你可以在 OnStart 方法中得到一条消息,但你不能通过 using 语句的帮助来声明消费者。

      the solution which fixes the problem for me

      希望对你也有帮助

      【讨论】:

        【解决方案4】:

        对我的 Onstart 方法进行以下更改就可以了

            protected override void OnStart(string[] args)
            {
        
                ConnectionFactory factory = new ConnectionFactory { HostName = localhost" };
                var connectionString = "mongodb://localhost";
                var client = new MongoClient(connectionString);
        
        
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "test", type: "fanout");
        
                        string queueName = channel.QueueDeclare();
        
                        channel.QueueBind(queueName, "test", "");
        
                        this.EventLog.WriteEntry("Waiting for messages");
        
                        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                        channel.BasicConsume(queueName, true, consumer);
        
                        while (true)
                        {
                            BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                            var message = Encoding.UTF8.GetString(e.Body);
                            BsonDocument document = BsonDocument.Parse(message);
                            var database = client.GetDatabase("test");
                            var collection = database.GetCollection<BsonDocument>("test_collection");
                            collection.InsertOneAsync(document);
        
                        }
                    }
                }
            }
        

        【讨论】:

        • 此服务如何处理停止? IE。它会停止吗?不需要杀死它,因为a)while循环没有条件,b)对consumer.Queue.Dequeue()的调用没有阻塞?
        • 您的 OnStart 服务永远不会返回。如果您想执行这样的工作,请在 OnStart 中启动另一个线程,然后从 OnStart 方法返回。
        猜你喜欢
        • 2018-05-10
        • 1970-01-01
        • 1970-01-01
        • 2011-04-15
        • 1970-01-01
        • 2013-03-25
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多