【问题标题】:Azure Service Bus - Readd message in OnMessageAsync to the end of the queueAzure 服务总线 - 将 OnMessageAsync 中的消息读取到队列末尾
【发布时间】:2016-03-25 21:21:10
【问题描述】:

我们正在使用 Microsoft Azure 服务总线来发送命令消息,并且我们正在使用 OnMessage 方法从总线中获取它们。在我们的 IMessageSessionAsyncHandler.OnMessageAsync 实现中,我们可能意识到消息尚未准备好进行处理。所以我想从总线上获取消息并将它们读到队列的末尾。 get/readd 操作必须是原子的。我怎样才能做到这一点?

这是我目前的解决方案(高度抽象),但我担心非原子性。

queueClient.RegisterSessionHandlerFactory(new CommandSessionHandlerFactory(queueClient), ...);

internal class CommandSessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
    private readonly QueueClient _queueClient;
    public CommandSessionHandlerFactory(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
    {
        return new CommandSessionHandlerAsync(_queueClient);
    }
}

internal class CommandSessionHandlerAsync : MessageSessionAsyncHandler
{
    private readonly QueueClient _queueClient;
    public CommandSessionHandlerAsync(QueueClient queueClient)
    {
        _queueClient = queueClient;
    }

    protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
    {
        if (!messageReadyForProcessing)
        {
            // How to get the following code transactional safe?
            var clonedMessage = message.Clone();
            await message.CompleteAsync();
            await _queueClient.SendAsync(clonedMessage);
        }
    }
}

那么重复检测呢?我们是否必须更改克隆消息的 MessageId 以确保服务总线重复检测不会丢弃克隆消息?

【问题讨论】:

    标签: azureservicebus


    【解决方案1】:

    那么重复检测呢?我们是否必须更改 MessageId 克隆消息以确保服务总线重复 检测不会丢弃克隆的消息?

    如果启用了重复检测。您的消息将从队列中删除。因为您使用相同的 MessageId 并且重复检测会在给定时间段内跟踪 messageId(默认为 10 分钟)。

          if (!messageReadyForProcessing)
            {
                // How to get the following code transactional safe?
                var clonedMessage = message.Clone();
                await message.CompleteAsync();
                await _queueClient.SendAsync(clonedMessage);
            }
    

    你可以改成这样:

             if (!messageReadyForProcessing)
                {
                    await message.CompleteAsync();
                    message.MessageId = Guid.NewGuid().ToString();
                    await _queueClient.SendAsync(message);
                }
    

    但是还是有问题。如果消息成功完成,但发送消息失败怎么办?您将丢失消息。重试策略可以解决这个问题。但是它很脏,不能 100% 保证。

    您可以增加 Max DeliveryCount。除了再次发送消息之外,只需放弃(释放)它以再次使用。

            if (!messageReadyForProcessing)
                {
                    await message.AbandonAsyncy();
    
                }
    

    这样更好。你会确定,它是事务性的。即使超过最大投递次数,也会进入死队列。

    但还是有一个缺点。真实发送的消息会发生什么?因此,如果您有一条永远不会被处理的消息,它将定位您的消费者。因为我们增加了最大交付数量。

    简而言之:OnMessage 不适合您的解决方案。当您准备好处理它时,只需从队列中获取消息。我认为这是最适合您的解决方案。

       if (messageReadyForProcessing)
         {
            var mySession=QueueClient.AcceptMessageSession();
                 var message = mySession.Receive();
         }
    

    编辑:

    您可以使用TransactionScope with service bus。您应该在范围内的同一队列上工作。

    Ps:事务范围不支持放弃。

    所以你可以应用这个:

        if (!messageReadyForProcessing)
          {
             using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
                { 
                        await message.CompleteAsync();
                        message.MessageId = Guid.NewGuid().ToString();
                        await _queueClient.SendAsync(message);
              scope.Complete();                
          }
         }
    

    查看Brokered Messaging: Transactions

    我不确定这是否适用于异步。所以你可以测试和检查Asynchronous Transactions with Service Bus,如果它不成功。

    Ps:他们说网络框架上有一个关于 transactionscope 和 asyncs 的错误。他们建议您使用 4.5.1 或更高版本。更多信息请查看here

    【讨论】:

    • 1) Abandon 不适合我们,因为我们确实必须将消息 X 放在队列的末尾,因为在消息 X 之前必须处理其他消息。 2) messageReadyForProcessing 是一个决定基于会话。因此,很遗憾,您的最后一个建议也不是我们可以采用的方式,因为我们已经在开会了。
    • 我看到我找不到明确的答案,放弃发送到结束或开始队列后会发生什么。但我已经编辑了我的答案,你可以使用事务范围。
    • 非常感谢。我已阅读有关 TransactionScope 的信息,但不确定它是否适用于我们的案例。关于您对“异步”的疑问:它似乎从 .NET Framework 4.5.1 开始工作。 TransactionScope 有一个新的构造函数,称为 TransactionScopeAsyncFlowOption:stackoverflow.com/questions/13543254/… 也许您会将参数添加到示例中。
    • 我已编辑。欢迎您@Tobias,我在回答时也学到了新东西。
    猜你喜欢
    • 1970-01-01
    • 2021-08-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-04-22
    • 1970-01-01
    • 1970-01-01
    • 2014-11-03
    相关资源
    最近更新 更多