【问题标题】:Using TPL with parallel blocking IO operations将 TPL 与并行阻塞 IO 操作一起使用
【发布时间】:2017-07-16 03:22:46
【问题描述】:

前言:我知道使用 ThreadPool(通过 TPL 或直接)进行 IO 操作 is generally frowned upon 因为 IO 必须是顺序的,但是我的问题与阻塞调用的“并行 IO”有关公开一个Async 方法。

我正在编写一个 GUI 工具,用于获取有关网络上执行此操作的计算机的信息(简化代码):

String[] computerNames = { "foo", "bar", "baz" };
foreach(String computerName in computerNames) {

    Task.Factory
        .StartNew( GetComputerInfo, computerName )
        .ContinueWith( ShowOutputInGui, RunOnGuiThread );

}

private ComputerInfo GetComputerInfo(String machineName) {

    Task<Int64>     pingTime  = Task.Factory.StartNew( () => GetPingTime( machineName ) );
    Task<Process[]> processes = Task.Factory.StartNew( () => System.Diagnostics.Process.GetProcesses( machineName ) );
    // and loads more

    Task.WaitAll( pingtime, processes, etc );

    return new ComputerInfo( pingTime.Result, processes.Result, etc );
}

当我运行这段代码时,我发现与我使用的旧顺序代码相比,它的运行时间长得惊人。

请注意,GetComputerInfo 方法中的每个任务完全独立于它周围的其他任务(例如 Ping 时间可以与 GetProcesses 分开计算),但是当我插入一些 Stopwatch 计时调用时,我发现个人子任务,例如 GetProcesses 调用仅在调用 GetComputerInfo 之后才开始到 3000ms - 存在一些很大的延迟。

我注意到,当我将外部并行调用的数量减少到GetComputerInfo(通过减少computerNames 数组的大小)时,几乎立即返回了第一个结果。一些计算机名称是针对已关闭的计算机的,因此称为GetProcessesPingTime 需要很长时间才能超时(我的真实代码会捕获异常)。这可能是因为离线计算机阻止了 Tasks 的运行,而 TPL 自然将其限制为我的 CPU 硬件线程数 (8)。

有没有办法告诉 TPL 不要让内部任务(例如GetProcesses)阻塞外部任务(GetComputerInfo)?

(我已经尝试过“父/子”任务附件/阻止,但它不适用于我的情况,因为我从未明确地将子任务附加到父任务,并且父任务自然会等待Task.WaitAll )。

【问题讨论】:

  • 如果GetComputerInfo() 里面没有Task.WaitAll() 可能会更好......有点违背了目的。为什么不返回Task[]
  • 如果没有一个好的minimal reproducible example 可靠地重现问题,即使不是不可能,也很难诊断问题。也就是说,请记住,线程池并没有无限数量的线程在等待您。在空闲时,它最多只有少数线程(等于 CPU 内核数),并且只会每半秒到一秒启动新线程(IIRC 这是可配置的,我不记得默认值)。您可以通过 a) 使用异步 ping 方法和 b) 使用 ThreadPool.SetMinThreads() 来增加空闲线程的数量,以便它们在您需要时准备好。
  • 请注意,调用SetMinThreads() 有点小技巧。不幸的是 .NET 没有 GetProcesses() 方法的异步版本;但我什至看不到在本机代码中执行此操作的异步方式。如果您真的希望这些操作是并行的,我认为您将不得不自己管理线程。
  • 顺便说一句,您不使用Task.Run()await的任何原因?
  • @PeterDuniho await 电话会去哪里?

标签: c# multithreading task-parallel-library


【解决方案1】:

我假设您在某个事件处理程序中有您的 foreach 循环,因此您应该做的第一件事是将其标记为 async 以便您可以以异步方式调用您的另一个。之后,你应该介绍你的GetComputerInfo来做asyncall the way down

您的代码中还有其他陷阱:StartNew is dangerous,因为它使用Current 调度程序执行任务,而不是Default(因此您需要其他重载)。不幸的是,这个重载需要更多的参数,所以代码不会那么简单。好消息是您仍然需要该重载来告诉线程池您的任务正在长时间运行,因此它应该为它们使用专用线程:

TaskCreationOptions.LongRunning

指定一个任务将是一个长时间运行的粗粒度操作,比细粒度系统涉及更少、更大的组件。它向TaskScheduler 提供了一个提示,表明可能需要超额订阅。

超额订阅允许您创建比可用硬件线程数更多的线程。它还向任务调度程序提示该任务可能需要额外的线程这样它就不会阻塞本地线程池队列上其他线程或工作项的前进进度。

您还应该避免使用WaitAll 方法,因为它是一个阻塞操作,因此您可以少用1 线程来完成实际工作。您可能想使用WhenAll

最后,为了返回您的ComputerInfo 结果,您可以使用TaskCompletionSource 用法的延续,因此您的代码可能是这样的(还添加了取消逻辑):

using System.Diagnostics;

// handle event in fire-and-forget manner
async void btn_Click(object sender, EventArgs e)
{
    var computerNames = { "foo", "bar", "baz" };
    foreach(String computerName in computerNames)
    {
        var compCancelSource = new CancellationTokenSource();

        // asynchronically wait for next computer info
        var compInfo = await GetComputerInfo(computerName, compCancelSource. Token);
        // We are in UI context here
        ShowOutputInGui(compInfo);
        RunOnGuiThread(compInfo);
    }
}

private Task<ComputerInfo> GetComputerInfo(String machineName, CancellationToken token)
{
    var pingTime = Task.Factory.StartNew(
        // action to run
        () => GetPingTime(machineName),
        //token to cancel
        token,
        // notify the thread pool that this task could take a long time to run,
        // so the new thread probably will be used for it
        TaskCreationOptions.LongRunning,
        // execute all the job in a thread pool
        TaskScheduler.Default);

    var processes = Task.Run(() => Process.GetProcesses(machineName), token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    // and loads more

    await Task.WhenAll(pingtime, processes, etc);
    return new ComputerInfo(pingTime.Result, processes.Result, etc);

    //var tcs = new TaskCompletionSource<ComputerInfo>();
    //Task.WhenAll(pingtime, processes, etc)
    //    .ContinueWith(aggregateTask =>
    //        if (aggregateTask.IsCompleted)
    //        {
    //            tcs.SetResult(new ComputerInfo(
    //                aggregateTask.Result[0],
    //                aggregateTask.Result[1],
    //                etc));
    //        }
    //        else
    //        {
    //            // cancel or error handling
    //        });

    // return the awaitable task
    //return tcs.Task;
}

【讨论】:

  • 为什么GetComputerInfo 使用延续和tcs 而btn_Click 是异步的? GetComputerInfo 应该返回 Task&lt;ComputerInfo&gt;,顺便说一句。
  • 这是一个使用 TaskCompletionSource 的异步包装器,请参见链接的 MSDN 文章中的图 9。签名已修复,谢谢。
  • Task.Run 是异步包装器,但如果其余代码使用 async/awaitContinueWith 是可憎的。 await Task.WhenAll(...); return new ComputerInfo(...); 会更容易写,更容易阅读和理解。
猜你喜欢
  • 1970-01-01
  • 2010-11-17
  • 1970-01-01
  • 2019-10-18
  • 1970-01-01
  • 1970-01-01
  • 2012-02-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多