【问题标题】:Parallel.Foreach() yields no resultParallel.Foreach() 没有结果
【发布时间】:2018-11-16 06:08:49
【问题描述】:

我正在尝试使用 Parallel.Foreach() 并行查询 mongo-db,但我没有得到任何结果。但是当我尝试在常规 foreach 循环中运行相同的东西时,我能够执行预期的任务。

var exceptions = new ConcurrentQueue<Exception>();
var secondaryObjectsDictionaryCollection = new Dictionary<string, List<JObject>>();

// This works
foreach(var info in infos)
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
}

//This does not
Parallel.ForEach(infos, async info =>
{
    try
    {
        name = await commonValidator.ValidateAsync(name);
        await commonValidator.ValidateIdAsync(name, id);
        var list = await helper.ListRelatedObjectsAsync(name, id, info, false);

        secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
    }
    catch (Exception ex)
    {
        exceptions.Enqueue(ex);
    }
});

我只想并行执行此任务,因为涉及不同的 mongodb 集合,并且还可以减少响应时间。

我无法弄清楚我的并行循环中出了什么问题。 并行执行这些任务的任何其他方法也将起作用。

【问题讨论】:

  • 也许删除所有catch 语句会显示异常并帮助您解决?
  • @UweKeim 这样做给了我一些来自 mongodb 的预期异常,一段时间后应用程序进入中断模式。

标签: c# asynchronous async-await parallel.foreach


【解决方案1】:

让我们看一个更简单的例子来说明同样的问题

你有类似的代码

var results = new Dictionary<int, int>();

Parallel.ForEach(Enumerable.Range(0, 5), async index =>
{
  var result = await DoAsyncJob(index);
  results.TryAdd(index, result);
});

你的代码没有运行,因为表达式

async index => {...}

返回未等待

的任务

喜欢这个

Parallel.ForEach(Enumerable.Range(0, 5), index => new Task());

顺便说一句,当您像示例中那样使用多线程时,当您进行并行更新以避免错误和死锁时,您应该使用 ConcurrentDictionary 而不是 Dictionary p>

这里最好的解决方案是不要使用并行循环,而是使用 Task.WhenAll

var tasks = Enumerable.Range(0, 5).Select(async index =>
{
  var result = await DoAsyncJob(index);
  results.TryAdd(index, result);
});

await Task.WhenAll(tasks);

【讨论】:

    【解决方案2】:

    Parallel.ForEach 与传入 async 方法不兼容。如果你想要类似于 Parallel.ForEach 的东西,你可以使用 Dataflow 它是 ActionBlock。

    var workerBlock = new ActionBlock<Info>(async info => 
    {
        try
        {
            name = await commonValidator.ValidateAsync(name);
            await commonValidator.ValidateIdAsync(name, id);
            var list = await helper.ListRelatedObjectsAsync(name, id, info, false);
    
            //Note this is not thread safe and you need to put a lock around it.
            lock (secondaryObjectsDictionaryCollection) 
            {
                secondaryObjectsDictionaryCollection.Add(info.PrimaryId, secondaryObjectsList.ToList());
            }
        }
        catch (Exception ex)
        {
            exceptions.Enqueue(ex);
        }
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    foreach(var info in infos)
    {
        workerBlock.Post(info);
    }
    workerBlock.Complete();
    

    【讨论】:

    • 您能否解释一下为什么“Parallel.ForEach 与传入异步方法不兼容”。 ,请
    • @Z.R.T.当然,第二个参数的函数是Action&lt;T&gt;,但是要使异步正常工作,它需要是Func&lt;T, Task&gt;,因为它无法判断异步作业何时完成(基本上你正在传递 foreach 一个“异步无效”函数)。因此,一旦第一次等待被击中,它就认为该线程的工作已经完成。数据流有一个重载,它接受 Func&lt;T,Task&gt;
    • @Z.R.T. .NET 6 现在添加了 Parallel.ForEachAsync() 参见 docs.microsoft.com/en-us/dotnet/api/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-04-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多