【发布时间】:2014-03-03 11:27:20
【问题描述】:
我正在读取 azure 表的数据 - 大约 5k 个表并收集不同的指标并将它们保存回其他一些 azure 表,一切都以异步方式进行。我面临的问题是当偶尔发生大量数据时,应用程序开始挂起。相同的代码在数据较少的情况下也能正常工作。我正在做的步骤(所有这些都是异步使用 Rx、async 和 await)是
- 从 Azure 中读取所有表名
- 读取所有表格之前的指标数据(1 和 2 是并行的 -
Task.WhenAll) - 从每个表中获取数据,处理并保存 (
Task.WhenAll)
我想要的是,使用异步直到它不会使我的应用程序挂起。如果数据多于可以处理的数据,则不应再读取任何表的数据,而是专注于完成可用的数据处理。
Parallel.ForEach 会解决这个问题吗?
代码:已根据 Stephen Cleary 进行编辑,但仍不适用于所有表格。而它适用于 500 个表,
我认为是数据量使应用程序(控制台应用程序)停止运行,而不是线程数。 (一个线程最终可能会检索数百万行,以千为单位,每千行传递给一个方法,并将其计数添加到字典中,因此可以在需要更多内存时进行垃圾收集)还是我实现的方式@ 987654325@错了吗?
public async Task CalculateMetricsForAllTablesAsync()
{
var allWizardTableNamesTask = GetAllWizardTableNamesAsync();
var allTablesNamesWithLastRunTimeTask = GetAllTableNamesWithLastRunTimeAsync();
await Task.WhenAll(allWizardTableNamesTask, allTablesNamesWithLastRunTimeTask).ConfigureAwait(false);
var allWizardTableNames = allWizardTableNamesTask.Result;
var allTablesNamesWithLastRunTime = allTablesNamesWithLastRunTimeTask.Result;
var throttler = new SemaphoreSlim(10);
var concurrentTableProcessingTasks = new ConcurrentStack<Task>();
foreach (var tname in allWizardTableNames)
{
await throttler.WaitAsync();
try
{
concurrentTableProcessingTasks.Push(ProcessTableDataAsync(tname, getTableNameWithLastRunTime(tname)));
}
finally
{
throttler.Release();
}
}
await Task.WhenAll(concurrentTableProcessingTasks).ConfigureAwait(false);
}
private async Task ProcessTableDataAsync(string tableName, Tuple<string, string> matchingTable)
{
var tableDataRetrieved = new TaskCompletionSource<bool>();
var metricCountsForEachDay = new ConcurrentDictionary<string, Tuple<int, int>>();
_fromATS.GetTableDataAsync<DynamicTableEntity>(tableName, GetFilter(matchingTable))
.Subscribe(entities => ProcessWizardDataChunk(metricCountsForEachDay, entities), () => tableDataRetrieved.TrySetResult(true));
await tableDataRetrieved.Task;
await SaveMetricDataAsync(tableName, metricCountsForEachDay).ConfigureAwait(false);
}
【问题讨论】:
-
您需要向我们展示您的代码。
-
另外,您应该坚持使用 Rx 或 TPL/async/await。一个或另一个,而不是两者。混用很容易造成死锁。就你个人而言,我会选择 Rx。
-
我不认为这是死锁,当我调试并在断点处停止时,其他线程正在工作并且内存压力较小,因为我已停止处理更多线程
-
不过,混合 Rx 和 TPL 是个坏主意。您使用的 Rx 代码在哪里?
-
混合 Rx 和
async很好。事实上,Rx 允许你await一个序列,并且有几个专门为await设计的运算符(例如,LastAsync)。
标签: c# asynchronous async-await system.reactive parallel.foreach