【问题标题】:How to pass different instances while multithreading?多线程时如何传递不同的实例?
【发布时间】:2014-08-17 11:32:06
【问题描述】:

我正在构建一个刮板。我的目标是启动 X 个浏览器(其中 X 是线程数),然后通过将列表分成 X 个部分来继续抓取每个 URL 的列表。

我决定使用包含 10 个 URL 列表的 3 个线程(3 个浏览器)。

问题:如何像这样在浏览器之间分离每个任务:

  1. Browser1 从 0 到 3 抓取列表中的项目

  2. Browser2 将列表中的项目从 4 刮到 7

  3. Browser3 从 8 到 10 抓取列表中的项目

所有浏览器都应该同时在抓取传递的 URL 列表。

我已经有了这个BlockingCollection

BlockingCollection<Action> _taskQ = new BlockingCollection<Action>();

public Multithreading(int workerCount)
{
        // Create and start a separate Task for each consumer:
        for (int i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consume);
}

public void Dispose() { _taskQ.CompleteAdding(); }

public void EnqueueTask(Action action) { _taskQ.Add(action); }

void Consume()
{
// This sequence that we’re enumerating will block when no elements
// are available and will end when CompleteAdding is called. 
foreach (Action action in _taskQ.GetConsumingEnumerable())
            action();     // Perform task.
}
public int ItemsCount()
{
        return _taskQ.Count;
}

可以这样使用:

Multithreading multithread = new Multithreading(3); //3 threads
foreach(string url in urlList){

    multithread.EnqueueTask(new Action(() => 
    {
         startScraping(browser1); //or browser2 or browser3
    }));
}

我需要在抓取之前创建浏览器实例,因为我不想在每个线程都启动一个新浏览器。

【问题讨论】:

  • "...因为我不想在每个线程都启动一个新浏览器" - 那么如何并行完成任何工作?
  • @MitchWheat,我的意思是我想从程序一开始就启动X浏览器,一直使用到最后。我通常做的是一个循环,启动浏览器,完成它的工作,关闭浏览器。有没有办法从一开始就启动所有浏览器并使用它们直到结束?
  • 为什么线程 1 应该处理 URL 0..3 ?做 0,3,7,... 会有什么问题?
  • @HenkHolterman,对不起,我应该说得更清楚。实际顺序无关紧要,这只是一个例子。我只希望链接在所有浏览器(线程)之间平均分配。
  • 为什么要平等?如果一个站点需要更多时间,其他线程是否应该空闲?

标签: c# multithreading


【解决方案1】:

考虑到 Henk Holterman 的评论,您可能希望获得最高速度,即尽可能让浏览器保持忙碌状态,请使用:

private static void StartScraping(int id, IEnumerable<Uri> urls)
{
    // Construct browser here
    foreach (Uri url in urls)
    {
        // Use browser to process url here
        Console.WriteLine("Browser {0} is processing url {1}", id, url);
    }
}

主要:

    int nrWorkers = 3;
    int nrUrls = 10;
    BlockingCollection<Uri> taskQ = new BlockingCollection<Uri>();
    foreach (int i in Enumerable.Range(0, nrWorkers))
    {
        Task.Run(() => StartScraping(i, taskQ.GetConsumingEnumerable()));
    }
    foreach (int i in Enumerable.Range(0, nrUrls))
    {
        taskQ.Add(new Uri(String.Format("http://Url{0}", i)));
    }
    taskQ.CompleteAdding();

【讨论】:

    【解决方案2】:

    我认为通常的方法是拥有一个阻塞队列、一个提供者线程和一个任意的工作池。

    提供者线程负责将 URL 添加到队列中。当没有要添加的时候它会阻塞。

    一个工作线程实例化一个浏览器,然后从队列中检索单个 URL,将其抓取,然后循环返回以获取更多信息。队列为空时阻塞。

    您可以启动任意数量的工人,他们只是在他们之间进行排序。

    主线启动所有线程并退出到场边。它负责处理 UI(如果有的话)。

    多线程很难调试。您可能希望至少在部分工作中使用 Tasks。

    【讨论】:

      【解决方案3】:

      你可以给任务和工人一些Id。然后您将拥有BlockingCollection[] 而不仅仅是BlockingCollection。每个消费者都将从数组中自己的BlockingCollection 消费。我们的工作是找到合适的消费者并发布工作。

      BlockingCollection<Action>[] _taskQ;
      private int taskCounter = -1;
      public Multithreading(int workerCount)
      {
          _taskQ = new BlockingCollection<Action>[workerCount];
      
          for (int i = 0; i < workerCount; i++)
          {
              int workerId = i;//To avoid closure issue
              _taskQ[workerId] = new BlockingCollection<Action>();
              Task.Factory.StartNew(()=> Consume(workerId));
          }
      }
      
      public void EnqueueTask(Action action)
      {
          int value = Interlocked.Increment(ref taskCounter);
          int index = value / 4;//Your own logic to find the index here    
          _taskQ[index].Add(action);
      }
      
      void Consume(int workerId)
      {
          foreach (Action action in _taskQ[workerId].GetConsumingEnumerable())
             action();// Perform task.
      }
      

      【讨论】:

        【解决方案4】:

        使用后台工作者的简单解决方案可以限制线程数:

        public class Scraper : IDisposable
        {
            private readonly BlockingCollection<Action> tasks;
            private readonly IList<BackgroundWorker> workers;
        
            public Scraper(IList<Uri> urls, int numberOfThreads)
            {
                for (var i = 0; i < urls.Count; i++)
                {
                    var url = urls[i];
                    tasks.Add(() => Scrape(url));
                }
        
                for (var i = 0; i < numberOfThreads; i++)
                {
                    var worker = new BackgroundWorker();
                    worker.DoWork += (sender, args) =>
                    {
                        Action task;
                        while (tasks.TryTake(out task))
                        {
                            task();
                        }
                    };
                    workers.Add(worker);
                    worker.RunWorkerAsync();
                }
            }
        
            public void Scrape(Uri url)
            {
                Console.WriteLine("Scraping url {0}", url);
            }
        
            public void Dispose()
            {
                throw new NotImplementedException();
            }
        }
        

        【讨论】: