【问题标题】:asynchronous only when there are less data仅当数据较少时才异步
【发布时间】:2014-03-03 11:27:20
【问题描述】:

我正在读取 azure 表的数据 - 大约 5k 个表并收集不同的指标并将它们保存回其他一些 azure 表,一切都以异步方式进行。我面临的问题是当偶尔发生大量数据时,应用程序开始挂起。相同的代码在数据较少的情况下也能正常工作。我正在做的步骤(所有这些都是异步使用 Rx、async 和 await)是

  1. 从 Azure 中读取所有表名
  2. 读取所有表格之前的指标数据(1 和 2 是并行的 - Task.WhenAll
  3. 从每个表中获取数据,处理并保存 (Task.WhenAll)

我想要的是,使用异步直到它不会使我的应用程序挂起。如果数据多于可以处理的数据,则不应再读取任何表的数据,而是专注于完成可用的数据处理。

Parallel.ForEach 会解决这个问题吗?

代码:已根据 Stephen Cleary 进行编辑,但仍不适用于所有表格。而它适用于 500 个表,

我认为是数据量使应用程序(控制台应用程序)停止运行,而不是线程数。 (一个线程最终可能会检索数百万行,以千为单位,每千行传递给一个方法,并将其计数添加到字典中,因此可以在需要更多内存时进行垃圾收集)还是我实现的方式@ 987654325@错了吗?

public async Task CalculateMetricsForAllTablesAsync()
{
    var allWizardTableNamesTask = GetAllWizardTableNamesAsync();
    var allTablesNamesWithLastRunTimeTask = GetAllTableNamesWithLastRunTimeAsync();

    await Task.WhenAll(allWizardTableNamesTask, allTablesNamesWithLastRunTimeTask).ConfigureAwait(false);

    var allWizardTableNames = allWizardTableNamesTask.Result;
    var allTablesNamesWithLastRunTime = allTablesNamesWithLastRunTimeTask.Result;

    var throttler = new SemaphoreSlim(10);
    var concurrentTableProcessingTasks = new ConcurrentStack<Task>();

    foreach (var tname in allWizardTableNames)
    {
        await throttler.WaitAsync();
        try
        {
           concurrentTableProcessingTasks.Push(ProcessTableDataAsync(tname, getTableNameWithLastRunTime(tname)));
        }
        finally
        {
            throttler.Release();
         }
     }

     await Task.WhenAll(concurrentTableProcessingTasks).ConfigureAwait(false);

}

private async Task ProcessTableDataAsync(string tableName, Tuple<string, string> matchingTable)
{
    var tableDataRetrieved = new TaskCompletionSource<bool>();
    var metricCountsForEachDay = new ConcurrentDictionary<string, Tuple<int, int>>();

    _fromATS.GetTableDataAsync<DynamicTableEntity>(tableName, GetFilter(matchingTable))
        .Subscribe(entities => ProcessWizardDataChunk(metricCountsForEachDay, entities), () => tableDataRetrieved.TrySetResult(true));

    await tableDataRetrieved.Task;
    await SaveMetricDataAsync(tableName, metricCountsForEachDay).ConfigureAwait(false);
}

【问题讨论】:

  • 您需要向我们展示您的代码。
  • 另外,您应该坚持使用 Rx 或 TPL/async/await。一个或另一个,而不是两者。混用很容易造成死锁。就你个人而言,我会选择 Rx。
  • 我不认为这是死锁,当我调试并在断点处停止时,其他线程正在工作并且内存压力较小,因为我已停止处理更多线程
  • 不过,混合 Rx 和 TPL 是个坏主意。您使用的 Rx 代码在哪里?
  • 混合 Rx 和 async 很好。事实上,Rx 允许你 await 一个序列,并且有几个专门为 await 设计的运算符(例如,LastAsync)。

标签: c# asynchronous async-await system.reactive parallel.foreach


【解决方案1】:

由于您的 async 正在包装 Rx,我建议在 async 级别进行限制。您可以通过定义 SemaphoreSlim 并将您的方法逻辑包装在 WaitAsync/Release 中来做到这一点。

或者,考虑 TPL 数据流。 Dataflow 具有用于限制 (MaxDegreeOfParallelism) 的内置选项,并且还可以自然地与 async 和 Rx 进行互操作。

【讨论】:

  • 控制线程数能解决数据量问题吗?即使很少的线程也可以从多个表中获取更多数据,对吧?
  • 限制将限制正在处理的任务数量。您更新的代码没有正确应用限制(它只是限制ProcessTableDataAsyncstart 并将任务添加到堆栈中)。相反,您应该将ProcessTableDataAsync 中的所有代码都包含在try 中。
  • 无法正确使用Semaphoreslim 进行节流。和TPL Dataflow一起去了。用TPL Dataflow 编写异步代码变得很容易。由于桌子数量有限,我看到了明显的性能改进。将对所有表进行测试。
猜你喜欢
  • 2021-12-31
  • 1970-01-01
  • 2017-03-23
  • 1970-01-01
  • 2014-11-01
  • 1970-01-01
  • 2014-07-25
  • 2019-07-02
  • 1970-01-01
相关资源
最近更新 更多