【问题标题】:Missing log statements in parallel foreach loop并行 foreach 循环中缺少日志语句
【发布时间】:2025-12-27 07:00:12
【问题描述】:

我正在使用并行 foreach/for 循环,特别是我需要使用嵌套的并行 foreach/for 循环。当我尝试打印我的集合中的值时,有时控制台语句没有被打印,这是不一致的。请参阅下面的代码。

Parallel.For(0, RunModuleConfigVariables.Count, new ParallelOptions { MaxDegreeOfParallelism = 3 }, index => {
                string log = null;
                int count = 0;
                log += "Module Name " + RunModuleConfigVariables.Keys.ElementAt(index) + " thread: " + Thread.CurrentThread.ManagedThreadId + "\n";
                Parallel.ForEach(RunModuleConfigVariables[RunModuleConfigVariables.Keys.ElementAt(index)], new ParallelOptions { MaxDegreeOfParallelism = 10 }, eachendpoint => {

                    log += "\t" + count + " Endpoint Name " + eachendpoint + "\n";
                    count++;
                });
                Console.WriteLine(log);
            });

收藏:

集合类型是ConcurrentDictionary()

RunModuleConfigVariables:
        {
      "Module_1": [
        "Module_1_Endpoint_1",
        "Module_1_Endpoint_2",
        "Module_1_Endpoint_3",
        "Module_1_Endpoint_4",
        "Module_1_Endpoint_5",
        "Module_1_Endpoint_6",
        "Module_1_Endpoint_7",
        "Module_1_Endpoint_8",
        "Module_1_Endpoint_9",
        "Module_1_Endpoint_10",
        "Module_1_Endpoint_11",
        "Module_1_Endpoint_12",
        "Module_1_Endpoint_13",
        "Module_1_Endpoint_14",
        "Module_1_Endpoint_15",
        "Module_1_Endpoint_16",
        "Module_1_Endpoint_17",
        "Module_1_Endpoint_18",
        "Module_1_Endpoint_19"
      ],
      "Module_2": [
        "Module_2_Endpoint_1",
        "Module_2_Endpoint_2",
        "Module_2_Endpoint_3"
      ],
      "Module_3": [
        "Module_3_Endpoint_1"
      ]
    }

实际输出:

Module Name Module_1 thread: 4
        0 Endpoint Name Module_1_Endpoint_2
        1 Endpoint Name Module_1_Endpoint_1
        2 Endpoint Name Module_1_Endpoint_4
        3 Endpoint Name Module_1_Endpoint_5
        4 Endpoint Name Module_1_Endpoint_6
        5 Endpoint Name Module_1_Endpoint_7
        6 Endpoint Name Module_1_Endpoint_8
        18 Endpoint Name Module_1_Endpoint_9

Module Name Module_3 thread: 5
        0 Endpoint Name Module_3_Endpoint_1

Module Name Module_2 thread: 1
        0 Endpoint Name Module_2_Endpoint_2
        1 Endpoint Name Module_2_Endpoint_3
        2 Endpoint Name Module_2_Endpoint_1

预期输出:(不需要相同的顺序)

Module Name Module_1 thread: 5
        0 Endpoint Name Module_1_Endpoint_2
        1 Endpoint Name Module_1_Endpoint_3
        2 Endpoint Name Module_1_Endpoint_4
        3 Endpoint Name Module_1_Endpoint_5
        4 Endpoint Name Module_1_Endpoint_6
        5 Endpoint Name Module_1_Endpoint_7
        6 Endpoint Name Module_1_Endpoint_8
        7 Endpoint Name Module_1_Endpoint_9
        8 Endpoint Name Module_1_Endpoint_10
        9 Endpoint Name Module_1_Endpoint_11
        10 Endpoint Name Module_1_Endpoint_12
        11 Endpoint Name Module_1_Endpoint_13
        12 Endpoint Name Module_1_Endpoint_14
        13 Endpoint Name Module_1_Endpoint_15
        14 Endpoint Name Module_1_Endpoint_16
        15 Endpoint Name Module_1_Endpoint_17
        16 Endpoint Name Module_1_Endpoint_18
        17 Endpoint Name Module_1_Endpoint_19
        18 Endpoint Name Module_1_Endpoint_1

Module Name Module_2 thread: 4
        0 Endpoint Name Module_2_Endpoint_2
        1 Endpoint Name Module_2_Endpoint_3
        2 Endpoint Name Module_2_Endpoint_1

Module Name Module_3 thread: 1
        0 Endpoint Name Module_3_Endpoint_1

注意:输出不一致。有时能够看到所有子孩子,有时则不能。我如何理解这一点,以及如何克服这一点?

【问题讨论】:

    标签: .net c#-4.0 parallel.foreach parallel.for


    【解决方案1】:

    我怎么理解这个?

    并行处理意味着多个线程同时做事。这会导致你必须小心的各种奇怪的事情。

    考虑这条线:

    count++;
    

    这条C#指令实际上代表了多个操作:

    1. count变量中的值从内存加载到处理器中。
    2. 1 添加到加载到处理器中的值的值。
    3. 将新值存储到count 变量的内存位置。

    现在想象两个线程同时执行这三个指令。在完成第 3 步之前,它们都有可能完成第 1 步。这意味着如果 count 从零开始,则两个线程现在都将 count 设置为 1,这不是您想要的。

    这一行在读取log 的点和写入的点之间有更多的步骤:

    log += "\t" + count + " Endpoint Name " + eachendpoint + "\n";
    

    因此,您会发现一个线程覆盖(而不是添加)另一个线程已经写入的值要频繁得多。这就是你注意到的行为。

    ...告诉我,可以做些什么来克服这个问题。

    首先,尽可能避免并行处理。

    如果使用简单的foreach 循环,事情进展得足够快,请不要尝试优化它们。

    如果简单的foreach 循环不够快,请找出原因。大多数时候,这是因为 I/O 操作(磁盘或网络访问)。在这些情况下,使用异步任务的并发执行而不是多线程。请参阅https://*.com/a/14130314/120955What is the difference between asynchronous programming and multithreading?

    如果您正在执行需要 CPU 能力的操作,并且您确实需要它们并行运行以从中挤出额外的性能,请尽量避免更改每个操作的状态(例如,为共享变量设置值,比如count++)。一个很好的策略是命令/查询分离,您可以在不可变数据结构上进行并行处理以产生“答案”,然后使用这些答案进行必须在同一个线程上进行的所有更改。以下是您的代码中的样子:

    var logs = RunModuleConfigVariables
        .AsParallel()
        .WithDegreeOfParallelism(3)
        .Select(e =>
            "Module Name " + e.Key + " thread: " + Thread.CurrentThread.ManagedThreadId + "\n"
                + string.Join("\n",
                    e.Value
                        .AsParallel()
                        .WithDegreeOfParallelism(10)
                        .Select((eachendpoint, index) => "\t" + index + " Endpoint Name " + eachendpoint)
    
        ));
    
    Console.WriteLine(string.Join("\n", logs));
    
    

    最后,如果您绝对必须并行更改状态,您需要花时间了解锁、互斥锁、并发集合、atomic operations 和其他类似工具,并确保您只使用线程安全并行上下文中的方法,以确保您做的“正确”。

    这可能会导致这样的事情:

    Parallel.ForEach(RunModuleConfigVariables, new ParallelOptions { MaxDegreeOfParallelism = 3 }, pair =>
    {
        Console.WriteLine("Module Name " + pair.Key + " thread: " + Thread.CurrentThread.ManagedThreadId);
        var count = 0;
        Parallel.ForEach(pair.Value, new ParallelOptions { MaxDegreeOfParallelism = 10 }, eachendpoint =>
        {
            var thisCount = Interlocked.Increment(ref count);
            Console.WriteLine("\t" + thisCount + " Endpoint Name " + eachendpoint + "\n");
        });
    });
    

    【讨论】:

    • 很好的解释。我有一组需要在内部并行循环中实现的指令。每次迭代都是一个独立的任务,所以我认为异步多线程会起作用。 “count”变量只是为了验证是否显示了所有项目(类似于易于验证的序列号)。我试图删除该变量并再次执行,但没有运气。添加日志变量以获取控制台日志语句端点明智的原因(内部循环),因为简单的控制台语句会在每个端点中造成步骤顺序混乱。
    • @KrishnaBarri:如果您正在处理异步任务,我建议不要进行多线程。相反,收集任务的集合并使用await Task.WhenAll() 来获取它们的结果。这将避免线程安全问题,同时仍可能为您提供所需的性能提升。
    • 我的要求是,我在每个模块中都有一组端点,我需要按模块执行,各个端点并行。在这种情况下,我的模块和端点都应该并行执行,这将减少我的执行时间(总共有超过 25k 的案例)。我想,并行运行 for 循环会满足我的期望,有没有更好的方法可以实现? (还需要复制端点级控制台日志)
    • @KrishnaBarri:当你说“端点”时,我想到的是异步调用:这就是你在说的吗?所以大部分时间都花在等待那些异步调用完成上?在这种情况下,遵循我上面之前评论中的建议可能会在仅使用一个线程的情况下为您提供同等(可能更大)的性能提升。 (请参阅我的段落和我的回答中有关并发异步与并行/多线程的链接。)Here's a code sample 展示了该模式,让您了解我在说什么。
    【解决方案2】:

    问题是您的变量log 被多个线程分配。在尝试写入之前,您需要 lock 它。

    Parallel.For(0, RunModuleConfigVariables.Count, new ParallelOptions { MaxDegreeOfParallelism = 3 }, index => {
                    string log = null;
                    int count = 0;
                    log += "Module Name " + RunModuleConfigVariables.Keys.ElementAt(index) + " thread: " + Thread.CurrentThread.ManagedThreadId + "\n";
                    object locker = new object();
                    Parallel.ForEach(RunModuleConfigVariables[RunModuleConfigVariables.Keys.ElementAt(index)], new ParallelOptions { MaxDegreeOfParallelism = 10 }, eachendpoint => {
                        lock(locker)
                            log += "\t" + (count++) + " Endpoint Name " + eachendpoint + "\n";
                    });
                    Console.WriteLine(log);
                });
    

    【讨论】:

    • 您对问题的评估是正确的,但我不知道是否建议在附加到字符串时锁定并行进程。如何使用 PLINQ 查询从每个并行任务返回日志消息,然后在记录之前加入所有这些消息?还是使用可以处理多线程写入的真实日志框架?
    • Console.WriteLine 是完全线程安全的,将锁的范围限制为特定指令是可以接受的做法。您可以返回一个值,但这真的会以任何可衡量的方式改变速度吗?这样做会添加更多指令,而不仅仅是锁定重新分配变量值所需的时间。
    • 我应该补充一点,在这个例子中它可能会有所不同,因为唯一的指令是追加和重新分配一个变量,然后增加并重新分配一个变量。但在实践中,并行运行对于快速锁定可以忽略不计的大型操作更有意义。
    • Console.WriteLine 是线程安全的。我想知道为什么不把它移到 ForEach 中而不是建立字符串?
    • 唯一的原因是输出会从所需的输出中出现乱码,但是是的,这将是更好的方法,只需在输出前加上它所属的线程堆栈即可。