【问题标题】:AutoSubscriber in EasyNetQ does not working - RabbitMQ .NETEasyNetQ 中的自动订阅者不起作用 - RabbitMQ .NET
【发布时间】:2021-07-25 11:51:19
【问题描述】:

基于link,我正在尝试为我的消息创建 EasyNetQ Dispatcher。由于某种原因,当我的消息出现在队列中时,我的消费者没有被触发,我不知道可能是什么原因。

public class Program
{
    static void Main(string[] args)
    {
        var config = LoadConfiguration();
        ConfigureServices(config);

        Console.ReadLine();
    }

    public static IConfiguration LoadConfiguration()
    {
        var builder = new ConfigurationBuilder()
            .SetBasePath(Directory.GetCurrentDirectory())
            .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);

        return builder.Build();
    }

    private static void ConfigureServices(IConfiguration config)
    {
        var services = new ServiceCollection()
                .AddSingleton(config)
                .AddEasyNetQ("host=127.0.0.1:5672;username=guest;password=guest")
                .AddSingleton<AutoSubscriber>(provider =>
                {
                    // When I put breakpoint below - is never reached. Is that correct behavior?
                    var subscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "SomePrefix")
                    {
                        AutoSubscriberMessageDispatcher = provider.GetRequiredService<IAutoSubscriberMessageDispatcher>()
                    };
                    subscriber.Subscribe(new Assembly[] { Assembly.GetExecutingAssembly() });
                    subscriber.SubscribeAsync(new Assembly[] { Assembly.GetExecutingAssembly() });
                    return subscriber;
                });

        services.BuildServiceProvider();
    }
}

下面是其余的嵌套代码,虽然它看起来工作正常 - 所以问题可能出在 Program.cs 中

EasyNetQExtension

public static class EasyNetQExtension
    {
        private static void InternalInitEasyNetQ(IServiceCollection service, string rabbitMqConnection)
        {
            service.AddSingleton(RabbitHutch.CreateBus(rabbitMqConnection));
            service.AddSingleton<IAutoSubscriberMessageDispatcher, ConsumerMessageDispatcher>(serviceProvider => new ConsumerMessageDispatcher(serviceProvider));

            var consumerTypes = Assembly.GetExecutingAssembly().GetTypes()
                .Where(x => x.IsClass && !x.IsAbstract && !x.IsInterface)
                .Where(x => x.GetInterfaces().Any(t => t.Name == typeof(IConsume<>).Name));
            foreach (var consumerType in consumerTypes)
            {
                service.AddTransient(consumerType);
            }
            
            // My consumer is found here, so this works properly
            var consumerAsyncTypes = Assembly.GetExecutingAssembly().GetTypes()
                .Where(x => x.IsClass && !x.IsAbstract && !x.IsInterface)
                .Where(x => x.GetInterfaces().Any(t => t.Name == typeof(IConsumeAsync<>).Name));
            foreach (var consumerAsyncType in consumerAsyncTypes)
            {
                service.AddTransient(consumerAsyncType);
            }
        }

        public static IServiceCollection AddEasyNetQ(this IServiceCollection service, string rabbitMqConnectionString)
        {
            InternalInitEasyNetQ(service, rabbitMqConnectionString);

            return service;
        }
    }

ConsumerMessageDispatcher

public class ConsumerMessageDispatcher : IAutoSubscriberMessageDispatcher
    {
        private readonly IServiceProvider _serviceProvider;

        public ConsumerMessageDispatcher(IServiceProvider serviceProvider)
        {
            _serviceProvider = serviceProvider;
        }

        public void Dispatch<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken = new CancellationToken()) where TMessage : class where TConsumer : class, IConsume<TMessage>
        {
            try
            {
                TConsumer consumer = _serviceProvider.GetRequiredService<TConsumer>();
                consumer.Consume(message);
            }
            catch (Exception exception)
            {
                throw;
            }
        }

        public async Task DispatchAsync<TMessage, TConsumer>(TMessage message, CancellationToken cancellationToken = new CancellationToken()) where TMessage : class where TConsumer : class, IConsumeAsync<TMessage>
        {
            try
            {
                TConsumer consumer = _serviceProvider.GetRequiredService<TConsumer>();
                await consumer.ConsumeAsync(message);
            }
            catch (Exception exception)
            {
                throw;
            }
        }
    }

【问题讨论】:

    标签: c# .net .net-core rabbitmq


    【解决方案1】:

    首先,你必须在你的控制台应用中实现IConsumer&lt;&gt;接口,这样才能通过InternalInitEasyNetQ方法注册。

    using EasyNetQ.AutoSubscribe;
    using Microsoft.Extensions.Logging;
    using System;
    using System.Threading;
    
    namespace MyApp
    {
        public class ConsumeTextMessage : IConsume<string>
        {
            private readonly ILogger _logger;
    
            public ConsumeTextMessage(ILogger<ConsumeTextMessage> logger)
            {
                _logger = logger;
            }
    
            public void Consume(string message, CancellationToken cancellationToken = default)
            {
                _logger.LogInformation("Logging the message: " + message);
                Console.WriteLine("Reading the message: " + message);
            }
        }
    }
    

    其次,您缺少使用IAutoSubscriberMessageDispatcher 编写消息的部分。您可以使用IServiceProvider 或依赖注入来解析IAutoSubscriberMessageDispatcher 接口。类似的东西:

    var dispatcher = _provider.GetRequiredService<IAutoSubscriberMessageDispatcher>();
    dispatcher.Dispatch<string, ConsumeTextMessage>("Dispatch my message - " + DateTime.Now);
    

    【讨论】:

    • 你是说我已经手动发送了我的消息?这没有任何意义。
    • 某人/某事必须发送消息。在现实生活中,它可能是单独的服务。在示例应用程序中,是的 - 您必须发送消息。并实现消费者。
    • 什么会将消息放入RabbitMQ?
    • 另一个服务。在我开始使用 AutoSubscribe Dispatcher 之前看看(所以当我手动定义订阅者时) - 一切正常。但现在我想用现代解决方案来改进它,即自动订阅任何新的消费者。
    • 啊,好的。现在明白了。然后只需实现IConsume&lt;&gt; 接口并将其调整为您期望的消息类型,以防它不是string,就像我的示例一样。您可以省略我答案的第二部分。
    【解决方案2】:

    我通过将 Autosubscriber 移动到单独的方法来解决这个问题,以便正确的外观:

    public class Program
    {
        static void Main(string[] args)
        {
            var config = LoadConfiguration();
            var provider = ConfigureServices(config);
    
            ConfigureConsumers(provider);
    
            Console.ReadLine();
        }
    
        public static IConfiguration LoadConfiguration()
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(Directory.GetCurrentDirectory())
                .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true);
    
            return builder.Build();
        }
    
        private static ServiceProvider ConfigureServices(IConfiguration configuration)
        {
            var services = new ServiceCollection()
                .AddTransient<IEmailSender, EmailSender>()
                .Configure<AuthMessageSenderOptions>(options => configuration.GetSection("SendGridEmailSettings").Bind(options))
                .AddEasyNetQ(configuration["QueueConnectionData"]);
    
            return services.BuildServiceProvider();
        }
    
        private static void ConfigureConsumers(ServiceProvider provider)
        {
            var autoSubscriber = new AutoSubscriber(provider.GetRequiredService<IBus>(), "SomePrefix")
            {
                AutoSubscriberMessageDispatcher = provider.GetRequiredService<IAutoSubscriberMessageDispatcher>()
            };
    
            autoSubscriber.Subscribe(new[] { Assembly.GetExecutingAssembly() });
            autoSubscriber.SubscribeAsync(new[] { Assembly.GetExecutingAssembly() });
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2014-10-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多