【问题标题】:Parallel processing of incoming xml files并行处理传入的 xml 文件
【发布时间】:2012-04-04 22:08:43
【问题描述】:

我需要处理传入的 xml 文件(它们将由其他应用程序直接在特定文件夹中创建),我需要快速完成。

每天最多可以有 200 000 个文件,我目前的假设是使用 .NET 4tpl

我目前的服务理念是:

在一个循环中,我想检查文件夹中的新文件,如果我找到其中任何一个,我会将它们放入队列中,这将由另一个循环处理,该循环将从队列中获取文件并为每个文件创建新任务(线)。同时任务的数量应该是可配置的。 第一部分很简单,但创建两个主循环并在它们之间建立队列对我来说是新的。

还有一个问题: 如何创建两个循环(一个用于检查文件夹并添加文件,第二个用于从队列中获取文件并并行处理它们)并添加队列以在它们之间进行通信。

对于第一部分(文件夹检查),建议的解决方案是使用 FileSystemWatcher。现在需要讨论第二部分(也许是一些任务计划程序)。

【问题讨论】:

  • 如何创建两个循环(一个用于检查文件夹并添加文件,第二个用于从队列中获取文件并并行处理它们)并添加队列以在它们之间进行通信。
  • 请编辑“问题”以实际包含问题

标签: .net queue task-parallel-library


【解决方案1】:

听起来你拼图中缺少的部分是BlockingCollection

FileSystemWatcher watcher;
BlockingCollection<string> bc; 
private readonly object _lock = new object();
Task[] tasks;

void PrepareWatcher()
{
    watcher = new FileSystemWatcher(@"c:");
    watcher.Created += (s,e) => 
    {
        lock(_lock) //Prevents race condition when stopping
        {
            if (!bc.IsAddingCompleted)
                bc.Add(e.FullPath);
        }
    };
}

void StartProcessing(int taskCount)
{
    tasks = new Task[taskCount];
    bc = new BlockingCollection<string>();

    for (int i = 0; i< taskCount; i++)
        tasks[i] = (Task.Factory.StartNew(() =>
        {
            foreach (var x in bc.GetConsumingEnumerable())
                ProcessXml(x);
        }, TaskCreationOptions.LongRunning)); 

    watcher.EnableRaisingEvents = true;
}

void ProcessXml(string path)
{
    //Do your processing here...
    //Note many events will be called multiple times, see:
    //http://weblogs.asp.net/ashben/archive/2003/10/14/31773.aspx
}

void StopProcessing()
{
    watcher.EnableRaisingEvents = false;

    lock (_lock) //The above line doesn't guarantee no more events will be called,
                 //And Add() and CompleteAdding() can't be called concurrently
        bc.CompleteAdding(); 

    Task.WaitAll(tasks);
    foreach (var task in tasks)
        task.Dispose();
    bc.Dispose();
    tasks = null;
}

【讨论】:

    【解决方案2】:

    我很惊讶还没有人问,但考虑到您想要实现的是两个应用程序之间的某种消息传递,您是否考虑过使用 WCF?

    【讨论】:

      【解决方案3】:

      可能不需要循环,也不确定是否需要并行。如果您想处理一批新文件,这将很有用。 将出现新文件的文件夹上的 FileSystemWatcher 将为您提供将文件添加到队列的事件。

      为添加到队列的项目添加事件,以触发线程处理单个文件。

      如果你敲出一个简单的类、文件、状态、检测到的时间等。

      您将有一个检测线程添加到队列中,一个线程池来处理它们,并在成功时将它们从队列中删除。

      您可能会发现上一个问题在 .net 4 中对 threasafe “列表”很有用

      Thread-safe List<T> property

      特别是如果您想处理自 X 以来的所有新文件。

      请注意,如果您不打算使用 FileSystem 观察程序而只从文件夹中获取文件,则将它们移动到已处理的文件夹以及可能的失败文件夹将是一个好主意。读取 200,00 个文件名以检查您是否已处理它们,这会消除并行处理它们的任何好处。

      即使你这样做,我也会推荐它。只需将其移回 To Process(或在失败的情况下进行编辑后)将触发它被重新处理。另一个优点是,如果您正在处理一个数据库,并且一切都顺利进行,并且您的最后一次备份是在 X 上。您可以还原,然后只需将您处理过的所有文件移回“toprocess”文件夹。

      您还可以使用已知输入进行测试运行,并在前后检查数据库的状态。

      进一步评论。

      Task 使用的 ThreadPool 有一个 ThreadPool 限制,适用于您应用中的所有任务或后台任务。

      评论后。

      如果要限制并发任务的数量...

      十个初学者,您可以轻松改进,用于调整和提升。

      在您管理从文件队列中启动任务的类中,例如

      private object _canRunLock;
      private int _maxTasks;
      private int _activeTasks;
      
      public MyTaskManager(int argMaxTasks)
      {
        _maxTasks = argMaxTasks;
        _canRunLock = new object();
        _activeTasks = 0;
      }
      
      
      public bool CanRunTask(MyTask argTask)
      {
        get
        {
          lock(_canRunLock)
          {
            if (_activeTasks < _maxTasks)
            {
              ExecuteTask(argTask);
              _activeTasks++;
              return true;
            }
          }
          return false;
        }
      }
      
      public void TaskCompleted()
      {
        lock(_canRunLock)
        {
          if (_activeTasks > 0)
          {
            _activeTasks--;
          }
          else
          {
            throw new WTFException("Okay how did this happen?");
          }
        }
      }
      

      简单而安全(我认为)。您也可以让另一个属性暂停或禁用以进行检查。可能想让上面的单例( :( ),或者至少要记住,如果你运行多个......

      我能给出的最好建议是从简单、开放和解耦开始,然后根据需要进行复杂化,在这里过早地开始优化很容易。一个好主意不要让所有线程都在等待文件系统或后端,但我怀疑处理器的数量是否会成为瓶颈,所以你的 maxTasks 有点悬而未决。 在下限和上限之间进行某种自我调整可能是一件好事,而不是一个固定的数字。

      【讨论】:

      • FileSystemWatcher 将非常有用,但问题的第二部分更棘手。如何控制文件的多线程处理? FileSystemWatcher 会将文件添加到队列中,但如何控制线程创建(为了不产生太多线程,比如说最多 4 个线程用于文件处理)?
      • 请注意FileSystemWatcher has some limitations 可以同时处理多少文件更改。您必须适当地设置缓冲区大小。
      • 等一下。有很多方法可以做到这一点。
      【解决方案4】:

      我认为您可以通过 FileSystemWatcher 检查新文件。 http://www.codeproject.com/Articles/25443/Watching-Folder-Activity-in-C-NET有一篇文章。

      FileSystemWatcher 帮助您不在特定文件夹中循环。

      希望对您有所帮助。

      【讨论】:

        【解决方案5】:

        IMO 你想要的是cron 的工作。该算法的一个版本可以是:

        for every job (called periodically via cron/scheduler) run
        
           //
           // your program
           //
           if job_is_running {
              // Still busy...
              // don't process anything and just return back
              return
           }
        
           // Create your array
           //
           Array a = new Array()
           for each file in folder {
              a.append(file)
           }
        
           // Process each file
           //
           for each item in a {
             process_item(item);
        
             // Move it (or delete)
             //
             remove_from_input_folder(item)
           }
        

        现在,您可以在处理前调用remove_from input(),避免系统崩溃时重复处理。

        不久前我不得不为一家电话公司做类似的事情,这是我们得到的最舒适的解决方案:)

        更新:并行位

        与实际处理相比,通过文件循环构建数组在理论上可以忽略不计。因此,您可以轻松地将第二个循环转换为基于 worker 的并行变体。

        HTH

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2013-10-11
          • 1970-01-01
          • 1970-01-01
          • 2013-05-18
          • 2012-01-18
          • 2012-07-29
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多