【问题标题】:MassTransit Losing Messages - Rabbit MQ - When publisher and consumer endpoint names are the same,MassTransit 丢失消息 - Rabbit MQ - 当发布者和消费者端点名称相同时,
【发布时间】:2012-09-17 11:22:13
【问题描述】:

如果您使用相同的端点名称创建发布者和消费者,我们遇到了 MassTransit 丢失消息的情况。

注意下面的代码;如果我为消费者或发布者使用不同的端点名称(例如,发布者使用“rabbitmq://localhost/mtlossPublised”),那么消息将同时计算发布和消费匹配;如果我使用相同的端点名称(如示例中),那么我得到的消息消耗比发布的少。

这是预期的行为吗?还是我做错了什么,下面的工作示例代码。

using MassTransit;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace MTMessageLoss
{
    class Program
    {
        static void Main(string[] args)
        {
            var consumerBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            var publisherBus = ServiceBusFactory.New(b =>
            {
                b.UseRabbitMq();
                b.UseRabbitMqRouting();
                b.ReceiveFrom("rabbitmq://localhost/mtloss");
            });
            consumerBus.SubscribeConsumer(() => new MessageConsumer());
            for (int i = 0; i < 10; i++)
                publisherBus.Publish(new SimpleMessage() { CorrelationId = Guid.NewGuid(), Message = string.Format("This is message {0}", i) });
            Console.WriteLine("Press ENTER Key to see how many you consumed");
            Console.ReadLine();
            Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.", MessageConsumer.Count);
            Console.ReadLine();
            consumerBus.Dispose();
            publisherBus.Dispose();
        }
    }
    public interface ISimpleMessage : CorrelatedBy<Guid>
    {
        string Message { get; }
    }
    public class SimpleMessage : ISimpleMessage
    {
        public Guid CorrelationId { get; set; }
        public string Message { get; set; }
    }
    public class MessageConsumer : Consumes<ISimpleMessage>.All
    {
        public static int Count = 0;
        public void Consume(ISimpleMessage message)
        {
            System.Threading.Interlocked.Increment(ref Count);
        }
    }
}

【问题讨论】:

    标签: rabbitmq masstransit


    【解决方案1】:

    归根结底,总线的每个实例都需要它自己的队列来读取。即使总线只是为了发布消息而存在。这只是 MassTransit 工作方式的一个要求。

    http://masstransit.readthedocs.org/en/master/configuration/config_api.html#basic-options - 查看警告。

    当两个总线实例共享同一个队列时,我们将行为保留为未定义。无论如何,这不是我们支持的条件。每个总线实例都可以将元数据发送到其他总线实例,并且需要它自己的端点。这对 MSMQ 来说是一个更大的交易,所以也许我们可以让这个案例在 RabbitMQ 上工作——但这​​并不是我们在这一点上花费太多心思的事情。

    【讨论】:

    • 特拉维斯,二进制忧虑者。感谢你们两位的投入。感谢 Travis 文档的链接;我以前没见过。我以为我已经阅读了网站上的所有文档。所有这些显然都没有陷入:)
    • @Bigtoe 有很多东西要吸收,如果您对如何通过更明确的方式使您第一次错过的任何内容有任何想法,我们很高兴听到您的 cmets。任何能让人们更轻松地访问 MT 的东西都很棒。
    • 链接已失效
    【解决方案2】:

    发生的情况是,在提供相同的 Receiver Uri 时,您是在告诉 MT 对两条总线上的消耗进行负载平衡,但是您只有一个总线在监听消息。

    如果你让它跟踪收到了哪些条消息,你会看到它(几乎)每秒都有一个。

    我已经调整了你的示例代码

    We consumed 6 simple messages. Press Enter to terminate the applicaion.
    Received 0
    Received 3
    Received 5
    Received 6
    Received 7
    Received 8
    

    在另一辆公共汽车上启动一个消费者,你会得到他们所有的

    We consumed 10 simple messages. Press Enter to terminate the applicaion.
    Received 0
    Received 1
    Received 2
    Received 3
    Received 4
    Received 5
    Received 6
    Received 7
    Received 8
    Received 9
    

    是的,我认为这是预期的行为。

    这是两个订阅者的调整示例代码

    using MassTransit;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace MTMessageLoss
    {
        class Program
        {
            internal static bool[] msgReceived = new bool[10];
            static void Main(string[] args)
            {
                var consumerBus = ServiceBusFactory.New(b =>
                    {
                        b.UseRabbitMq();
                        b.UseRabbitMqRouting();
                        b.ReceiveFrom("rabbitmq://localhost/mtloss");
                    });
                var publisherBus = ServiceBusFactory.New(b =>
                    {
                        b.UseRabbitMq();
                        b.UseRabbitMqRouting();
                        b.ReceiveFrom("rabbitmq://localhost/mtloss");
                    });
                publisherBus.SubscribeConsumer(() => new MessageConsumer());
                consumerBus.SubscribeConsumer(() => new MessageConsumer());
                for (int i = 0; i < 10; i++)
                    consumerBus.Publish(new SimpleMessage()
                        {CorrelationId = Guid.NewGuid(), MsgId = i});
                Console.WriteLine("Press ENTER Key to see how many you consumed");
                Console.ReadLine();
                Console.WriteLine("We consumed {0} simple messages. Press Enter to terminate the applicaion.",
                                  MessageConsumer.Count);
                for (int i = 0; i < 10; i++)
                    if (msgReceived[i])
                        Console.WriteLine("Received {0}", i);
                Console.ReadLine();
                consumerBus.Dispose();
                publisherBus.Dispose();
    
            }
        }
        public interface ISimpleMessage : CorrelatedBy<Guid>
        {
            int MsgId { get; }
        }
        public class SimpleMessage : ISimpleMessage
        {
            public Guid CorrelationId { get; set; }
            public int MsgId { get; set; }
        }
        public class MessageConsumer : Consumes<ISimpleMessage>.All
        {
            public static int Count = 0;
            public void Consume(ISimpleMessage message)
            {
                Program.msgReceived[message.MsgId] = true;
                System.Threading.Interlocked.Increment(ref Count);
            }
        }
    }
    

    【讨论】:

    • OK 这确实在一定程度上解释了它;但是发布者总线上的原始样本中没有订阅;所以无论如何它们永远不会被发布者总线消耗,所以为什么要进行负载平衡。另外,我不确定您的解释是否成立;因为这些消息不应该丢失;它们应该一直被消耗掉。在我们的示例中,只有一个消费者;端点配置不应决定负载平衡;应该由注册的消费者来决定。
    • 与消费相关联的不是总线实例,而是接收器 URI(这是 MT 首先为您提供跨服务负载平衡的方式)。对我来说,MT 会看到这种配置并说“啊,我有同一总线的两个实例(相同的 Uri),我被告知总线的 type 消耗消息 x , ergo 我将负载平衡它们之间的消息”。 tl;dr 如果您不想对两条总线进行负载平衡,请给它们不同的名称
    • 为了更具指导性,如果您跟踪示例中哪个消费者消费了哪条消息,但不确定哪些消息被消费了,这可能会更好。但无论如何,您都会看到两个消费者竞争消息。
    • @BinaryWorrier 是的,最好在总线初始化时注册消费者。虽然您可以动态注册它们,但您可能会丢失一些已经在队列中的消息。这就是我们在配置器中支持它的原因。
    • @mikebridge 我不这么认为。我仍在使用旧版本,我认为您需要显式注册消费者才能获得 UnsubscribeAction。您可以将 *?temporary=true 作为队列名称来创建临时文件。不过为那个消费者排队。
    猜你喜欢
    • 2023-03-06
    • 1970-01-01
    • 2016-10-25
    • 2019-07-12
    • 1970-01-01
    • 1970-01-01
    • 2013-12-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多