【问题标题】:How to reuse threads in .NET 3.5如何在 .NET 3.5 中重用线程
【发布时间】:2011-08-15 04:20:11
【问题描述】:

我有一个处理大块信息的子程序。为了利用整个 CPU,它将工作分成单独的线程。在所有线程都完成后,它就完成了。我读到创建和销毁线程会占用大量开销,因此我尝试使用线程池,但实际上运行速度比创建自己的线程慢。如何在程序运行时创建自己的线程然后继续重用它们?看到有人说做不到,但是线程池做到了,应该是可以的吧?

这是启动新线程/使用线程池的部分代码:

//initialization for threads
Thread[] AltThread = null;
if (NumThreads > 1)
    AltThread = new Thread[pub.NumThreads - 1];

do
{
    if (NumThreads > 1)
    {   //split the matrix up into NumThreads number of even-sized blocks and execute on separate threads
        int ThreadWidth = DataWidth / NumThreads;
        if (UseThreadPool) //use threadpool threads
        {
            for (int i = 0; i < NumThreads - 1; i++)
            {
                ThreadPool.QueueUserWorkItem(ComputePartialDataOnThread, 
                    new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) });
            }
            //get number of threads available after queue
            System.Threading.Thread.Sleep(0);
            int StartThreads, empty, EndThreads;
            ThreadPool.GetAvailableThreads(out StartThreads, out empty);
            ComputePartialData(ThisEngine, 0, ThreadWidth);

            //wait for all threads to finish
            do
            {
                ThreadPool.GetAvailableThreads(out EndThreads, out empty);
                System.Threading.Thread.Sleep(1);
            } while (StartThreads - EndThreads > 0);
        }
        else //create new threads each time (can we reuse these?)
        {
            for (int i = 0; i < NumThreads - 1; i++)
            {
                AltThread[i] = new Thread(ComputePartialDataOnThread);
                AltThread[i].Start(new object[] { AltEngine[i], ThreadWidth * (i + 1), ThreadWidth * (i + 2) });
            }
            ComputePartialData(ThisEngine, 0, ThreadWidth);

            //wait for all threads to finish
            foreach (Thread t in AltThread)
                t.Join(1000);
            foreach (Thread t in AltThread)
                if (t.IsAlive) t.Abort();
        }
    }
}

ComputePartialDataOnThread 只是将信息解包并调用 ComputePartialData。将要处理的数据在线程之间共享(它们不会尝试读取/写入相同的位置)。 AltEngine[] 是每个线程的独立计算引擎。

该操作使用线程池运行大约 10-20%。

【问题讨论】:

  • 您能否发布您的代码,以便我们了解您在做什么?有可能你的线程池出了问题,导致它变得如此缓慢。
  • 也许它只是在您的测试运行中很慢,即您达到了初始线程数,因此它必须创建更多线程以满足您的需求。在运行任何测试之前尝试手动设置池中的最小线程数。
  • 线程数与处理器内核数相匹配。在这种情况下,它只有 2 个。
  • 请不要使用Thread.Abort。这是您可以调用的最糟糕的功能。如果您的线程正在做一些关键的事情怎么办?砰,你死了。
  • 做到这一点很棘手。但是,使用 ThreadPool 通常是并行化计算绑定操作的最佳方式。我强烈推荐阅读 Jeffrey Richter 的书“通过 C# 进行 CLR”(第五部分“线程”),详细解释如何以及何时使用多线程。

标签: c# .net multithreading threadpool


【解决方案1】:

这听起来像是一个相当普遍的需求,可以通过多线程生产者-消费者队列来解决。线程保持“活动”,并在将新工作添加到队列时发出工作信号。工作由委托(在您的情况下为 ComputePartialDataOnThread)表示,传递给委托的数据是排队的数据(在您的情况下为 ComputePartialDataOnThread 的参数)。有用的特性是管理工作线程和实际算法的实现是分开的。这是 p-c 队列:

public class SuperQueue<T> : IDisposable where T : class
{
    readonly object _locker = new object();
    readonly List<Thread> _workers;
    readonly Queue<T> _taskQueue = new Queue<T>();
    readonly Action<T> _dequeueAction;

    /// <summary>
    /// Initializes a new instance of the <see cref="SuperQueue{T}"/> class.
    /// </summary>
    /// <param name="workerCount">The worker count.</param>
    /// <param name="dequeueAction">The dequeue action.</param>
    public SuperQueue(int workerCount, Action<T> dequeueAction)
    {
        _dequeueAction = dequeueAction;
        _workers = new List<Thread>(workerCount);

        // Create and start a separate thread for each worker
        for (int i = 0; i < workerCount; i++)
        {
            Thread t = new Thread(Consume) { IsBackground = true, Name = string.Format("SuperQueue worker {0}",i )};
            _workers.Add(t);
            t.Start();
        }
    }

    /// <summary>
    /// Enqueues the task.
    /// </summary>
    /// <param name="task">The task.</param>
    public void EnqueueTask(T task)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(task);
            Monitor.PulseAll(_locker);
        }
    }

    /// <summary>
    /// Consumes this instance.
    /// </summary>
    void Consume()
    {
        while (true)
        {
            T item;
            lock (_locker)
            {
                while (_taskQueue.Count == 0) Monitor.Wait(_locker);
                item = _taskQueue.Dequeue();
            }
            if (item == null) return;

            // run actual method
            _dequeueAction(item);
        }
    }

    /// <summary>
    /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
    /// </summary>
    public void Dispose()
    {
        // Enqueue one null task per worker to make each exit.
        _workers.ForEach(thread => EnqueueTask(null));

        _workers.ForEach(thread => thread.Join());

    }
}

正如之前的海报所说,有许多内置结构(查看 TPL ),它们使用线程池,您可能希望在实现自己的队列之前查看它。

【讨论】:

  • 谢谢,这看起来是个不错的方法。我会看看这个类是否比线程池快。
  • @DoctorZero:你使用成功了吗,速度更快吗?
  • 是的,在它的最终化身中,它有点不同,也更复杂,但想法相同。线程在几秒钟不活动后进入低优先级模式,然后在 30 秒不活动后退出。这与他们期望的工作类型和频率一致。它的运行速度与 ThreadPool 几乎相同,但不那么紧张。此外,我发现通过将数据分成相邻的行而不是块,我可以获得更好的 CPU 缓存,这也产生了影响。
  • 类似的方法是实现自定义TaskScheduler,然后使用Task.Factory.StartNew
【解决方案2】:

因此,通常的做法是让每个线程的入口点本质上执行类似的操作(这只是一种算法,不是 C# 代码,抱歉):

  1. 检查您是否有工作要做
  2. 找到后就开始工作
  3. 等待信号

另一方面,当您的线程有更多工作时,将其添加到工作队列中,然后您的线程实质上就会被重用。这与自己实现线程池的方式非常相似(如果您在运行时中,您可以做一些其他事情来帮助您,但这并不是什么大不了的事)。

【讨论】:

  • 如何在不消耗 CPU 的情况下等待信号?
  • 当您等待信号时,您只是设置为阻塞状态,而当有人发出信号时,调度程序使您可以运行。在被阻塞时,你没有时间在 CPU 上(然而,这并不是最有效的事情,所以通常操作系统会忙着等待一点)。被阻塞的线程不消耗处理器能力。
【解决方案3】:

这是一个讨论这个问题的线程:A custom thread-pool/queue class.

【讨论】:

  • 这个类看起来像是在启动后台工作者。它们不需要与启动线程相同的开销吗?
猜你喜欢
  • 1970-01-01
  • 2014-05-15
  • 1970-01-01
  • 1970-01-01
  • 2019-02-17
  • 2010-12-09
  • 2011-01-17
  • 1970-01-01
  • 2016-09-08
相关资源
最近更新 更多