【问题标题】:Safe Way to Limit Running Threads C#限制运行线程 C# 的安全方法
【发布时间】:2015-04-22 16:05:37
【问题描述】:

我正在编写一个可以运行各种任务的程序。我已经建立了一个我称之为“任务队列”的东西,我将在其中不断地抓取下一个要处理的任务(如果有的话)并启动一个新线程来处理该任务。但是,出于明显的原因,我想限制一次可以产生的线程数量。我创建了一个变量来跟上要生成的最大线程数和一个用于当前线程数的变量。我正在考虑使用锁来尝试准确地跟上当前的线程数。这是我的总体思路。

public class Program {
  private static int mintThreadCount;
  private static int mintMaxThreadCount = 10;
  private static object mobjLock = new object();
  static void Main(string[] args) {
     mintThreadCount = 0;
     int i = 100;
     while(i > 0) {
        StartNewThread();
        i--;
     }
     Console.Read();
  }

  private static void StartNewThread() {
     lock(mobjLock) {
        if(mintThreadCount < mintMaxThreadCount) {
           Thread newThread = new Thread(StartTask);
           newThread.Start(mintThreadCount);
           mintThreadCount++;
        }
        else {
           Console.WriteLine("Max Thread Count Reached.");
        }
     }
  }

  private static void StartTask(object iCurrentThreadCount) {
     int id = new Random().Next(0, 1000000);
     Console.WriteLine("New Thread with id of: " + id.ToString() + " Started. Current Thread count: " + ((int)iCurrentThreadCount).ToString());
     Thread.Sleep(new Random().Next(0, 3000));
     lock(mobjLock) {
        Console.WriteLine("Ending thread with id of: " + id.ToString() + " now.");
        mintThreadCount--;
        Console.WriteLine("Thread space release by id of: " + id.ToString() + " . Thread count now at: " + mintThreadCount);
     }
  }
}

由于我在两个地方锁定以访问同一个变量(启动新线程时递增,结束时递减),等待锁定递减的线程是否有可能挂起并且永远不会结束?从而达到最大线程数并且永远无法启动另一个?对我的方法有其他建议吗?

【问题讨论】:

  • 我倾向于阅读 Thread Pooling,因为它可以解决您的很多问题。
  • 是的,@oleksii 说的:使用SemaphoreSlim。请参阅stackoverflow.com/a/19594643/56778 的简单示例
  • 我还应该提到,在 Main() 中它将是 while(true) 并且不限于 100。我希望它持续运行。因此主线程永远不应该被阻塞。所以我看到线程池很有用,但我不确定它是否完全符合我想要完成的任务。我开始使用 Semaphore,但我想要一种方法来确保一次只能存在一定数量的线程,而不仅仅是一次限制代码块中的线程数。这样,我就不会产生无穷无尽的线程,然后一次运行一定数量的任务,而是产生有限的线程。

标签: c# multithreading locking


【解决方案1】:

最简单的问题先... :)

...等待锁递减的线程是否有可能被挂起并且永远不会结束?

不,不在您发布的代码中。在等待计数更改或类似情况时,没有任何代码持有锁。您只需获取锁,然后修改计数或发出消息,然后立即释放锁。所以没有线程会无限期地持有锁,也没有嵌套锁(如果操作不正确可能导致死锁)。

现在,也就是说:从您发布的代码和您的问题来看,这里的 intent 是什么并不完全清楚。编写的代码确实会限制创建的线程数。但是一旦达到这个限制(它会很快完成),主循环就会旋转,报告"Max Thread Count Reached."

确实,总循环数为 100,我认为整个循环可能会在第一个线程开始运行之前完成,具体取决于系统上占用 CPU 内核的其他内容。如果某些线程确实开始运行,并且其中一些线程的睡眠时间非常短,那么您以后可能会潜入更多线程。但循环的大多数迭代都会看到线程数达到最大值,报告已达到限制并继续循环的下一次迭代。

你在 cmets 中写下(如果你认为它是相关的,你应该把它放在 question 本身)“主线程永远不应该被阻塞”。当然,还有一个问题,没有阻塞的时候主线程在做什么?主线程如何知道是否以及何时尝试安排新线程?

如果您想要一个真正有用的答案,这些都是重要的细节。

请注意,有人建议您使用信号量(具体来说,SemaphoreSlim)。这可能是个好主意,但请注意,该类通常用于协调多个线程,所有线程都竞争相同的资源。为了使其有用,您实际上需要超过 10 个线程,信号量确保在给定时间只有 10 个线程运行。

在您的情况下,在我看来,您实际上是在问如何避免首先创建额外的线程。 IE。您希望主循环检查计数,如果达到最大计数,则根本不创建线程。在这种情况下,一种可能的解决方案可能是直接使用 Monitor 类:

private static void StartNewThread() {
    lock(mobjLock) {
        while (mintThreadCount >= mintMaxThreadCount) {
           Console.WriteLine("Max Thread Count Reached.");
           Monitor.Wait(mobjLock);
        }

        Thread newThread = new Thread(StartTask);
        newThread.Start(mintThreadCount);
        mintThreadCount++;
        }
    }
}

上面会导致StartNewThread()方法等到计数低于最大值,然后总是会创建一个新线程。

当然,每个线程都需要发出信号表明它已经更新了计数,这样就可以从等待中释放上述循环并检查计数:

private readonly Random _rnd = new Random();

private static void StartTask(object iCurrentThreadCount) {
    int id = _rnd.Next(0, 1000000);
    Console.WriteLine("New Thread with id of: " + id.ToString() + " Started. Current Thread count: " + ((int)iCurrentThreadCount).ToString());
    Thread.Sleep(_rnd.Next(0, 3000));
    lock(mobjLock) {
        Console.WriteLine("Ending thread with id of: " + id.ToString() + " now.");
        mintThreadCount--;
        Console.WriteLine("Thread space release by id of: " + id.ToString() + " . Thread count now at: " + mintThreadCount);
        Monitor.Pulse(mobjLock);
    }
}

上面的问题是它阻塞主循环。如果我理解正确,你不会想要的。

(注意:您的代码中有一个常见但严重的错误,即每次需要随机数时都会创建一个新的Random 对象。要使用Random类正确,您必须只创建一个实例并在需要新的随机数时重复使用它。我已经调整了上面的代码示例来解决这个问题)。

上述问题和您的原始版本的其他问题之一是每个新任务都分配了一个全新的线程。线程的创建成本很高,甚至简单地存在,这就是线程池存在的原因。根据您的实际情况,您可能应该只使用例如ParallelParallelEnumerableTask 来管理您的任务。

但如果你真的想明确地完成这一切,一个选择是简单地启动十个线程,并让它们从BlockingCollection&lt;T&gt; 检索数据以进行操作。由于您恰好启动了 10 个线程,因此您知道您将永远不会运行超过 10 个线程。当有足够的工作让所有十个线程都忙时,它们就会忙。否则,队列将为空,部分或全部将等待新数据显示在队列中。空闲,但不使用任何 CPU 资源。

例如:

private BlockingCollection<int> _queue = new BlockingCollection<int>();

private static void StartThreads() {
    for (int i = 0; i < mintMaxThreadCount; i++) {
        new Thread(StartTask).Start();
    }
}

private static void StartTask() {
    // NOTE: a random number can't be a reliable "identification", as two or
    // more threads could theoretically get the same "id".
    int id = new Random().Next(0, 1000000);
    Console.WriteLine("New Thread with id of: " + id.ToString() + " Started.");
    foreach (int i in _queue) {
        Thread.Sleep(i);
    }
    Thread.Sleep(new Random().Next(0, 3000));
}

您只需在某个地方调用一次StartThreads(),而不是多次调用您的另一个StartNewThread() 方法。大概是在您提到的while (true) 循环之前。

然后由于需要处理一些任务,你只需将数据添加到队列中,例如:

_queue.Add(_rnd.Next(0, 3000));

当您希望线程全部退出时(例如,在您的主循环退出后,然而会发生这种情况):

_queue.CompleteAdding();

这将导致每个正在进行的foreach 循环结束,让每个线程退出。

当然,BlockingCollection&lt;T&gt;T 类型参数可以是任何值。据推测,在您的情况下,它实际上代表了一项“任务”。我使用了int,只是因为这实际上是您的示例中的“任务”(即线程应该休眠的毫秒数)。

然后你的主线程可以做它通常做的任何事情,调用Add()方法来根据需要将新的工作分派给你的消费者线程。

同样,如果没有更多细节,我无法真正评论这种方法是否比使用 .NET 中的一种内置任务运行机制更好。但考虑到您到目前为止所解释的内容,它应该可以正常工作。

【讨论】:

  • 感谢您的详细回复。 Random 只是用于测试和查看控制台中的线程(没有注意;))。报告最大计数的主线程很好。我只需要一种方法来不断寻找要处理的任务(我的公司有数千个不同的日常任务),但又不会产生无穷无尽的线程。在阅读了您的回复后,我想我的问题真的应该是:线程请求锁是排队还是有机会在完成任务后永远无法获得减少的锁?不过我确实喜欢你的方法。
  • “线程请求锁是否排队” -- 是的,Monitor 类(这是lock 语句的基础)使用等待队列和一个就绪队列。线程是 FIFO 服务的。
猜你喜欢
  • 1970-01-01
  • 2019-12-06
  • 1970-01-01
  • 1970-01-01
  • 2012-03-20
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多