【问题标题】:How to properly run heavy calculations in Parallel using C#?如何使用 C# 正确地并行运行大量计算?
【发布时间】:2020-12-09 22:53:02
【问题描述】:

目标

目标是计算一定数量正方形的所有可能的多边形形状。由于对于大量计算来说这是非常繁重的计算,我想利用我的计算机拥有的多个内核。

问题

我通过创建以下场景使问题更易于解释和测试:

1) for each value of 2, 3, 5, and 7:
2) find all multiples (up to a certain value) and add them to the same List
3) remove all duplicates from said list

在我的最后一个程序中,第 2 步更加庞大且计算量更大,因此我更愿意根据第 1 步的值将任务 2 拆分为我想检查的许多值。

我尝试了什么

我用 C# Core 制作了一个带有 5 个按钮的 winforms 应用程序,尝试了我在 Stackoverflow 和 Internet 上的其他地方找到的不同的并行性变体:

这是代码(看起来很多,但它只是同一想法的 5 种变体),它们都给出了一个计数来检查它们是否产生了相同的结果以及花费了多长时间:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Security.Permissions;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace Parallelism
{
    public partial class Form1 : Form
    {
        private readonly int Repeat = 10000000; 

        public Form1()
        {
            InitializeComponent();
        }

        private void button1_Click(object sender, EventArgs e)
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();
            List<int> output = new List<int>();
            foreach (int x in new int[] { 2, 3, 5, 7 })
            {
                for (int i = 0; i < Repeat; i++)
                {
                    output.Add(x * i);
                }
            }
            output = output.Distinct().ToList();
            watch.Stop();
            (sender as Button).Text += $", c:{output.Count} - {watch.ElapsedMilliseconds}ms";
        }

        private void button2_Click(object sender, EventArgs e)
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();
            ConcurrentBag<int> output = new ConcurrentBag<int>();
            Task task = Task.WhenAll(
              Task.Run(() => button2_Calculation(2, output)),
              Task.Run(() => button2_Calculation(3, output)),
              Task.Run(() => button2_Calculation(5, output)),
              Task.Run(() => button2_Calculation(7, output))
            );
            task.Wait();
            HashSet<int> output2 = new HashSet<int>(output);
            watch.Stop();
            (sender as Button).Text += $", c:{output2.Count} - {watch.ElapsedMilliseconds}ms";
        }
        private void button2_Calculation(int x, ConcurrentBag<int> output)
        {
            for (int i = 0; i < Repeat; i++)
            {
                output.Add(x * i);
            }
        }

        private void button3_Click(object sender, EventArgs e)
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();
            List<int> output = new List<int>();
            foreach (int x in (new int[] { 2, 3, 5, 7 }).AsParallel())
            {
                for (int i = 0; i < Repeat; i++)
                {
                    output.Add(x * i);
                }
            }
            output = output.Distinct().ToList();
            watch.Stop();
            (sender as Button).Text += $", c:{output.Count} - {watch.ElapsedMilliseconds}ms";
        }

        private void button4_Click(object sender, EventArgs e)
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();
            ConcurrentBag<int> output = new ConcurrentBag<int>();
            Dictionary<int, Task> runningTasks = new Dictionary<int, Task>();
            foreach (int x in new int[] { 2, 3, 5, 7 })
            {
                int value = x;
                runningTasks.Add(x, Task.Factory.StartNew(() => button2_Calculation(value, output)));
            }
            foreach (Task t in runningTasks.Select(c => c.Value))
                t.Wait();
            HashSet<int> output2 = new HashSet<int>(output);
            watch.Stop();
            (sender as Button).Text += $", c:{output2.Count} - {watch.ElapsedMilliseconds}ms";
        }

        private void button5_Click(object sender, EventArgs e)
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();
            ConcurrentBag<int> output = new ConcurrentBag<int>();
            Parallel.ForEach(new int[] { 2, 3, 5, 7 }, x => button5_Calculation(x, output));
            HashSet<int> output2 = new HashSet<int>(output);
            watch.Stop();
            (sender as Button).Text += $", c:{output2.Count} - {watch.ElapsedMilliseconds}ms";
        }
        private void button5_Calculation(int x, ConcurrentBag<int> output)
        {
            for (int i = 0; i < Repeat; i++)
                output.Add(x * i);
        }
    }
}

目前的结果

到目前为止,上述所有方法都产生了 1 秒到 1.5 秒之间的相似持续时间。 实际上,有时正常的串行执行似乎要快得多。 这怎么可能?我希望使用 8 个内核(16 个虚拟内核)拆分任务会导致更快的整体速度?

非常感谢任何帮助!

未来

在了解有关如何正确实现并行性的更多信息后,我希望还在另一个线程/异步上运行整个计算,以使 GUI 保持响应。

编辑:

回复@Pac0: 这是我对您的建议的实施。它似乎没有太大区别:

private void button6_Click(object sender, EventArgs e)
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();
            ConcurrentBag<HashSet<int>> bag = new ConcurrentBag<HashSet<int>>();
            var output = Parallel.ForEach(new int[] { 2, 3, 5, 7 }, x =>
            {
                HashSet<int> temp = new HashSet<int>();
                for (int i = 0; i < Repeat; i++)
                    temp.Add(x * i);
                bag.Add(temp);
            });
            HashSet<int> output2 = new HashSet<int>();
            foreach (var hash in bag)
                output2.UnionWith(hash);
            watch.Stop();
            (sender as Button).Text += $", c:{output2.Count} - {watch.ElapsedMilliseconds}ms";
        }

【问题讨论】:

  • 你可以对Repeat = 100000000尝试相同的方法并查看结果吗?
  • 我的猜测是,由于您总是使用与并发包相同的方法来“即时”存储结果,因此会导致大量锁定,从而失去使用多线程的好处。但我可能弄错了,因为我不使用并发包进行大存储。我会更多地采用“Map / Reduce”方法:独立计算 2、3、5 和 7,然后“合并”所有结果。
  • 所以基本上,为每个并行计算创建一个HashSet,然后在所有完成后,对哈希集进行联合(这将处理重复)。
  • 您似乎在假设瓶颈是for 循环:for (int i = 0; i &lt; Repeat; i++) 的情况下进行操作。您如何确认问题存在,而不是使用删除重复项的代码?
  • @JoshuaRobinson,你是对的。在此示例中,最大的瓶颈是删除重复项。然而,这在我的代码的实际实现中有所不同。但我会接受有关提高重复删除效率的建议。

标签: c# visual-studio winforms asynchronous parallel-processing


【解决方案1】:

正如评论所述,您使用单个集合会导致严重锁定。在计算上,基于任务的解决方案大约快 50%(见下文,我们不管理组合输出)。它管理导致某些绑定的集合。根据其处理方式,它可能比串行执行慢 3 倍以上。

与并发的斗争总是平衡负载到瓶颈。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace ConsoleApp5
{
    class Program
    {
        static int Repeat = 100000000;
        static int[] worklist = new int[] { 2, 3, 5, 7 };

        static void Main(string[] args)
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();

            Console.WriteLine("Hello World! Launching Threads");
            Task launcher = Task.Run(()=>LaunchThreads());
            launcher.Wait();
            Console.WriteLine("Hello World! Threads Complete");

            watch.Stop();
            Console.WriteLine($"Threads took: {watch.ElapsedMilliseconds}");

            watch = System.Diagnostics.Stopwatch.StartNew();
            Console.WriteLine("Serial Execution Starting");
            foreach (int i in worklist)
            {
                DoWork(i);
            }
            watch.Stop();
            Console.WriteLine($"Serial Execution took: {watch.ElapsedMilliseconds}");
        }
        static async void LaunchThreads()
        {
            //Dictionary<int, List<int>> mywork = new Dictionary<int, List<int>>();
            HashSet<int> output = new HashSet<int>();

            var worktasks = new List<Task<List<int>>>();

            foreach (int i in worklist)
            {
                worktasks.Add(Task.Run(() => DoWork(i)));
            }

            await Task.WhenAll(worktasks);
        }
        static List<int> DoWork(int x)
        {
            Console.WriteLine($"Thread Worker: {x}");
            List<int> output = new List<int>();
            for (int i = 0; i < Repeat; i++)
            {
                output.Add(x * i);
            }

            Console.WriteLine($"Thread Worker: {x} - Exiting");
            return output;
        }
    }
}

【讨论】:

  • 我应该添加我尝试使用基于 IProgress 的反馈将项目直接添加到哈希集中,但这是最糟糕的。在哈希集上迭代结果和 UnionWith 似乎是最好的,但仍然很慢。
【解决方案2】:

我想将其作为遮阳篷发布,因为一个名叫 Yugami 的人发布了与我尝试的不同的内容,这是一个有用且良好的回复,但它被删除了。

所以我发布了我在我的测试台上重新创建他们的代码的努力:

private async void button9_Click(object sender, EventArgs e)
        {
            var watch = System.Diagnostics.Stopwatch.StartNew();
            HashSet<int> output = new HashSet<int>();
            var worktasks = new List<Task<List<int>>>();
            foreach (int i in new int[] { 2, 3, 5, 7 })
                worktasks.Add(Task.Run(() => button9_Calculation(i)));

            await Task.WhenAll(worktasks);
            foreach (Task<List<int>> tsk in worktasks)
                foreach (int i in tsk.Result)
                    output.Add(i);
            watch.Stop();
            (sender as Button).Text += $", c:{output.Count} - {watch.ElapsedMilliseconds}ms";
        }
        private List<int> button9_Calculation(int x)
        {
            List<int> output = new List<int>();
            for (int i = 0; i < Repeat; i++)
                output.Add(x * i);

            return output;
        }

以下是连续和最佳两种解决方案的结果,尝试了 100.000.000 次。 在这里,我终于看到并行执行第 2 步的一些改进,但现在最大的瓶颈是删除重复项/将其全部过滤为单个 HashSet ...

所以我认为这解决了我必须改进第 2 步的最初问题。 现在我将继续搜索以改进第 3 步;删除重复项。

【讨论】:

  • 对不起,我的解决方案不完整,但事后才注意到。我已经清理了它,但仍然存在问题,具体取决于您要如何处理数据。
  • 请注意,在button9_Calculation 方法中,您知道output 将包含Repeat 项,因此您可以使用List&lt;int&gt; output = new List&lt;int&gt;(Repeat) 构造它。通过消除在方法执行期间调整列表大小的需要,它的性能应该会更好。
猜你喜欢
  • 1970-01-01
  • 2012-10-08
  • 1970-01-01
  • 1970-01-01
  • 2018-02-26
  • 2015-08-27
  • 1970-01-01
  • 2019-10-05
  • 1970-01-01
相关资源
最近更新 更多