【问题标题】:Which Task<T> extension can I utilize for waiting on other tasks recursively?我可以使用哪个 Task<T> 扩展来递归地等待其他任务?
【发布时间】:2016-04-17 16:17:24
【问题描述】:

我不确定有一种有效的方法来做到这一点。我有文件,其中文件的内容指向其他文件,例如:

A
|-- B
|   |-- C
|   |-- D
|       |-- E
|
|-- F
    |-- C

G
|-- H
|   |-- I
|
|-- D
|   |-- E
|
|-- J

这会持续到成千上万个文件;幸运的是,依赖的深度非常浅,但为了论证,它可能是 N 级深,不可能有循环循环。我的目标是了解每个文件的整个依赖关系(展平)。例如:

  • A: (B, C, D, E, F) -- 请注意,“C”只列出一次。
  • B:(C、D、E)
  • C: ()
  • D: (E)
  • E: ()
  • F: (C)
  • G:(D、E、H、I、J)

我首先创建了一些模型来跟踪这些信息:

public class FileData
{
    public string FilePath { get; set; }

    public ISet<FileInfo> DependentUpon { get; set; }
}

当然,我随后创建了一个List&lt;FileData&gt; 来存储处理后的文件。同步扫描文件的内容来构建这个依赖树(然后将其展平)只会花费太长时间,所以我探索了查看 async/await,这有助于加快速度,但我希望之前让它更快在生产环境中释放它。

我的异步/等待尝试要快得多,但仍然不够高效。

public async Task<ICollection<FileData>> ProcessAsync(IEnumerable<FileInfo> files)
{
    var mappings = new Dictionary<FileInfo, FileData>();

    foreach (var file in files)
    {
        // Static Method that constructs an instance of the class
        // and utilizes async I/O to read the file line-by-line
        // to build any first level dependencies.
        var info = await FileData.CreateAsync(file);

        // Update progress + Other Properties

        mappings.Add(file, info);
    }

    // Go through the list and recursively add to the dependencies
    foreach (var item in list)
    {
        foreach (var dependency in GetAllDependencies(item, mappings))
        {
           file.DependentUpon.Add(dependency);
        }
    }
}

IEnumerable<FileInfo> GetAllDependencies(FileData data, IDictionary<FileInfo, FileData> mappings)
{
    foreach (var file in info.DependentUpon)
    {
        yield return file;

        foreach (var child in GetAllDependencies(mappings[file], mappings))
        {
            yield return child;
        }
    }
}

当然,这在某种程度上是一种不错的异步,但是当我尝试获取层次结构(扁平化)时,它仍然非常同步且缓慢。我正在尝试重构解决方案,以便在分层搜索中利用 async/await 更快地工作。到目前为止,我只有伪描述,我不知道这是否可能或如何正确实现:

创建FileInfoTask&lt;FileData&gt; 的字典(所以我不再等待类实例的构造)。在扫描第一级 DependentUpon 的文件后,我找到了匹配的任务,并且只有在这些任务完成后才继续我当前的任务。当然,这些任务具有相同的指令,因此只有在它们的依赖项完成时才会将它们标记为已完成。我想同时启动所有任务。例如(只是一个例子,我无法预测什么时候完成什么任务):

  • 开始任务 A
  • 开始任务 B
  • 扫描文件 A,DependentUpon (B, F)
  • 启动任务 C
  • 扫描文件 B,DependentUpon (C, D)
  • 任务 A 等到任务 B 和 F 完成。
  • 开始任务 D
  • 扫描文件 C
  • ...
  • 任务 D 等到任务 E 完成。
  • 扫描文件 E,DependentUpon ()
  • 任务 E 已完成
  • 任务 D 已完成
  • 任务 C 已完成
  • 任务 B 已完成。
  • 开始任务 J
  • 任务 F 已完成。
  • 任务 A 已完成。
  • ...
  • 所有任务已完成

【问题讨论】:

  • foreach 循环中await 有什么原因吗?您不能在循环之后将CreateAsync 任务添加到列表然后Task.WhenAll 吗?
  • 我删除了一些冗长的部分,我会在一些 cmets 中更新,但我会在第一个 foreach 循环中更新进度。
  • 您提供的代码甚至无法正确编译。有各种各样的变量没有定义就出现了。

标签: c# asynchronous task-parallel-library hierarchical


【解决方案1】:

考虑使用Task.WhenAll<> 同时等待加载(递归)根项目的任务。您还可以推迟依赖列表扩展,这样可以减少函数的运行时间并更有效地使用内存。

    public class FileData
    {
       public string FilePath { get; set; }

       public ISet<FileInfo> DependentUpon { get; set; }

       public IEnumerable<FileInfo> Dependencies {get; set;}
    }

新属性Dependencies 提供了所有依赖项的列表。 DependentUpon 现在只包含直接依赖项,无需更改。

    public async Task<ICollection<FileData>> ProcessAsync(IEnumerable<FileInfo> files)
    {
        var map = new Dictionary<FileInfo, Task<FileData>>();
        var tasks = files.Select(it => LoadFileDataAsync(it, map));
        return await Task.WhenAll(tasks);
    }

    static async Task<FileData> LoadFileDataAsync(FileInfo fileInfo, Dictionary<FileInfo, Task<FileData>> map)
    {
       // Load recursively FileData elements for all children 
       // storing the result in the map.

        Task<FileData> pendingTask;
        bool isNew;

        lock (map)
        {
            isNew = !map.TryGetValue(fileInfo, out pendingTask);
            if (isNew)
            {
                pendingTask = FileData.CreateAsync(fileInfo);
                map.Add(fileInfo, pendingTask);
            }
        }

        var data = await pendingTask;
        if (isNew)
        {
           // Assign an iterator traversing through the dependency graph
           // Note: parameters are captured by reference.
           data.Dependencies = ExpandDependencies(data.DependsUpon, map);
           if (data.DependsUpon.Count > 0)
           {
              // Recursively load child items
              var tasks = data.DependsUpon.Select(it => (Task)LoadFileDataAsync(it, map));
              await Task.WhenAll(tasks);
           }
        }
        return data;
    }


    static IEnumerable<FileInfo> ExpandDependencies(ISet<FileInfo> directDependencies, Dictionary<FileInfo, Task<FileData>> map)
    {

        if (directDependencies.Count == 0)
        {
            yield break;
        }

        // Depth-first graph traversal

        var visited = new HashSet<FileInfo>(map.Comparer); // check for duplicates
        var stack = new Stack<FileInfo>();

        foreach(var item in directDependencies)
        {
            stack.Push(item);
        }

        while(stack.Count > 0)
        {
            var info = stack.Pop();

            if (visited.Add(info))
            {
                yield return info;

                var data = map[info].Result;

                foreach (var child in data.DependsUpon)
                {
                    stack.Push(child);
                }
            }
        }
    }

Working code samlple

【讨论】:

  • var data = await pendingTask; 不会给您“使用未分配的变量”错误。如果您没有包含要运行的代码的 .NET fiddle 示例,我不会相信编译的代码。很奇怪。
  • @ScottChamberlain 由 map.TryGetValue() 初始化。
  • 啊!我忽略了out
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-04-26
  • 1970-01-01
  • 2012-02-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多