【问题标题】:C# TPL calling tasks in a parallel manner and asynchronously creating new filesC# TPL 以并行方式调用任务并异步创建新文件
【发布时间】:2015-09-09 23:39:54
【问题描述】:

我正在努力学习 TPL。我以这样的并行方式写入文件:

public async Task SaveToFilesAsync(string path, List<string> list, CancellationToken ct)
{
    int count = 0;
    foreach (var str in list)
    {
        string fullPath = path + @"\" + count.ToString() + "_element.txt";
        using (var sw = File.CreateText(fullPath))
        {
            await sw.WriteLineAsync(str);
        }
        count++;

        Log("Saved in thread: {0} to {1}", 
           Environment.CurrentManagedThreadId,
           fullPath);

        if (ct.IsCancellationRequested)
            ct.ThrowIfCancellationRequested();
    }
}

然后这样称呼它:

var tasks = new List<Task>();

try
{
    tasks.Add(SaveToFilesAsync(path, myListOfStrings, cts.Token));
}
catch (Exception ex)
{
    Log("Failed to save: " + ex.Message);
    throw;
}

tasks.Add(MySecondFuncAsync(), cts.Token);
//...
tasks.Add(MyLastFuncAsync(), cts.Token);

try
{
    //Or should I call await Task.WhenAll(tasks) ? What should I call here?
    Task.WaitAll(tasks.ToArray()); 
}
catch (AggregateException ex)
{
    foreach (var v in ex.InnerExceptions)
       Error(ex.Message + " " + v.Message);
}
finally
{
   cts.Dispose();
} 

foreach (task in tasks)
{
// Now, how to print results from the tasks? 
//Considering that all tasks return bool value, 
//I need to do something like this:
if (task.Status != TaskStatus.Faulted)
         Console.Writeline(task.Result);
else
         Log("Error...");
}

我的目标是让所有功能(SaveToFilesAsyncMySecondFuncAsync)以并行方式同时运行,使用计算机上的所有内核并节省时间。但是当我看到SaveToFilesAsync 的日志时,我意识到保存到文件总是发生在同一个线程中,而不是并行发生。我究竟做错了什么?第二个问题:如何从代码末尾的任务列表中的每个任务中获取 Task.Result?如果第二个函数返回 Task(bool),如何在我的代码中获取 bool 值?此外,由于我是 TPL 的新人,因此非常欢迎所有关于我的代码的 cmets。

【问题讨论】:

  • 你应该把你的两个单独的问题放在两个单独的问题中。
  • 我在上面的代码中看不到任何并行性,创建新任务不一定会在单独的线程池线程上启动它们。您是否尝试并行化 SaveToFilesAsync 或要运行的任务列表中的代码?您应该查看具有 ParallelOptions 参数的 Parallel.ForEach() 方法之一。 *编辑:我明白了,您希望 SaveFilesAsync 中的并行性
  • 您的Log() 调用是多线程可读的吗?或者它本质上是那些其他任务上下文跳转到的单线程操作?

标签: c# .net concurrency async-await task-parallel-library


【解决方案1】:

您需要将 foreach 循环(从第一项到最后一项按顺序运行)替换为可配置为并行性的 Parallel.ForEach() 循环,或为您提供索引的 Parallel.For()当前处理的项目。由于您需要对文件名使用计数器,因此您需要修改列表参数以提供在创建列表时填充的文件编号,或者使用 Parallel.For() 提供的索引。另一种选择是有一个长变量,您可以在创建文件名后对其执行 Interlocked.Increment 但我不确定这是否是最佳的,我还没有尝试过。

这就是它的样子。

将调用 SaveFilesAsync 的代码封装在 try/catch 中以处理通过 CancellationTokenSource 取消的操作

var cts = new CancellationTokenSource();

try
{
    Task.WaitAll(SaveFilesAsync(@"C:\Some\Path", files, cts.Token));
}
catch (Exception)
{
    Debug.Print("SaveFilesAsync Exception");
}
finally
{
    cts.Dispose();
}

然后在那个方法中做你的并行。

public async Task SaveFilesAsync(string path, List<string> list, CancellationToken token)
{
    int counter = 0;

    var options = new ParallelOptions
                      {
                          CancellationToken = token,
                          MaxDegreeOfParallelism = Environment.ProcessorCount,
                          TaskScheduler = TaskScheduler.Default
                      };

    await Task.Run(
        () =>
            {
                try
                {
                    Parallel.ForEach(
                        list,
                        options,
                        (item, state) =>
                            {
                                // if cancellation is requested, this will throw an OperationCanceledException caught outside the Parallel loop
                                options.CancellationToken.ThrowIfCancellationRequested();

                                // safely increment and get your next file number
                                int index = Interlocked.Increment(ref counter);
                                string fullPath = string.Format(@"{0}\{1}_element.txt", path, index);

                                using (var sw = File.CreateText(fullPath))
                                {
                                    sw.WriteLine(item);
                                }

                                Debug.Print(
                                    "Saved in thread: {0} to {1}",
                                    Thread.CurrentThread.ManagedThreadId,
                                    fullPath);
                            });
                }
                catch (OperationCanceledException)
                {
                    Debug.Print("Operation Canceled");
                }
            });
}

您的代码的其他部分没有改变,只需调整您创建文件内容列表的位置。

编辑:调用 SaveFileAsync 方法的 try/catch 实际上什么也没做,都是在 SaveFileAsync 内部处理的。

【讨论】:

  • 如果您使用的是Tasks,为什么要使用Parellel.ForEach?为什么不只是Task.WhenAll?而且您不会等待WriteLineAsync 的电话。
  • 我提出了一些与他实现的其余部分相匹配的东西。我注意到没有等待 WriteLineAsync 并且会导致问题,它应该只是一个 WriteLine 代替(在我的答案中更改了它)。此外,创建任务并不意味着它们会在单独的线程中运行,因此 Parallel.ForEach 实际上将在使用尽可能多的可用总处理器的单独线程上处理环境可以处理的尽可能多的文件。
【解决方案2】:

试试这个:

public async Task SaveToFileAsync(string fullPath, line)
{
    using (var sw = File.CreateText(fullPath))
    {
        await sw.WriteLineAsync(str);
    }

    Log("Saved in thread: {0} to {1}", 
       Environment.CurrentManagedThreadId,
       fullPath);
}

public async Task SaveToFilesAsync(string path, List<string> list)
{
    await Task.WhenAll(
        list
            .Select((line, i) =>
                SaveToFileAsync(
                    string.Format(
                        @"{0}\{1}_element.txt",
                        path,
                        i),
                    line));
}

由于您每个文件只写一行,并且您想将其全部并行化,所以我认为它是不可取消的。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-09-20
    • 1970-01-01
    • 2015-07-01
    相关资源
    最近更新 更多