【问题标题】:How to avoid race condition when updating Azure Table Storage record更新 Azure 表存储记录时如何避免竞争条件
【发布时间】:2019-10-23 19:29:21
【问题描述】:

利用 Azure 表存储的 Azure 函数

我有一个 Azure 函数,它是从 Azure 服务总线主题订阅触发的,我们称之为“处理文件信息”函数。

订阅的消息包含要处理的文件信息。类似的东西:

{
  "uniqueFileId": "adjsdakajksajkskjdasd",
  "fileName":"mydocument.docx",
  "sourceSystemRef":"System1",
  "sizeBytes": 1024,
  ... and other data
}

该函数执行以下两个操作 -

  1. 检查单个文件存储表是否存在文件。如果存在,请更新该文件。如果是新文件,请将文件添加到存储表(按每个系统|每个 fileId 存储)。

  2. 捕获文件大小字节的指标并存储在第二个存储表中,称为指标(不断增加字节,存储在每个系统|每年/每月 基础)。

下图简要总结了我的方法:

individualFileInfo 表和 fileMetric 表的区别在于,单个表每个文件有一条记录,而 metric 表每月存储一条记录,并且不断更新(增量)收集通过函数传递的总字节数。

fileMetrics 表中的数据存储方式如下:

问题...

Azure 函数在扩展方面非常出色,在我的设置中,我最多可以同时运行 6 个这些函数。假设要处理的每个文件消息都是唯一的 - 更新 individualFileInfo 表中的记录(或插入)可以正常工作,因为没有竞争条件。

然而,更新 fileMetric 表被证明是有问题的,因为所有 6 个函数同时触发,它们都打算同时更新指标表(不断增加新文件计数器或增加现有文件计数器)。

我已尝试使用 etag 进行乐观更新,并在存储更新返回 412 响应时使用一点递归来重试(下面的代码示例)。但我似乎无法避免这种竞争条件。有没有人对如何解决这个限制或以前遇到类似的问题有任何建议?

在存储 fileMetric 更新的函数中执行的示例代码:

internal static async Task UpdateMetricEntry(IAzureTableStorageService auditTableService, 
    string sourceSystemReference, long addNewBytes, long addIncrementBytes, int retryDepth = 0)
{
    const int maxRetryDepth = 3; // only recurively attempt max 3 times
    var todayYearMonth = DateTime.Now.ToString("yyyyMM");
    try
    {
        // Attempt to get existing record from table storage.
        var result = await auditTableService.GetRecord<VolumeMetric>("VolumeMetrics", sourceSystemReference, todayYearMonth);

        // If the volume metrics table existing in storage - add or edit the records as required.
        if (result.TableExists)
        {
            VolumeMetric volumeMetric = result.RecordExists ?
                // Existing metric record.
                (VolumeMetric)result.Record.Clone()
                    :
                // Brand new metrics record.
                new VolumeMetric
                {
                    PartitionKey = sourceSystemReference,
                    RowKey = todayYearMonth,
                    SourceSystemReference = sourceSystemReference,
                    BillingMonth = DateTime.Now.Month,
                    BillingYear = DateTime.Now.Year,
                    ETag = "*"
                };

            volumeMetric.NewVolumeBytes += addNewBytes;
            volumeMetric.IncrementalVolumeBytes += addIncrementBytes;

            await auditTableService.InsertOrReplace("VolumeMetrics", volumeMetric);
        }
    }
    catch (StorageException ex)
    {
        if (ex.RequestInformation.HttpStatusCode == 412)
        {
            // Retry to update the volume metrics.
            if (retryDepth < maxRetryDepth)
                await UpdateMetricEntry(auditTableService, sourceSystemReference, addNewBytes, addIncrementBytes, retryDepth++);
        }
        else
            throw;
    }
}

Etag 会跟踪冲突,如果此代码收到 412 Http 响应,它将重试,最多 3 次(试图缓解问题)。我的问题是我不能保证在函数的所有实例中更新表存储。

提前感谢您的任何提示!

【问题讨论】:

    标签: azure azure-functions azureservicebus azure-table-storage race-condition


    【解决方案1】:

    您可以将工作的第二部分放入第二个队列和函数中,甚至可以在文件更新时触发。

    由于其他操作听起来可能需要大部分时间,所以它也可以消除第二步的一些热量。

    然后,您可以通过仅关注该函数来解决任何剩余的竞争条件。您可以使用会话来有效地限制并发。在您的情况下,系统 ID 可能是一个可能的会话密钥。如果你使用它,你将只有一个 Azure Function 一次处理来自一个系统的数据,有效地解决你的竞争条件。

    https://dev.to/azure/ordered-queue-processing-in-azure-functions-4h6c

    编辑:如果您不能使用 Sessions 逻辑锁定资源,则可以通过 Blob 存储使用锁定:

    https://www.azurefromthetrenches.com/acquiring-locks-on-table-storage/

    【讨论】:

    • 非常感谢您的建议 - 这是并行有序处理服务总线消息的一个很好的解决方案。但是,我真正的问题是并行尝试写入表存储。
    • 如果您控制了 Azure 函数中并行性发生的顺序,您将有效地控制表存储的并行尝试。您只需要找到一个会话密钥来控制您想要的内容,而不会过多地限制吞吐量(例如 systemid?)。如果你不能这样做:为不支持它们的资源引入锁的常用方法是 blob 存储,即azurefromthetrenches.com/acquiring-locks-on-table-storage
    猜你喜欢
    • 2019-06-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-01-30
    • 2010-09-25
    • 1970-01-01
    • 2016-09-20
    相关资源
    最近更新 更多