【问题标题】:How to ensure message reception order in MassTransitMassTransit中如何保证消息接收顺序
【发布时间】:2012-05-22 18:18:09
【问题描述】:

我有一个 saga 有 3 个状态;初始、接收行、已完成 -

    public static State Initial { get; set; }
    public static State ReceivingRows { get; set; }
    public static State Completed { get; set; }

当它收到 BofMessage(其中 Bof = 文件开头)时,它会从 Initial 转换为 ReceivingRows。在 BofMessage 之后,它会接收大量 RowMessage,其中每个都描述平面文件中的一行。发送完所有 RowMessage 后,将发送 EofMessage 并且状态更改为 Completed。观察 -

    static void DefineSagaBehavior()
    {
        Initially(When(ReceivedBof)
            .Then((saga, message) => saga.BeginFile(message))
            .TransitionTo(ReceivingRows));

        During(ReceivingRows, When(ReceivedRow)
            .Then((saga, message) => saga.AddRow(message)));

        During(ReceivingRows, When(ReceivedRowError)
            .Then((saga, message) => saga.RowError(message)));

        During(ReceivingRows, When(ReceivedEof)
            .Then((saga, message) => saga.EndFile(message))
            .TransitionTo(Completed));
    }

这可行,除了有时在 BofMessage 之前收到几个 RowMessage!这与我发送给他们的顺序无关。这意味着这些消息将被接收并最终计为错误,导致它们从我最终将它们写入的数据库或文件中丢失。

作为一个临时修复,我在这个方法中添加了一个小睡眠计时器技巧来完成所有的发布工作 –

    public static void Publish(
        [NotNull] IServiceBus serviceBus,
        [NotNull] string publisherName,
        Guid correlationId,
        [NotNull] Tuple<string, string> inputFileDescriptor,
        [NotNull] string outputFileName)
    {
        // attempt to load offsets
        var offsetsResult = OffsetParser.Parse(inputFileDescriptor.Item1);
        if (offsetsResult.Result != ParseOffsetsResult.Success)
        {
            // publish an offsets invalid message
            serviceBus.Publish<TErrorMessage>(CombGuid.Generate(), publisherName, inputFileDescriptor.Item2);
            return;
        }

        // publish beginning of file
        var fullInputFilePath = Path.GetFullPath(inputFileDescriptor.Item2);
        serviceBus.Publish<TBofMessage>(correlationId, publisherName, fullInputFilePath);

        // HACK: make sure bof message happens before row messages, or else some row messages won't be received
        Thread.Sleep(5000);

        // publish rows from feed
        var feedResult = FeedParser.Parse(inputFileDescriptor.Item2, offsetsResult.Offsets);
        foreach (var row in feedResult)
        {
            // publish row message, unaligned if applicable
            if (row.Result != ParseRowResult.Success)
                serviceBus.Publish<TRowErrorMessage>(correlationId, publisherName, row.Fields);
            else
                serviceBus.Publish<TRowMessage>(correlationId, publisherName, row.Fields);
        }

        // publish end of file
        serviceBus.Publish<TEofMessage>(correlationId, publisherName, outputFileName);
    }

这是一个 5 秒的睡眠定时器,而且是相当丑陋的 hack。谁能告诉我为什么我没有按发送顺序收到消息?如果这些消息默认是无序的,我能否确保它们以正确的顺序发送?

谢谢!

为了方便起见,请注意这是从http://groups.google.com/group/masstransit-discuss/browse_thread/thread/7bd9518a690db4bb 交叉发布的。

【问题讨论】:

    标签: c# rabbitmq masstransit


    【解决方案1】:

    您无法确保消息按任何顺序传递。您可以通过确保消费者端只有一个并发消费者来接近 MT,我仍然不会依赖这种行为 (http://docs.masstransit-project.com/en/latest/overview/keyideas.html#handlers)。这将有效地使您的消费者单线程。

    【讨论】:

      猜你喜欢
      • 2023-03-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-12-13
      • 1970-01-01
      • 2011-06-14
      • 1970-01-01
      相关资源
      最近更新 更多