【发布时间】:2019-10-23 19:29:21
【问题描述】:
利用 Azure 表存储的 Azure 函数
我有一个 Azure 函数,它是从 Azure 服务总线主题订阅触发的,我们称之为“处理文件信息”函数。
订阅的消息包含要处理的文件信息。类似的东西:
{
"uniqueFileId": "adjsdakajksajkskjdasd",
"fileName":"mydocument.docx",
"sourceSystemRef":"System1",
"sizeBytes": 1024,
... and other data
}
该函数执行以下两个操作 -
检查单个文件存储表是否存在文件。如果存在,请更新该文件。如果是新文件,请将文件添加到存储表(按每个系统|每个 fileId 存储)。
捕获文件大小字节的指标并存储在第二个存储表中,称为指标(不断增加字节,存储在每个系统|每年/每月 基础)。
下图简要总结了我的方法:
individualFileInfo 表和 fileMetric 表的区别在于,单个表每个文件有一条记录,而 metric 表每月存储一条记录,并且不断更新(增量)收集通过函数传递的总字节数。
fileMetrics 表中的数据存储方式如下:
问题...
Azure 函数在扩展方面非常出色,在我的设置中,我最多可以同时运行 6 个这些函数。假设要处理的每个文件消息都是唯一的 - 更新 individualFileInfo 表中的记录(或插入)可以正常工作,因为没有竞争条件。
然而,更新 fileMetric 表被证明是有问题的,因为所有 6 个函数同时触发,它们都打算同时更新指标表(不断增加新文件计数器或增加现有文件计数器)。
我已尝试使用 etag 进行乐观更新,并在存储更新返回 412 响应时使用一点递归来重试(下面的代码示例)。但我似乎无法避免这种竞争条件。有没有人对如何解决这个限制或以前遇到类似的问题有任何建议?
在存储 fileMetric 更新的函数中执行的示例代码:
internal static async Task UpdateMetricEntry(IAzureTableStorageService auditTableService,
string sourceSystemReference, long addNewBytes, long addIncrementBytes, int retryDepth = 0)
{
const int maxRetryDepth = 3; // only recurively attempt max 3 times
var todayYearMonth = DateTime.Now.ToString("yyyyMM");
try
{
// Attempt to get existing record from table storage.
var result = await auditTableService.GetRecord<VolumeMetric>("VolumeMetrics", sourceSystemReference, todayYearMonth);
// If the volume metrics table existing in storage - add or edit the records as required.
if (result.TableExists)
{
VolumeMetric volumeMetric = result.RecordExists ?
// Existing metric record.
(VolumeMetric)result.Record.Clone()
:
// Brand new metrics record.
new VolumeMetric
{
PartitionKey = sourceSystemReference,
RowKey = todayYearMonth,
SourceSystemReference = sourceSystemReference,
BillingMonth = DateTime.Now.Month,
BillingYear = DateTime.Now.Year,
ETag = "*"
};
volumeMetric.NewVolumeBytes += addNewBytes;
volumeMetric.IncrementalVolumeBytes += addIncrementBytes;
await auditTableService.InsertOrReplace("VolumeMetrics", volumeMetric);
}
}
catch (StorageException ex)
{
if (ex.RequestInformation.HttpStatusCode == 412)
{
// Retry to update the volume metrics.
if (retryDepth < maxRetryDepth)
await UpdateMetricEntry(auditTableService, sourceSystemReference, addNewBytes, addIncrementBytes, retryDepth++);
}
else
throw;
}
}
Etag 会跟踪冲突,如果此代码收到 412 Http 响应,它将重试,最多 3 次(试图缓解问题)。我的问题是我不能保证在函数的所有实例中更新表存储。
提前感谢您的任何提示!
【问题讨论】:
标签: azure azure-functions azureservicebus azure-table-storage race-condition