【问题标题】:concurrect oprating with limit on thread count in a infinity loop无限循环中线程数限制的并发操作
【发布时间】:2015-08-04 13:54:39
【问题描述】:

我编写了一个无限循环,用于从队列 (RabbitMQ) 中拉取并在并发线程中处理每个拉取的项目,同时运行线程的数量有限。 现在我想要一个限制线程执行计数的解决方案。请参阅我的循环示例:

public class ThreadWorker<T>
{
    public List<T> _lst;
    private int _threadCount;
    private int _maxThreadCount;
    public ThreadWorker(List<T> lst, int maxThreadCount)
    {
        _lst = lst;
        _maxThreadCount = maxThreadCount;
    }

    public void Start()
    {
        var i = 0;
        while (i < _lst.Count)
        {
            i++;
            var pull = _lst[i];

            Process(pull);
        }
    }

    public void Process(T item)
    {
        if (_threadCount > _maxThreadCount)
        {
            //wait any opration be done 
            // How to wait for one thread?

            Interlocked.Decrement(ref _threadCount);
        }

        var t = new Thread(() => Opration(item));

        t.Start();

        Interlocked.Increment(ref _threadCount);
    }

    public void Opration(T item)
    {
        Console.WriteLine(item.ToString());
    }
}

请注意,当我使用信号量进行限制时,Start() 方法不会等待所有正在运行的线程。我的循环应该在使用 _maxThreadCount 运行线程后,等待释放线程,然后推送新线程以进行并发处理。

【问题讨论】:

  • 我的问题是无限并行。对于 ParallelOption.maxdegreeofparallelism 但我不想使用它。我想在我的代码中管理线程。

标签: c# multithreading thread-safety deadlock concurrent-programming


【解决方案1】:

我会用这种方式Semaphore来控制线程数:

public class ThreadWorker<T>
{
    SemaphoreSlim _sem = null;
    List<T> _lst;

    public ThreadWorker(List<T> lst, int maxThreadCount)
    {
        _lst = lst;
        _sem = new SemaphoreSlim(maxThreadCount);
    }

    public void Start()
    {
        var i = 0;
        while (i < _lst.Count)
        {
            i++;
            var pull = _lst[i];
            _sem.Wait(); /*****/
            Process(pull);
        }
    }

    public void Process(T item)
    {
        var t = new Thread(() => Opration(item));
        t.Start();
    }

    public void Opration(T item)
    {
        Console.WriteLine(item.ToString());
        _sem.Release(); /*****/
    }
}

【讨论】:

  • 谢谢,但是当我们使用 semaphore 或 semaphoreSlim 时,Start() 方法不会等待完成所有正在运行的线程。您对这个问题有什么想法吗?
  • @Arash 然后将for (int i = 0; i &lt; maxThreadCount; i++) _sem.Wait(); 添加到Start 的末尾
猜你喜欢
  • 2014-11-12
  • 1970-01-01
  • 2012-11-14
  • 1970-01-01
  • 1970-01-01
  • 2011-09-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多