【问题标题】:Control Azure Service Bus Queue Message Reception控制 Azure 服务总线队列消息接收
【发布时间】:2020-07-30 10:48:50
【问题描述】:

我们有一个分布式架构,并且有一个需要调用的本机系统。挑战在于系统的容量不可扩展并且不能同时承担更多的请求负载。我们已经实现了服务总线队列,其中有一个消息处理程序侦听此队列并调用本机系统。当前的挑战是无论何时在队列中发布消息,消息处理程序都会立即处理该请求。但是,我们希望有一个场景一次只处理两个请求。选择两个,处理它,然后继续接下来的两个。服务总线队列是否提供了内置选项来控制这一点,还是我们只能使用自定义逻辑?

var options = new MessageHandlerOptions()
            {
                MaxConcurrentCalls = 1,
                AutoComplete = false
            
            };
        client.RegisterMessageHandler(
            async (message, cancellationToken) =>
            {
                try
                {
                    //Handler to process
                    await client.CompleteAsync(message.SystemProperties.LockToken);
                }
                catch
                {
                    await client.AbandonAsync(message.SystemProperties.LockToken);
                }

            }, options);

【问题讨论】:

    标签: azure azure-servicebus-queues


    【解决方案1】:

    Message Handler API 是为并发而设计的。如果您想在任何给定时间点处理两条消息,那么最大并发数为两条的 Handler API 将是您的答案。如果您需要在任何给定时间点处理一批两条消息,则此 API 不是您所需要的。相反,回退到使用 Mikolaj 提供的答案中概述的较低级别的 API 构建自己的消息泵。

    但要小心重新锁定消息。这不是保证操作,因为它是客户端操作,如果有通信网络,当前,代理将重置锁,如果您向外扩展,消息将由另一个竞争消费者再次处理。这就是为什么在您的场景中向外扩展可能会是一个挑战。

    另外一点是关于MessageReceiver 的较低级别的 API,当涉及到接收多条消息时 - ReceiveAsync(n) 确实保证 n 消息将被检索。如果您绝对必须拥有n 消息,则需要循环以确保有n 且不少于。

    关于管理客户端和获取队列消息计数的最后一点 - 强烈建议不要这样做。管理客户端不适合在运行时频繁使用。相反,它用于偶尔调用,因为这些调用非常慢。鉴于您最终可能会得到一个处理端点,一次只能处理两条消息(甚至不是每秒),这些调用将增加总处理时间。

    【讨论】:

    • 我正在考虑使用 MessageReceiver 并使用 ReceiveAsync(2),您的意思是 List 的计数可能大于两个吗?获得小于 2 的数字仍然可以,但有趣的是 ReceiveAsync(n) 并不能保证该计数。
    • 它总是小于 N。永远不会大于。
    • 谢谢@Sean Feldman 我能够控制我的消息流入,但还有另一个问题。具有消息处理程序的 API 附加到应用程序洞察力。我能够查询并看到每次只处理 2 个消息,但是这两个进程都具有相同的应用程序洞察 Operation_Id。我正在尝试如何为单独的消息获取不同的操作 ID。我的一个解决方法是,app insight operation_parent id 不同,这有助于我根据流查找日志,但我的目标是每条消息的操作 id 都不同。有什么想法吗?
    • 鉴于您使用单个操作来检索两条消息,因此它们具有相同的操作 ID 是有意义的。不完全确定 AppInsights 使用的背景是什么。因此,不能真正提出太多建议。您可能会尝试的一件事是,在完成消息之前为每个消息设置一个具有不同值的自定义属性,并查看 AppInsights 是否将自定义属性包含在元数据集合中。
    • 我已经有一个解决方法,虽然操作 ID 相同,但操作父 ID 不同,这有帮助。我正在考虑实施,看看 [ServiceBusQueueTrigger] SDK 实施是否有帮助,谢谢 Sean。
    【解决方案2】:

    从我的头上看,我不认为开箱即支持这样的东西,所以你最好自己做。

    我建议您查看 ReceiveAsync() 方法,该方法允许您接收特定数量的消息(注意:我认为它不能保证如果您指定要检索 2 条消息,它总会得到您二。例如,如果队列中只有一条消息,那么即使您要求两条,它也可能会返回该消息)

    您可以将ReceiveAsync() 方法与PeekAsync() 方法结合使用,您还可以在其中提供许多您想要查看的消息。如果查看的消息数量为 2,则您可以致电 ReceiveAsync(),从而更有可能获得所需的两条消息。

    另一种方法是查看队列的ManagementClientGetQueueRuntimeInfoAsync() 方法,这将为您提供有关队列中消息数量的信息。有了这些信息,您就可以拨打前面提到的ReceiveAsync()

    但是,请注意,如果您有多个接收者在侦听同一个队列,则无法保证上面的任何内容都可以正常工作,因为无法确定这些消息是否被另一个进程接收。

    您可能需要采用更复杂的方式来处理此问题并接收一条消息,然后使其保持活动状态(更新锁定等),直到您收到另一条消息,然后将它们一起处理。

    我不认为我提供了太多帮助,但至少它可能会给你一些想法。

    【讨论】:

    • 谢谢 Mikolaj,让我试试:)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-04-16
    • 1970-01-01
    • 2017-02-13
    • 1970-01-01
    • 1970-01-01
    • 2014-11-03
    • 2017-11-01
    相关资源
    最近更新 更多