【问题标题】:Using RabbitMQ in ASP.net core IHostedService在 ASP.net 核心 IHostedService 中使用 RabbitMQ
【发布时间】:2025-11-24 03:40:01
【问题描述】:

我遇到了一个需要帮助的问题。 我正在开发一个后台进程,它将监听 rabbitmq 服务器中的队列。 如果我在 .net 核心控制台应用程序中运行它就可以了。但是我想以一种更优雅的方式来完成它,例如 Web 服务(这给我带来了很多麻烦,因为它在安装时不起作用)或 IIS 托管的 Web 应用程序。 当我尝试在 .net 核心 Web 应用程序中托管服务 (IHostedService) 时,我遇到了 Scoped Service 的问题。

以下代码在控制台应用程序中运行良好。如何使其在 .net 核心 Web 应用程序中作为 IHostedService 运行。 我应该改变什么。 感谢您的帮助。 代码:

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using PaymentProcessor.Models;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.EntityFrameworkCore;

namespace PaymentProcessor
{
    public class PaymentProcessingService : HostedService
    {
        IConfiguration configuration;

        private EntitiesContext claimsContext;
        private string connectionString;

        private string HostName = "";
        private string UserName = "";
        private string Password = "";

        private static int MaxRetries;

        private IConnectionFactory factory;
        private IConnection connection;
        private IModel channel;

        public PaymentProcessingService(IConfiguration configuration)
        {
            this.configuration = configuration;

            this.connectionString = configuration.GetConnectionString ("StagingContext");
            claimsContext = new EntitiesContext(connectionString);

            HostName = this.configuration.GetValue<string>("Settings:HostName");
            UserName = this.configuration.GetValue<string>("Settings:UserName");
            Password = this.configuration.GetValue<string>("Settings:Password");
            MaxRetries = this.configuration.GetValue<string>("Settings:MaxRetries").ConvertTo<int>(); 
        }

        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {

            connect:
            factory = new ConnectionFactory { HostName = HostName, UserName = UserName, Password = Password };

            try
            {
                connection = factory.CreateConnection();
                channel = connection.CreateModel();

                channel.ExchangeDeclare("payment_rocessing_exchange", "topic");
                channel.QueueDeclare("payment_processing_queue", true, false, false, null);
                channel.QueueBind("payment_processing_queue", "payment_processing_exchange", "processing");


                var queueArgs = new Dictionary<string, object>
                    {
                        { "x-dead-letter-exchange", "payment_processing_exchange" },
                        {"x-dead-letter-routing-key", "processing_retry"},
                        { "x-message-ttl", 10000 }
                    };

                channel.ExchangeDeclare("payment_rocessing_exchange", "topic");
                channel.QueueDeclare("payment_processing_retry_queue", true, false, false, queueArgs);
                channel.QueueBind("payment_processing_retry_queue", "payment_processing_exchange", "processing_retry", null);

                channel.ExchangeDeclare("payment_processing_exchange", "topic");
                channel.QueueDeclare("payment_processing_error_queue", true, false, false, null);
                channel.QueueBind("payment_processing_error_queue", "payment_processing_exchange", "processing_error", null);

                channel.ExchangeDeclare("payment_processing_exchange", "topic");
                channel.QueueDeclare("payment_integration_queue", true, false, false, null);
                channel.QueueBind("payment_integration_queue", "payment_processing_exchange", "integration", null);

                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {


                    var message = ea.Body.DeSerializeText();
                    try
                    {
                        var saveBundle = JObject.Parse(message);
                        var msg = (dynamic)((dynamic)saveBundle).Message;
                        string referenceNo = (string)msg.ReferenceNo;

                        var parameters = new[]
                        {
                           new SqlParameter
                            {
                                DbType =  DbType.String,
                                ParameterName = "ReferenceNo",
                                Value =referenceNo
                            }
                        };

                        var result = claimsContext.Database.ExecuteSqlCommand("dbo.PaymentReferencesProcessSingle @ReferenceNo", parameters);

                        IBasicProperties props = channel.CreateBasicProperties();
                        props.Persistent = true;
                        props.ContentType = "text/plain";
                        props.DeliveryMode = 2;

                        channel.BasicPublish("payment_processing_exchange", "integration", props, (new MessageEnvelope { RetryCounts = 0, Message = JObject.FromObject(new { ReferenceNo = referenceNo }) }).Serialize()); 



                    }
                    catch (Exception ex)
                    {

                        MessageEnvelope envelope = JsonConvert.DeserializeObject<MessageEnvelope>(message);

                        if (envelope.RetryCounts < MaxRetries)
                        {
                            int RetryCounts = envelope.RetryCounts + 1;
                            MessageEnvelope messageEnvelope = new MessageEnvelope { RetryCounts = RetryCounts, Message = envelope.Message };
                            var data = messageEnvelope.Serialize();
                            channel.BasicPublish("payment_processing_exchange", "processing_retry", null, data);

                        }
                        else
                        {
                            var data = envelope.Serialize();
                            channel.BasicPublish("payment_processing_exchange", "processing_error", null, data);
                        }

                    }
                    finally
                    {
                        channel.BasicAck(ea.DeliveryTag, false);
                    }

                };

                channel.BasicConsume(queue: "payment_processing_queue", autoAck: false, consumer: consumer);

            }
            catch (Exception ex)
            {

                Thread.Sleep(10000);
                goto connect;
            }

        }
    }
}

然后

services.AddScoped<IHostedService, PaymentProcessingService>();

【问题讨论】:

  • 不太确定问题出在哪里。事情看起来通常是正确的。如果您想解决问题,请查看此处。docs.microsoft.com/en-us/aspnet/core/fundamentals/host/…
  • 如果您提供您遇到的实际错误/异常,或者至少描述您描述为错误的行为,将会更有帮助(并且更有可能得到答案)。如果你解决了这个问题,你会用解决方案的简单细节回答你自己的帖子吗?

标签: .net-core rabbitmq


【解决方案1】:

正如 Dekim 所说,应该注册一个服务。

看看我在 GitHub 上创建的 example

Program.cs 如下所示:

using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using System.Threading.Tasks;

namespace Core
{
    internal class Program
    {
        static async Task Main(string[] args)
        {

            await new HostBuilder()
                 .ConfigureServices((hostContext, services) =>
                 {
                     services.AddHostedService<ServiceRabbitMQ>(); // register our service here            
                 })
                .RunConsoleAsync();
        }
    }
}

【讨论】:

    【解决方案2】:

    因为IHostedService 需要根据文档创建一个特殊的范围。

    来自上述答案中引用的 Microsoft 文档:

    默认情况下不会为托管服务创建范围。

    考虑使用:services.AddHostedService&lt;MyHostedService&gt;();

    【讨论】:

    • 欢迎来到 *,如果您能添加更多细节说明为什么 IHostedService 是一个“特殊类”以及为什么需要以这种方式注入它,那就太好了。这样,提出问题的人可能会更好地理解原因。
    最近更新 更多