【问题标题】:Throttling Azure Storage Queue processing in Azure Function App在 Azure Function App 中限制 Azure 存储队列处理
【发布时间】:2018-08-28 09:49:41
【问题描述】:

我创建了一个带有 Azure 存储队列触发器的 Azure 函数应用程序,该触发器处理一个队列,其中每个队列项都是一个 URL。该函数只是下载 URL 的内容。我有另一个函数可以加载和解析站点的 XML 站点地图并将所有页面 URL 添加到队列中。我遇到的问题是 Functions 应用程序运行速度太快,它会影响网站,因此它开始返回服务器错误。有没有办法限制/限制 Functions 应用的运行速度?

当然,我可以编写一个简单的 Web 作业来串行处理它们(或者使用一些异步但限制并发请求的数量),但我真的很喜欢 Azure Functions 的简单性,并想尝试“无服务器”计算.

【问题讨论】:

    标签: azure azure-functions


    【解决方案1】:

    您可以考虑几个选项。

    首先,您可以在host.json 中配置一些控制队列处理的旋钮(记录在here)。 queues.batchSize 旋钮是一次获取多少队列消息。如果设置为 1,则运行时将一次获取 1 条消息,并且仅在该消息的处理完成时才获取下一条消息。这可以在单个实例上为您提供一定程度的序列化。

    另一个选项可能是您为排队的消息设置 NextVisibleTime,以使它们间隔开 - 默认情况下,排队的消息变得可见并准备好立即处理。

    最后一个选项可能是让您将包含站点所有 URL 集合的消息排入队列,而不是一次一个,因此当处理消息时,您可以在函数中连续处理 URL,并且以这种方式限制并行度。

    【讨论】:

    • 谢谢@mathewc。我接受了您的第二个建议(设置 NextVisibleTime)。我花了一点时间才弄清楚如何做到这一点,但它运作良好。我的代码在github.com/alindgren/SiteWarmer,以防有人想偷看。
    • queues.batchSize 是一次获取的队列消息总数吗?还是仅针对该函数的每个实例?
    • BatchSize 是在我们的队列轮询循环的每次迭代中获取的消息数。 IE。这是我们传递给 Azure Queues GetMessagesAsync 调用的数字。这个轮询循环特定于一个函数 - 每个队列函数都有自己的。
    • host.json 文档已更改为此处:docs.microsoft.com/en-us/azure/azure-functions/…
    • 嗨@mathewc,您对事件中心绑定有什么建议?使用 IoT->eHub->Func。我有历史数据馈送杀死我的连接数。我可以使用 max batch=1 的 IoT->sBus->Func,但这不会触发新实例吗?那么最多 200 个 func 实例?
    【解决方案2】:
    如果有多个并行函数添加到队列中,

    NextVisibleTime 可能会变得混乱。对于遇到此问题的任何人来说,另一个简单的选择: 创建另一个队列“节流项”,并让您的原始函数跟随队列触发器。然后,添加一个简单的计时器函数,每分钟从原始队列中移动消息,并相应地间隔 NextVisibleTime

        [FunctionName("ThrottleQueueItems")]
        public static async Task Run([TimerTrigger("0 * * * * *")] TimerInfo timer, ILogger logger)
        {
            var originalQueue = // get original queue here;
            var throttledQueue = // get throttled queue here;
            var itemsPerMinute = 60; // get from app settings
            var individualDelay = 60.0 / itemsPerMinute;
            var totalRetrieved = 0;
            var maxItemsInBatch = 32; // change if you modify the default queue config
            do
            {
                var pending = (await originalQueue.GetMessagesAsync(Math.Min(maxItemsInBatch, itemsPerMinute - totalRetrieved))).ToArray();
                if (!pending.Any())
                    break;
                foreach (var message in pending)
                {
                    await throttledQueue.AddMessageAsync(new CloudQueueMessage(message.AsString), null,
                                                                                            TimeSpan.FromSeconds(individualDelay * ++totalRetrieved), null, null);
                    await originalQueue.DeleteMessageAsync(message);
                }
            } while (itemsPerMinute > totalRetrieved);
        }
    

    【讨论】:

      【解决方案3】:

      我在尝试解决类似问题时发现了这篇文章。这可能对到达这里的任何人都有用。您现在可以使用 WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT 应用设置来限制函数的并发实例数。将此设置为 1 并结合批处理限制 1 将允许您执行队列的串行处理。

      WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT

      函数应用可以横向扩展的最大实例数。默认没有限制。

      https://docs.microsoft.com/en-gb/azure/azure-functions/functions-app-settings#website_max_dynamic_application_scale_out

      【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-12-26
      • 2015-05-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-10-09
      • 2019-07-12
      相关资源
      最近更新 更多