【问题标题】:Producer/Consumer - Cascading Approach?生产者/消费者 - 级联方法?
【发布时间】:2012-08-27 23:21:07
【问题描述】:

我目前正在使用 .net 4.0 和 winforms 构建一个与服务器相关的小型应用程序。我想利用任务并行库的优势,但我对这里的最佳或“正确”实现有点动摇。

目的:

  1. 使用正则表达式模式使用网络路径中的文件(每 15 分钟使用一次。)
  2. 读取文件(csv 样式)
  3. 重写文件以跳过某些列
  4. 通过批量插入或更新将文件数据传输到 sql server

我正在考虑一种级联方法,如下所示:

ProducerConsumerTask1(从网络路径获取文件/使文件可供读取)
ProducerConsumerTask2(从Task1读取文件/从Task1重写文件)
ProducerConsumerTask3(获取重写文件/将文件从Task2传输到DB)

还有一点代码:

private static BlockingCollection<ManagedFile> searchQueue = new BlockingCollection<ManagedFile>(limit);
private const int limit = 100;

public void StartFileTask()
{
    Task[] producers = new Task[1];
    producers[0] = Task.Factory.StartNew(() => ProduceFileSearchTask());


    Task.Factory.StartNew(() => ConsumeFileSearchTask());
}

public static void ProduceFileSearchTask()
{
    var pattern = new Regex(Properties.Settings.Default.DefaultRegexPattern);
    string path = Properties.Settings.Default.DefaultImportPath;

    IEnumerable<FileInfo> files = new DirectoryInfo(path)
                                        .EnumerateFiles("*.*", SearchOption.AllDirectories)
                                        .Where(x => pattern.IsMatch(x.Name));

    for (int i = 0; i < files.ToList().Count(); i++)
    {
        ManagedFile _managedFile = new ManagedFile();
        _managedFile.Id = Guid.NewGuid();
        _managedFile.ManagedFileName = files.ElementAt(i).FullName;
        _managedFile.ManagedFileAddedOn = DateTime.Now;

        if (!searchQueue.IsAddingCompleted)
            searchQueue.Add(_managedFile);

        Thread.SpinWait(100000); 
    }           
}

public static void ConsumeFileSearchTask()
{
    foreach (var item in searchQueue.GetConsumingEnumerable())
    {
        // use ProducerTask for Reading the Files here
    }
}

如果有人分享他对这个想法的想法,那就太好了。这是一个很好的处理方式吗?在这种情况下有什么更好的办法?这种情况下的另一个主题:ui自动化/报告/状态更新到ui怎么样?如何才能做到这一点?活动/代表,嗯?

谢谢!

【问题讨论】:

  • 文件大小是多少(它有多少行)?每行有多少列?处理一行的计算难度是多少?
  • 另外,同一个文件是否每 15 分钟消耗一次?如果不是 - 文件 B(时间 = t + 15)可以在文件 A(时间 = t)仍在处理时开始吗?我们在这里谈论的文件大小/行数是多少?
  • 文件大小略有不同,但我认为每个文件不超过 1mb。保存来自旧 dos 程序的数据的文件需要通过 sql 填充到另一个(较新的)应用程序。文件中的每一行代表一个更新的客户信息。可以是 1 行或 100 行,大约 210 列。在我看来,计算难度很低,但是如果某些列被禁止导入,则需要重写每一行。
  • @ananthonline 没有。后台会不断生成新文件。是的,如果文件 a 仍在处理中,则文件 b 可以开始。
  • 这看起来是使用 Tasks.Dataflow 的完美场景。看看这个,它可能对你有很大帮助:microsoft.com/en-us/download/details.aspx?id=14782

标签: c# winforms .net-4.0 task-parallel-library producer-consumer


【解决方案1】:

添加我的 cmets 作为答案 :)

这看起来是使用 Tasks.Dataflow 的完美场景。看看这个,它可能对你有很大帮助:Tasks.DataFlow Whitepaper

另一种建议的方法: 一项任务读取新文件并将其中一些文件放入 BlockingCollection(又名生产者-消费者)。消费者任务维护并发任务列表并从集合中读取以安排新任务。通过调整消费者任务以及它可以同时跟踪多少个文件,您可以检查您的性能。一旦消费者收到某个任务完成的通知,再次从生产者那里读取并安排另一个任务。它们将独立并行。

另一个值得关注的框架是 Reactive Extensions 并将您的源代码转换为可观察的文件集合并在其中应用节流。

【讨论】:

  • 谢谢!我目前正在使用数据流构建解决方案。当我完成后,我会在这里发布结果以帮助其他人。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多