【问题标题】:How do Tasks in the Task Parallel Library affect ActivityID?任务并行库中的任务如何影响 ActivityID?
【发布时间】:2010-12-02 23:07:53
【问题描述】:

在使用任务并行库之前,我经常使用 CorrelationManager.ActivityId 来跟踪多线程的跟踪/错误报告。

ActivityId 存储在线程本地存储中,因此每个线程都有自己的副本。这个想法是,当你启动一个线程(活动)时,你会分配一个新的 ActivityId。 ActivityId 将与任何其他跟踪信息一起写入日志,从而可以挑选出单个“活动”的跟踪信息。这对 WCF 非常有用,因为 ActivityId 可以转移到服务组件。

下面是我所说的一个例子:

static void Main(string[] args)
{
    ThreadPool.QueueUserWorkItem(new WaitCallback((o) =>
    {
        DoWork();
    }));
}

static void DoWork()
{
    try
    {
        Trace.CorrelationManager.ActivityId = Guid.NewGuid();
        //The functions below contain tracing which logs the ActivityID.
        CallFunction1();
        CallFunction2();
        CallFunction3();
    }
    catch (Exception ex)
    {
        Trace.Write(Trace.CorrelationManager.ActivityId + " " + ex.ToString());
    }
}

现在,对于 TPL,我的理解是多个任务共享线程。这是否意味着 ActivityId 容易在任务中被重新初始化(被另一个任务)?是否有新的机制来处理活动跟踪?

【问题讨论】:

  • 我没有什么可以提供的,但我也对这个问题感兴趣。似乎同样的问题也适用于一般使用 CallContext.LogicalSetData 的信息集,因为这是 Trace.CorrelationManager 用来存储 ActivityId 和 LogicalOperationStack 的技术。
  • @wageohe - 我今天终于有时间测试了,已经发布了我的结果:)
  • 我在回答中发布了更多详细信息。我还在 SO 上发布了另一个答案的链接,我在 SO 上提出的一个新问题,以及我在 Microsoft 的 Parallel Extensions 论坛上提出的一个问题(但截至 2011 年 1 月 21 日尚未回答) .也许您会发现这些信息有用,也许没有。

标签: c# .net multithreading task


【解决方案1】:

我进行了一些实验,结果发现我的问题中的假设不正确 - 使用 TPL 创建的多个任务不会同时在同一个线程上运行。

ThreadLocalStorage 在 .NET 4.0 中与 TPL 一起使用是安全的,因为一个线程一次只能由一个任务使用。

任务可以同时共享线程的假设是基于我在DotNetRocks 上听到的关于 c# 5.0 的采访(抱歉,我不记得是哪个节目了) - 所以我的问题可能(或可能不会)很快变得相关。

我的实验启动了一些任务,并记录了运行了多少任务、花费了多长时间以及消耗了多少线程。如果有人想重复,代码如下。

class Program
{
    static void Main(string[] args)
    {
        int totalThreads = 100;
        TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
        Task task = null;
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();
        Task[] allTasks = new Task[totalThreads];
        for (int i = 0; i < totalThreads; i++)
        {
            task = Task.Factory.StartNew(() =>
           {
               DoLongRunningWork();
           }, taskCreationOpt);

            allTasks[i] = task;
        }

        Task.WaitAll(allTasks);
        stopwatch.Stop();

        Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
        Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
        Console.ReadKey();
    }


    private static List<int> threadIds = new List<int>();
    private static object locker = new object();
    private static void DoLongRunningWork()
    {
        lock (locker)
        {
            //Keep a record of the managed thread used.
            if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
                threadIds.Add(Thread.CurrentThread.ManagedThreadId);
        }
        Guid g1 = Guid.NewGuid();
        Trace.CorrelationManager.ActivityId = g1;
        Thread.Sleep(3000);
        Guid g2 = Trace.CorrelationManager.ActivityId;
        Debug.Assert(g1.Equals(g2));
    }
}

输出(当然这取决于机器)是:

Completed 100 tasks in 23097 milliseconds
Used 23 threads

将 taskCreationOpt 更改为 TaskCreationOptions.LongRunning 给出了不同的结果:

Completed 100 tasks in 3458 milliseconds 
Used 100 threads

【讨论】:

  • 有趣的结果。我使用基于您的代码的测试程序发现了一些关于 Trace.CorrelationManager.ActivityId 的有趣内容。按原样(或多或少)使用您的代码,并使用 Trace.CorrelationManager.StartLogicalOperation/StopLogicalOperation,我可以获得“好”的结果。也就是说,以您演示的方式使用任务并使用 StartLogicalOperation/StopLogicalOperation 将每个任务(在委托内)括起来,LogicalOperationStack 似乎总是同步的。但是,使用 Parallel.For 会产生不好的结果。我会发布我的测试作为答案,因为它有代码
  • 很好的答案;感谢您的发布。然而,一个迂腐的狡辩:List&lt;int&gt; threadIds 字段可能应该是 ConcurrentBag 或 ConcurrentDictionary 而不是未同步的通用列表(字典,例如,如果您想将 taskId 与 threadId 相关联,或者只是将其用作并发哈希表并忽略.value)。
【解决方案2】:

请原谅我将其发布为答案,因为它并不能真正回答您的问题,但是,它与您的问题有关,因为它涉及 CorrelationManager 行为和线程/任务/等。我一直在研究使用 CorrelationManager 的 LogicalOperationStack(和 StartLogicalOperation/StopLogicalOperation 方法)在多线程场景中提供额外的上下文。

我采用了您的示例并对其稍作修改,以添加使用 Parallel.For 并行执行工作的能力。另外,我使用StartLogicalOperation/StopLogicalOperation 括起来(内部)DoLongRunningWork。从概念上讲,DoLongRunningWork 每次执行时都会执行以下操作:

DoLongRunningWork
  StartLogicalOperation
  Thread.Sleep(3000)
  StopLogicalOperation

我发现,如果我将这些逻辑操作添加到您的代码中(或多或少),所有逻辑操作都保持同步(总是堆栈上的预期操作数和堆栈上的操作值总是如预期的那样)。

在我自己的一些测试中,我发现并非总是如此。逻辑操作堆栈正在“损坏”。我能想到的最好解释是,当“子”线程退出时,将 CallContext 信息“合并”到“父”线程上下文中导致“旧”子线程上下文信息(逻辑操作)为“被另一个同级子线程继承。

这个问题也可能与 Parallel.For 显然使用主线程(至少在示例代码中,如所写)作为“工作线程”之一(或任何它们应该在并行中调用的线程)有关领域)。每当执行 DoLongRunningWork 时,都会启动一个新的逻辑操作(在开始时)并停止(在结束时)(即,推入 LogicalOperationStack 并从中弹出)。如果主线程已经有一个有效的逻辑操作,并且如果 DoLongRunningWork 在主线程上执行,则启动一个新的逻辑操作,因此主线程的 LogicalOperationStack 现在有两个操作。 DoLongRunningWork 的任何后续执行(只要 DoLongRunningWork 的这种“迭代”在主线程上执行)将(显然)继承主线程的 LogicalOperationStack(它现在有两个操作,而不仅仅是一个预期的操作)。

我花了很长时间才弄清楚为什么我的示例中的 LogicalOperationStack 的行为与我修改后的示例中的行为不同。最后我看到,在我的代码中,我将整个程序括在一个逻辑操作中,而在我修改过的测试程序版本中,我没有。这意味着在我的测试程序中,每次执行我的“工作”(类似于 DoLongRunningWork),已经有一个有效的逻辑操作。在您的测试程序的修改版本中,我没有将整个程序括在逻辑操作中。

因此,当我修改您的测试程序以将整个程序括在逻辑操作中并且如果我使用 Parallel.For 时,我遇到了完全相同的问题。

使用上面的概念模型,这将成功运行:

Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation

虽然由于 LogicalOperationStack 明显不同步,这最终会断言:

StartLogicalOperation
Parallel.For
  DoLongRunningWork
    StartLogicalOperation
    Sleep(3000)
    StopLogicalOperation
StopLogicalOperation

这是我的示例程序。它与您的相似之处在于它具有操作 ActivityId 和 LogicalOperationStack 的 DoLongRunningWork 方法。我也有两种风格的 DoLongRunningWork。一种使用Tasks,一种使用Parallel.For。也可以执行每种风格,以使整个并行化操作包含在逻辑操作中或不包含在逻辑操作中。因此,总共有 4 种方式来执行并行操作。要尝试每一个,只需取消注释所需的“使用...”方法,重新编译并运行。 UseTasksUseTasks(true)UseParallelFor 应该都运行完成。 UseParallelFor(true) 将在某个时候断言,因为 LogicalOperationStack 没有预期的条目数。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace CorrelationManagerParallelTest
{
  class Program 
  {     
    static void Main(string[] args)     
    { 
      //UseParallelFor(true) will assert because LogicalOperationStack will not have expected
      //number of entries, all others will run to completion.

      UseTasks(); //Equivalent to original test program with only the parallelized
                      //operation bracketed in logical operation.
      ////UseTasks(true); //Bracket entire UseTasks method in logical operation
      ////UseParallelFor();  //Equivalent to original test program, but use Parallel.For
                             //rather than Tasks.  Bracket only the parallelized
                             //operation in logical operation.
      ////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
    }       

    private static List<int> threadIds = new List<int>();     
    private static object locker = new object();     

    private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;

    private static int mainThreadUsedInDelegate = 0;

    // baseCount is the expected number of entries in the LogicalOperationStack
    // at the time that DoLongRunningWork starts.  If the entire operation is bracketed
    // externally by Start/StopLogicalOperation, then baseCount will be 1.  Otherwise,
    // it will be 0.
    private static void DoLongRunningWork(int baseCount)     
    {
      lock (locker)
      {
        //Keep a record of the managed thread used.             
        if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
          threadIds.Add(Thread.CurrentThread.ManagedThreadId);

        if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
        {
          mainThreadUsedInDelegate++;
        }
      }         

      Guid lo1 = Guid.NewGuid();
      Trace.CorrelationManager.StartLogicalOperation(lo1);

      Guid g1 = Guid.NewGuid();         
      Trace.CorrelationManager.ActivityId = g1;

      Thread.Sleep(3000);         

      Guid g2 = Trace.CorrelationManager.ActivityId;
      Debug.Assert(g1.Equals(g2));

      //This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
      //in effect when the Parallel.For operation was started.
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
      Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));

      Trace.CorrelationManager.StopLogicalOperation();
    } 

    private static void UseTasks(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
      Task task = null;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Task[] allTasks = new Task[totalThreads];
      for (int i = 0; i < totalThreads; i++)
      {
        task = Task.Factory.StartNew(() =>
        {
          DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
        }, taskCreationOpt);
        allTasks[i] = task;
      }
      Task.WaitAll(allTasks);

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

    private static void UseParallelFor(bool encloseInLogicalOperation = false)
    {
      int totalThreads = 100;
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.Start();

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StartLogicalOperation();
      }

      Parallel.For(0, totalThreads, i =>
      {
        DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
      });

      if (encloseInLogicalOperation)
      {
        Trace.CorrelationManager.StopLogicalOperation();
      }

      stopwatch.Stop();
      Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
      Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
      Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));

      Console.ReadKey();
    }

  } 
}

LogicalOperationStack 是否可以与 Parallel.For(和/或其他线程/任务结构)一起使用或如何使用它的整个问题可能值得自己提出问题。也许我会发布一个问题。同时,我想知道您对此是否有任何想法(或者,我想知道您是否考虑过使用 LogicalOperationStack,因为 ActivityId 似乎是安全的)。

[编辑]

有关将 LogicalOperationStack 和/或 CallContext.LogicalSetData 与一些不同的 Thread/ThreadPool/Task/Parallel 构造一起使用的更多信息,请参阅我对 this question 的回答。

另请参阅我在 SO 上关于 LogicalOperationStack 和 Parallel 扩展的问题: Is CorrelationManager.LogicalOperationStack compatible with Parallel.For, Tasks, Threads, etc

最后,请在 Microsoft 的 Parallel Extensions 论坛上查看我的问题: http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9

在我的测试中,如果您在主线程中启动逻辑操作,然后在委托中启动/停止逻辑操作,则使用 Parallel.For 或 Parallel.Invoke 时似乎 Trace.CorrelationManager.LogicalOperationStack 可能会损坏。在我的测试中(参见上面的两个链接中的任何一个),当 DoLongRunningWork 正在执行时,LogicalOperationStack 应该总是有 2 个条目(如果我在使用各种技术启动 DoLongRunningWork 之前在主线程中启动一个逻辑操作)。因此,“损坏”是指 LogicalOperationStack 最终将有超过 2 个条目。

据我所知,这可能是因为 Parallel.For 和 Parallel.Invoke 使用主线程作为“工作”线程之一来执行 DoLongRunningWork 操作。

使用存储在 CallContext.LogicalSetData 中的堆栈来模拟 LogicalOperationStack 的行为(类似于通过 CallContext.SetData 存储的 log4net 的 LogicalThreadContext.Stacks)会产生更糟糕的结果。如果我使用这样的堆栈来维护上下文,那么在我在主线程中有“逻辑操作”并且在每次迭代中都有逻辑操作的几乎所有场景中,它都会损坏(即没有预期的条目数) /执行 DoLongRunningWork 委托。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-23
    • 1970-01-01
    • 2015-04-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多