【问题标题】:Unable to publish messages to rabbitmq using masstransit无法使用 masstransit 将消息发布到 rabbitmq
【发布时间】:2021-04-29 06:33:07
【问题描述】:

我正在尝试使用 MassTransit 和 RabbitMq 实现简单的发布/订阅。 我正在使用 dot net 4.8 版来实现它。 我创建了 2 个控制台应用程序项目(每个用于 pub 和 sub) 我无法通过 MassTransit 将消息发布到队列,但如果我直接使用 RabbitMq.CLient nuget,我可以发送和接收消息。

请在下面找到我的发布者和订阅者类: Publisher.cs:

using System;
using Contracts;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;

namespace Publisher
{
    internal class Send
    {
        public static void Main(string[] args)
        {
            SendMessage();
        }

        private static async void SendMessage()
        {
            //Log4NetLogger.Use();
            for (var i = 0; i < 10000; i++)
            {
                IBusControl busControl = null;
                try
                {
                    busControl = Bus.Factory.CreateUsingRabbitMq(sbc =>
                    {
                        sbc.Host(new Uri("rabbitmq://localhost"), h =>
                        {
                            h.Username("guest");
                            h.Password("guest");
                        });
                    });
                    busControl.Start();
                    var message = new ValueEntered
                    {
                        Value = $"Message {i}"
                    };

                    // The below call is not pushing messages. No Exception to is found.
                    await busControl.Publish(message);

                    Console.WriteLine($"Published [x] : {message} [at] {DateTime.Now:u}");
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.ToString());
                }
                finally
                {
                    busControl?.Stop();
                }
            }
        }
    }
}

订阅者.cs:

using System;
using System.Threading;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;

namespace Subscriber
{
    internal class Receive
    {
        public static void Main(string[] args)
        {
            ReceiveMessage();
        }

        private static void ReceiveMessage()
        {
            //Log4NetLogger.Use();
            IBusControl busControl = null;
            try
            {
                busControl = Bus.Factory.CreateUsingRabbitMq(busFactoryConfigurator =>
                {
                    busFactoryConfigurator.Host(new Uri("rabbitmq://localhost"), h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });

                    busFactoryConfigurator.ReceiveEndpoint("value-events-listener", e =>
                    {
                        e.Consumer<ValueConsumer>();
                    });
                });
                busControl.Start();
                while (true)
                {
                    Thread.Sleep(5000);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            finally
            {
                busControl?.Stop();
            }
        }
    }
}

使用的合同:

namespace Contracts
{
    public class ValueEntered
    {
        public string Value { get; set; }
    }
}

消费者:

using System;
using System.Threading.Tasks;
using Contracts;
using MassTransit;
using Newtonsoft.Json;

namespace Subscriber
{
    public class ValueConsumer : IConsumer<ValueEntered>
    {
        public Task Consume(ConsumeContext<ValueEntered> context)
        {
            return Task.Run(() =>
            {
                Console.WriteLine($"Received [x] {JsonConvert.SerializeObject(context)} [at] {DateTime.Now:u}");
            });
        }
    }
}

已安装的软件包:

<?xml version="1.0" encoding="utf-8"?>
<packages>
  <package id="Automatonymous" version="5.1.1" targetFramework="net48" />
  <package id="GreenPipes" version="4.0.0" targetFramework="net48" />
  <package id="log4net" version="2.0.8" targetFramework="net48" />
  <package id="MassTransit" version="7.1.8" targetFramework="net48" />
  <package id="MassTransit.Log4Net" version="5.5.6" targetFramework="net48" />
  <package id="MassTransit.RabbitMQ" version="7.1.8" targetFramework="net48" />
  <package id="Microsoft.Bcl.AsyncInterfaces" version="1.1.1" targetFramework="net48" />
  <package id="Microsoft.Extensions.Logging.Abstractions" version="2.1.1" targetFramework="net48" />
  <package id="NewId" version="3.0.3" targetFramework="net48" />
  <package id="Newtonsoft.Json" version="11.0.2" targetFramework="net48" />
  <package id="Newtonsoft.Json.Bson" version="1.0.1" targetFramework="net48" />
  <package id="RabbitMQ.Client" version="6.2.1" targetFramework="net48" />
  <package id="System.Buffers" version="4.5.1" targetFramework="net48" />
  <package id="System.Diagnostics.DiagnosticSource" version="4.7.1" targetFramework="net48" />
  <package id="System.Memory" version="4.5.4" targetFramework="net48" />
  <package id="System.Numerics.Vectors" version="4.5.0" targetFramework="net48" />
  <package id="System.Reflection.Emit" version="4.7.0" targetFramework="net48" />
  <package id="System.Reflection.Emit.Lightweight" version="4.7.0" targetFramework="net48" />
  <package id="System.Reflection.Extensions" version="4.3.0" targetFramework="net48" />
  <package id="System.Runtime.CompilerServices.Unsafe" version="4.5.3" targetFramework="net48" />
  <package id="System.Runtime.Loader" version="4.3.0" targetFramework="net48" />
  <package id="System.Threading.Channels" version="4.7.1" targetFramework="net48" />
  <package id="System.Threading.Tasks.Extensions" version="4.5.4" targetFramework="net48" />
  <package id="System.Threading.ThreadPool" version="4.3.0" targetFramework="net48" />
  <package id="System.ValueTuple" version="4.5.0" targetFramework="net48" />
  <package id="System.Xml.ReaderWriter" version="4.3.1" targetFramework="net48" />
</packages>

订阅者启动后,Exchanges 和 Queues 创建成功,但仍然无法推送消息。请帮帮我。

【问题讨论】:

    标签: c# rabbitmq masstransit


    【解决方案1】:

    您可能需要从学习 async/await 在 C# 中的工作原理开始。你期望一个方法,它返回一个Task 来做某事而不等待它,它永远不会发生。除非等待,否则任务永远不会开始。

    制作人:

    namespace Publisher
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
                {
                    sbc.Host(new Uri("rabbitmq://localhost"), h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });
                });
                await bus.Start();
                await Task.WhenAll(Enumerable.Range(0, 100).Select(i =>
                {
                    var message = new ValueEntered { Value = $"Message {i}" };
                    return bus.Publish(message);
                }));
                await bus.Stop();
            }
        }
    }
    

    消费者:

    namespace Subscriber
    {
        class Program
        {
            public static async Task Main(string[] args)
            {
                var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
                {
                    cfg.Host(new Uri("rabbitmq://localhost"), h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    });
    
                    cfg.ReceiveEndpoint("value-events-listener", e =>
                    {
                        e.Consumer<ValueConsumer>();
                    });
                });
                await bus.Start();
                Console.WriteLine("Waiting for messages...");
                Console.ReadLine();
                await bus.Stop();
            }
        }
    
        public class ValueConsumer : IConsumer<ValueEntered>
        {
            public Task Consume(ConsumeContext<ValueEntered> context)
            {
                Console.WriteLine($"Received [x] {JsonConvert.SerializeObject(context)} [at] {DateTime.Now:u}");
                return Task.CompletedTask;
            }
        }
    }
    

    【讨论】:

    • 非常感谢。完全错过了我使用 async void。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-21
    相关资源
    最近更新 更多