【问题标题】:Multiple async-await chaining inside Parallel.ForEachParallel.ForEach 中的多个异步等待链接
【发布时间】:2020-10-07 03:51:09
【问题描述】:

我有一个 Parallel.ForEach 循环,它遍历一个集合。在循环内部,我进行了多次网络 I/O 调用。我使用了 Task.ContinueWith 并嵌套了随后的 async-await 调用。处理的顺序无关紧要,但来自每个异步调用的数据应该以同步的方式处理。含义 - 对于每次迭代,从第一个异步调用检索到的数据应该传递给第二个异步调用。第二个异步调用完成后,两个异步调用的数据应该一起处理。

Parallel.ForEach(someCollection, parallelOptions, async (item, state) =>
{
    Task<Country> countryTask = Task.Run(() => GetCountry(item.ID));

    //this is my first async call
    await countryTask.ContinueWith((countryData) =>
    {
        countries.Add(countryData.Result);

        Task<State> stateTask = Task.Run(() => GetState(countryData.Result.CountryID));

        //based on the data I receive in 'stateTask', I make another async call
        stateTask.ContinueWith((stateData) =>
        {
            states.Add(stateData.Result);

            // use data from both the async calls pass it to below function for some calculation
            // in a synchronized way (for a country, its corresponding state should be passed)

            myCollection.ConcurrentAddRange(SomeCalculation(countryData.Result, stateData.Result));
        });
    });
});

我在没有使用 continue await 的情况下尝试了上述方法,但它没有以同步方式工作。现在,上面的代码执行完毕,但没有记录被处理。

请问有什么帮助吗?让我知道是否可以添加更多详细信息。

【问题讨论】:

  • 在同一个代码中混合awaitContinueWith 似乎非常奇怪,并且:如果您已经在 TP 上,使用 Task.Run 很奇怪。 ..看看重构它,但是...这不仅仅是有点奇怪!
  • @MarcGravell:你可能是对的。我的意图是并行处理发生多个网络 I/O 调用的数据,但以同步方式。
  • 作为一般规则,你不应该使用ContinueWith ... 几乎永远不要再使用了
  • Parallel.ForEachis not async-friendly。传递的 lambda 是 async void。该方法将在启动异步操作后立即完成,MaxDegreeOfParallelism 选项将不被尊重......很多讨厌的问题。仅将此方法用于 CPU 密集型工作。

标签: c# asynchronous async-await task-parallel-library parallel.foreach


【解决方案1】:

由于您的方法涉及 I/O,因此它们应该被编写为真正的异步,而不仅仅是使用 Task.Run 在线程池上同步运行。

那么您可以将Task.WhenAllEnumerable.Select 结合使用:

var tasks = someCollection.Select(async item =>
{
    var country = await GetCountryAsync(item.Id);
    var state = await GetStateAsync(country.CountryID);
    var calculation = SomeCalculation(country, state);

    return (country, state, calculation);
});

foreach (var tuple in await Task.WhenAll(tasks))
{
    countries.Add(tuple.country);
    states.Add(tuple.state);
    myCollection.AddRange(tuple.calculation);
}

这将确保每个country > state > calculation 顺序发生,但每个item 都是同时异步处理的。


根据评论更新

using var semaphore = new SemaphoreSlim(2);
using var cts = new CancellationTokenSource();

int failures = 0;

var tasks = someCollection.Select(async item =>
{
    await semaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        Interlocked.Exchange(ref failures, 0);

        return (country, state, calculation);
    {
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        semaphore.Release();
    }
});

信号量保证最多2个并发异步操作,取消令牌将在连续10个异常后取消所有未完成的任务。

Interlocked 方法可确保以线程安全的方式访问 failures


进一步更新

使用 2 个信号量来防止多次迭代可能更有效。

将所有列表添加封装到一个方法中:

void AddToLists(Country country, State state, Calculation calculation)
{
    countries.Add(country);
    states.Add(state);
    myCollection.AddRange(calculation);
}

然后您可以允许 2 个线程同时处理 Http 请求,并允许 1 个线程执行添加,从而使该操作线程安全:

using var httpSemaphore = new SemaphoreSlim(2);
using var listAddSemaphore = new SemaphoreSlim(1);
using var cts = new CancellationTokenSource();

int failures = 0;

await Task.WhenAll(someCollection.Select(async item =>
{
    await httpSemaphore.WaitAsync(cts.Token);
    
    try
    {
        var country = await GetCountryAsync(item.Id);
        var state = await GetStateAsync(country.CountryID);
        var calculation = SomeCalculation(country, state);

        await listAddSemaphore.WaitAsync(cts.Token);
        AddToLists(country, state, calculation);

        Interlocked.Exchange(ref failures, 0);
    {
    catch
    {
        if (Interlocked.Increment(ref failures) >= 10)
        {
            cts.Cancel();
        }
        throw;
    }
    finally
    {
        httpSemaphore.Release();
        listAddSemaphore.Release();
    }
}));

【讨论】:

【解决方案2】:

我认为您过于复杂了;在Parallel.ForEach 中,您已经在线程池中,因此在其中创建大量附加 任务确实没有任何好处。所以;如何做到这一点实际上取决于GetState 等是同步的还是异步的。如果我们假设是同步的,那么类似:

Parallel.ForEach(someCollection, parallelOptions, (item, _) =>
{
    var country = GetCountry(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = GetState(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});

如果它们是异步的,那就更尴尬了; 很好如果我们能做这样的事情:

// WARNING: DANGEROUS CODE - DO NOT COPY
Parallel.ForEach(someCollection, parallelOptions, async (item, _) =>
{
    var country = await GetCountryAsync(item.Id);

    countries.Add(country); // warning: may need to synchronize

    var state = await GetStateAsync(country.CountryID);

    states.Add(state); // warning: may need to synchronize

    // use data from both the async calls pass it to below function for some calculation
    // in a synchronized way (for a country, its corresponding state should be passed)
    myCollection.ConcurrentAddRange(SomeCalculation(country, state));
});

但这里的问题是Parallel.ForEach 中的回调都不是“可等待的”,这意味着:我们在这里默默地创建了一个async void 回调,这是:非常糟糕。这意味着一旦不完整的await 发生,Parallel.ForEach 就会认为它已经“完成”了,这意味着:

  1. 我们不知道所有工作何时真正完成
  2. 您的并发操作可能比您预期的要多(不能遵守 max-dop)

目前似乎没有任何好的 API 可以避免这种情况。

【讨论】:

  • 您在上面解释的异步方法是我的第一次尝试,其中 Parallel.ForEach 没有等待调用完成。然后我做了类似这里解释的事情:stackoverflow.com/a/26993772/5014099。问题在于两个异步调用之间的同步,其中第一次的输出被传递给第二次调用,并且在两者都完成后,两者的返回值应该是同步的。
  • @SouvikGhosh oof,确实 - Parallel.ForEach 有“动作”主体(没有 Func&lt;T&gt; 主体),所以:如果我们使用 async 版本,它将使用 async void ,这真的很糟糕GetCountryGetState 是可等待的方法吗?
  • 是的,他们正在等待
  • @SouvikGhosh 我已经编辑以阻止/解释这一点 - 我的指导不好,抱歉;我不认为这里有什么好的选择——你可以推出自己的“最大多普感知并行异步”的东西,但是......
  • @SouvikGhosh 要明确:添加更多Task(在问题中)也不能解决问题
猜你喜欢
  • 2020-01-05
  • 2023-01-27
  • 1970-01-01
  • 2021-01-24
  • 2019-11-17
  • 2018-12-31
  • 2013-02-26
  • 2014-05-13
  • 1970-01-01
相关资源
最近更新 更多