【问题标题】:Task.Run loop with external parameters are changed by loop带有外部参数的Task.Run循环由循环改变
【发布时间】:2019-04-26 17:14:34
【问题描述】:

我有这段代码(简化)来处理 100 个不同并行线程上的参数数组,但是变量 x 和 y 在线程中使用时被线程内的循环更改。如果我用 1 个线程运行该函数,那么它就可以工作。

我还尝试将参数放入 ConcurrentBag 并使用 foreach 进行循环,但结果相同,参数在线程中混合。

List<Task> tasks = new List<Task>();
var listConcurentBag = new ConcurrentBag<int>();
int nThreadCount = 0;

for (x=0; x<1000; x++)
  for (y=0; y<1000; y++)
  {
     int x1=x;
     int y2=y;

     Task t = Task.Run(() =>
     {         
        int param1=x1;
        int param2=y2;

        // some calculations with param1 and param2

        listConcurentBag.Add(result);
     }); // tasks

     tasks.Add(t);
     nThreadCount++;

   if (nThreadCount == 100) // after 100 threads started, wait
   {
       nThreadCount = 0;
       Task.WaitAll(tasks.ToArray());
       tasks.Clear();
   }
 }

【问题讨论】:

  • 这是故意的,还是错误的? for (y=0; x&lt;1000; y++)
  • @TheodorZoulias 这只是 stackoverflow 中的一个错字
  • 您似乎无缘无故地在这里做了很多工作 - 任务默认使用线程池,因此即使您尝试缓冲 100 个任务,您也只能执行与线程池一样多的任务可以交付。进行这种缓冲并没有明显的执行或内存使用收益。
  • “变量 x 和 y 被线程内的循环更改,而在线程中使用” - 不在您提供的此代码中。如果您想要一个清晰直接的答案,您需要提供minimal reproducible example
  • @Enigmativity 这种方法的性能提升非常明显!从 1 个线程到 120 个线程,性能提高了大约 1000%!

标签: c# .net multithreading thread-safety task


【解决方案1】:

您应该使用 Microsoft 的响应式框架(又名 Rx)- NuGet System.Reactive 并添加 using System.Reactive.Linq; - 然后您可以这样做:

var query =
    from x in Observable.Range(0, 1000)
    from y in Observable.Range(0, 1000)
    from r in Observable.Start(() => GetResult(x,  y))
    select new { x, y, r };

IDisposable subscription =
    query
        .Buffer(100)
        .Subscribe(results =>
        {
            /* do something with each buffered list of results */
        });

现在,这与您当前的代码并不完全相同,但它会在使用线程池的最大容量后立即为您提供 100 个结果块。

您可以将其更改为像这样设置并发:

var query =
    from x in Observable.Range(0, 1000)
    from y in Observable.Range(0, 1000)
    select Observable.Start(() => new { x, y, r = GetResult(x,  y) });

IDisposable subscription =
    query
        .Merge(maxConcurrent: 100) // limit to 100 threads
        .Buffer(count: 100) // produce 100 results at a time
        .Subscribe(results =>
        {
            /* do something with the list of results */
        });

如果您想在代码自然完成之前停止代码,只需调用subscription.Dispose();

Rx 确实倾向于生成更简洁的代码,恕我直言。

【讨论】:

  • 嗨@Enigmativity!关于您的第一个代码示例的两个问题。 1)from r in Observable.Start(...from r in Task.Run(...有什么区别? 2)我注意到查询是异步的。您如何等待订阅?
  • @TheodorZoulias - Rx 中有一个过载将Task.Run 变成一个可观察的。您可以通过 Observable.Start 获得更多控制权,因为您可以设置调度程序。
  • 不知道用这种方法性能是不是变慢了?因为实施水平高
  • @TheodorZoulias - 如果你愿意,你可以这样做 var results = await query.ToArray(); .ToArray() 之所以存在,是因为在 observable 上调用 await 只会返回最后产生的值。
  • @MarioM - 测量它。使用Stopwatch 进行正确测量。
【解决方案2】:

我有一个替代实现的建议,您可能会或可能不会发现适合您的需求。而不是分批处理 100 个任务 您可以将嵌套的 for 循环表示为单个可枚举,然后将其提供给内置方法 Parallel.ForEach 以完成并行工作。

private IEnumerable<(int, int)> GetNestedFor()
{
    for (int x = 0; x < 1000; x++)
    {
        for (int y = 0; y < 1000; y++)
        {
            yield return (x, y); // return a ValueTuple<int, int>
        }
    }
}

ThreadPool.SetMinThreads(100, 100);
var options = new ParallelOptions() { MaxDegreeOfParallelism = 100 };
Parallel.ForEach(GetNestedFor(), options, item =>
{
    int param1 = item.Item1;
    int param2 = item.Item2;
    Console.WriteLine($"Processing ({param1}, {param2})");
    Thread.Sleep(100); // Simulate some work
});

输出:

处理 (0, 1)
处理 (0, 2)
处理 (0, 0)
处理 (0, 3)
...
处理中 (0, 998)
处理中 (0, 997)
正在处理 (0, 999)
处理 (1, 0)
处理 (1, 1)
...
处理 (999, 999)
处理(999、998)

【讨论】:

  • 谢谢,但是我有3个参数,我只是简化了stackoverflow的函数,除了元组还有其他方法吗?
  • 是的,您可以根据需要使用自定义类。您还可以使用具有 3 个元素的元组,如下所示:ValueTuple&lt;int, int, int&gt;
  • 谢谢,我试试这个
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多