【问题标题】:Custom thread pool C# issue [closed]自定义线程池 C# 问题 [关闭]
【发布时间】:2014-07-17 09:22:26
【问题描述】:

所以,我有一个自定义线程池,它接收最大数量的线程,您可以在其中排队项目。池将执行队列中的项目。问题是这个池的行为总是不同的。我创建了一个测试,为该池提供 X 次操作并等待池完成工作(此等待有一个限制,但该限制足以让所有操作成功结束)。问题是有时测试会返回“成功”响应,但大多数情况下它会超过时间限制和/或不处理所有操作。

代码:

CustomThreadPool.cs

    public class CustomThreadPool : IDisposable
    {
        #region Private Members

        private readonly Thread m_checkThread;

        #endregion

        #region Public Properties

        public int MaxNumberOfThreads { get; set; }
        private readonly object m_lock = new object();
        private int m_currentNumberOfThreads;
        public int CurrentNumberOfThreads
        {
            get
            {
                lock (m_lock)
                {
                    return m_currentNumberOfThreads;
                }
            }
            private set
            {
                lock (m_lock)
                {
                    m_currentNumberOfThreads = value;
                }
            }
        }
        public ConcurrentQueue<WorkItem> QueuedItems { get; set; }

        #endregion

        #region Constructor

        public CustomThreadPool(int maxNumberOfThreads)
        {
            MaxNumberOfThreads = maxNumberOfThreads;

            QueuedItems = new ConcurrentQueue<WorkItem>();
            m_checkThread = new Thread(CheckThread);
            m_checkThread.Start();
        }

        #endregion

        #region Public Methods

        public void QueueItem(object argument, Action<WorkItem> method, string token = "")
        {
            QueuedItems.Enqueue(new WorkItem { Argument = argument, Method = method, Token = token });
        }

        public List<WorkItem> Stop()
        {
            m_checkThread.Abort();
            List<WorkItem> result = new List<WorkItem>();
            while (QueuedItems.Count > 0)
            {
                WorkItem wi;
                QueuedItems.TryDequeue(out wi);
                if (wi != null)
                    result.Add(wi);
            }
            CurrentNumberOfThreads = 0;
            return result;
        }

        #endregion

        #region Private Methods

        // ReSharper disable once FunctionNeverReturns
        private void CheckThread()
        {
            while (true)
            {
                if (CurrentNumberOfThreads >= MaxNumberOfThreads || QueuedItems.Count == 0)
                {
                    Thread.Yield();
                }

                int availableThreads = MaxNumberOfThreads - CurrentNumberOfThreads;

                List<WorkItem> toBeProcessed = new List<WorkItem>();

                for (var i = 0; i < availableThreads; i++)
                {
                    WorkItem wi;
                    QueuedItems.TryDequeue(out wi);

                    if (wi != null)
                    {
                        toBeProcessed.Add(wi);
                    }
                }

                foreach (WorkItem item in toBeProcessed)
                {
                    CurrentNumberOfThreads++;
                    item.ExecutingThread = new Thread(ProcessItem);
                    item.ExecutingThread.Start(item);
                }

                Thread.Sleep(50);
            }
        }

        private void ProcessItem(object wi)
        {
            WorkItem item = (WorkItem)wi;
            item.Method.Invoke(item);
            CurrentNumberOfThreads--;
            item.ExecutingThread.Abort();
        }

        #endregion

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                m_checkThread.Abort();
            }
        }
    }

测试:

private List<int> EndedOperations = new List<int>();
private List<int> StartedOperations = new List<int>();

public void CheckThreadPool()
    {
        int workTime = 100;
        int numberOfOperations = 100;
        int numberOfThreads = 10;
        int cycles = numberOfOperations / numberOfThreads;
        int totalTime = workTime * 2 * cycles;
        Stopwatch sw = new Stopwatch();
        sw.Start();
        CustomThreadPool pool = new CustomThreadPool(numberOfThreads);

        for (int i = 0; i < numberOfOperations; i++)
        {
            pool.QueueItem(new WorkInfo(i, workTime), DoWork);
        }
        Thread.Sleep(workTime);
        bool queueEmpty = false, operationsDone = false;
        while (pool.CurrentNumberOfThreads > 0 && sw.ElapsedMilliseconds < totalTime)
        {
            if (pool.QueuedItems.Count == 0 && !queueEmpty)
            {
                queueEmpty = true;
                Debug.WriteLine("Queue emptied at: {0}, operations left: {1}", sw.ElapsedMilliseconds, numberOfOperations - EndedOperations.Count);
            }
            if (EndedOperations.Count == numberOfOperations && !operationsDone)
            {
                operationsDone = true;
                Debug.WriteLine("Operations done at: {0}, number of threads: {1}", sw.ElapsedMilliseconds, pool.CurrentNumberOfThreads);
            }
            Thread.Yield();
        }
        sw.Stop();
        pool.Dispose();
        Thread.Sleep(workTime);
        Debug.WriteLine("Test ended with {0} unprocessed operations", numberOfOperations - EndedOperations.Count);
        for (int i = 0; i < numberOfOperations; i++)
        {
            if (!EndedOperations.Contains(i))
                Debug.WriteLine("Operation {0} was not fully processed", i);
            if (!StartedOperations.Contains(i))
                Debug.WriteLine("Operation {0} has never started", i);
        }
        Assert.IsTrue(sw.ElapsedMilliseconds < totalTime,
            string.Format(@"The pool did not stop in useful time. 
                                Remaining threads : {0}
                                Remaining queued items : {1}
                                Remaining operations: {2}",
                                pool.CurrentNumberOfThreads, pool.QueuedItems.Count, numberOfOperations - EndedOperations.Count));
        Assert.IsTrue(pool.QueuedItems.Count == 0,
            string.Format(@"Not all items were processed. 
                                Remaining : {0}
                                Processing time : {1}",
                                pool.QueuedItems.Count, sw.ElapsedMilliseconds));
    }

    private void DoWork(WorkItem wi)
    {
        WorkInfo info = (WorkInfo)wi.Argument;
        try
        {
           StartedOperations.Add(info.Id);
            Thread.Sleep(info.TestTime);
            EndedOperations.Add(info.Id);
        }
        catch(Exception ex)
        {
            Debug.WriteLine("id: {0}, ex: {1}", info.Id, ex.Message);
        }

    }

我假设问题可能来自共享资源,但我不知道是哪一个。

谢谢。

【问题讨论】:

  • 这个问题非常广泛,无论是线程池类还是测试类都充满了疏忽和脆弱的代码。如果您需要一个线程池,.NET 框架为您提供了一个。如果您有兴趣自己编写一个作为练习,我建议您先研究 .NET 实现并理解它。 tinyurl.com/k6d9l6v 事实上,这甚至不是真正的线程池。您在添加工作时创建线程 - 池旨在保持许多线程处于活动状态并准备好工作。这比直接分离一个线程要好一点。

标签: c# .net multithreading thread-safety


【解决方案1】:

即使你在CurrentNumberOfThreads 的赋值上加上了lock,这还不够,因为你的递增和递减操作不是原子的。

你应该使用Interlocked类提供的原子操作:

替换

CurrentNumberOfThreads++;

System.Threading.Interlocked.Increment(ref m_CurrentNumberOfThreads);

CurrentNumberOfThreads--;

System.Threading.Interlocked.Decrement(ref m_CurrentNumberOfThreads);

还有一点需要注意:永远不要abort 像这样的线程。尝试为他们提供一些通知并优雅地退出。

【讨论】:

  • 非常感谢。你救了我的命。不过还有一个问题。在运行测试时,我观察到根据自定义线程池的可用线程数,测试需要很长时间。现在我知道如果进程有很多线程,它可能不太理想,但是我怎样才能找到线程数的范围(取决于机器的处理器)来初始化自定义线程池?非常感谢!
  • 这取决于很多参数。我建议您在测试中放置一些计数器并调整线程数,然后您可以对结果进行插值并找到最佳点。
猜你喜欢
  • 2011-04-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-09-23
  • 2020-10-02
相关资源
最近更新 更多