【问题标题】:Multithreaded Single Producer Multiple Consumer Implementation多线程单生产者多消费者实现
【发布时间】:2012-03-01 06:42:04
【问题描述】:

我想实现单生产者多消费者模式的多文件下载。

我有什么: - 在循环中找到要下载的新链接的代码 - 找到新链接时 - 它调用下载功能 - 下载函数接受源文件路径和目标文件路径并下载文件。

我想做什么 - 我想同时下载 X 个文件(我不知道文件总数) - 在任何时候我都应该能够同时下载 X 文件 - 只要其中 1 个 X 文件完成下载 - 调用函数应该能够立即添加新的下载 - 这反过来又会立即下载

  • 所以我有一个生产者函数,它不断将新的下载添加到队列中(在任何时候最大 X 下载)
  • 多个 X 线程消耗下载并开始单独下载。一旦完成下载 - 生产者应该能够添加新的下载 - 这将产生新的线程。

EXAMPLE 将不胜感激

【问题讨论】:

    标签: c# multithreading queue producer-consumer


    【解决方案1】:

    对于这个 P/C 问题,您只需要BlockingCollection<T>

    //shared and thread-safe
    static BlockingCollection<string> queue = new BlockingCollection<string>(100);
    
    // Producer
    queue.Add(fileName);  // will block when full
    
    // Consumer
    if (queue.TryTake(out fileName, timeOut))  // waits when empty
      ...
    

    您需要使用超时和 CancellationToken 对其进行微调。

    【讨论】:

    • 出列一个item后,进行一些处理,如何将结果返回给调用进程?
    • “调用过程”到底是什么?
    • 我正在考虑实现一个队列来限制 Web 请求。所以我有一个方法可以说来自 web 服务的 DownloadUser() 并且请求被添加到队列中以被限制。如何将结果返回给该方法 DownloadUser()?
    • @HenkHolterman,如果考虑瓶颈,如何应用?假设我们有三个消费者。
    • @Love - 同样的事情。如果 Prod 跟不上,Cons 会等很久。但不要问不到 2 岁答案的问题。
    【解决方案2】:

    ReaderWriterLockSlim 类就是为此而设计的。

    另外,请查看这个关于线程的精彩网站:

    http://www.albahari.com/threading/part4.aspx#_Reader_Writer_Locks

    例子来自上面的网站。

    class SlimDemo
    {
      static ReaderWriterLockSlim _rw = new ReaderWriterLockSlim();
      static List<int> _items = new List<int>();
      static Random _rand = new Random();
    
      static void Main()
      {
        new Thread (Read).Start();
        new Thread (Read).Start();
        new Thread (Read).Start();
    
        new Thread (Write).Start ("A");
        new Thread (Write).Start ("B");
      }
    
      static void Read()
      {
        while (true)
        {
          _rw.EnterReadLock();
          foreach (int i in _items) Thread.Sleep (10);
          _rw.ExitReadLock();
        }
      }
    
      static void Write (object threadID)
      {
        while (true)
        {
          int newNumber = GetRandNum (100);
          _rw.EnterWriteLock();
          _items.Add (newNumber);
          _rw.ExitWriteLock();
          Console.WriteLine ("Thread " + threadID + " added " + newNumber);
          Thread.Sleep (100);
        }
      }
    
      static int GetRandNum (int max) { lock (_rand) return _rand.Next(max); }
    }
    

    【讨论】:

    • 生产者 = 作家,消费者 = 读者。读取器/写入器是读取或写入共享数据的代理的线程命名法中的常用名称(示例中的下载队列 = _items)。
    • _items列表中的线程安全项在Read()方法处理后如何删除?
    【解决方案3】:

    使用并发集合在老板与其工作人员之间进行通信。
    ConcurrentQueue(如果您关心订单)或 ConcurrentBag。
    老板添加到 ConcurrentQueue(Add 方法),船员从队列中取出(Take 方法)。如果您需要代码,请告诉我。

    【讨论】:

      【解决方案4】:

      我建议查看Task Parallel Library。这非常干净地包装了方法调用,并为您管理了多个线程。

      【讨论】:

      猜你喜欢
      • 2017-02-01
      • 1970-01-01
      • 2014-10-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-07-26
      • 2013-04-12
      相关资源
      最近更新 更多