【问题标题】:Parallel.Foreach spawning way too many threadsParallel.Foreach 产生太多线程
【发布时间】:2010-01-04 22:51:42
【问题描述】:

问题

虽然我将在这里讨论的代码是用 F# 编写的,但它基于 .NET 4 框架,并不具体取决于 F# 的任何特殊性(至少看起来如此!)。

我的磁盘上有一些数据,我应该从网络更新,将最新版本保存到磁盘:

type MyData =
    { field1 : int;
      field2 : float }

type MyDataGroup =
    { Data : MyData[];
      Id : int }

// load : int -> MyDataGroup
let load dataId =
    let data = ... // reads from disk
    { Data = data;
      Id = dataId }

// update : MyDataGroup -> MyDataGroup
let update dg =
    let newData = ... // reads from the network and process
                      // newData : MyData[]

    { dg with Data = dg.Data
                     |> Seq.ofArray
                     |> Seq.append newData
                     |> processDataSomehow
                     |> Seq.toArray }

// save : MyDataGroup -> unit
let save dg = ... // writes to the disk

let loadAndSaveAndUpdate = load >> update >> save

问题是loadAndSaveAndUpdate我所有的数据,我必须执行函数很多次:

{1 .. 5000} |> loadAndSaveAndUpdate

每一步都可以

  • 一些磁盘 IO,
  • 一些数据处理,
  • 一些网络 IO(可能存在大量延迟),
  • 更多数据处理,
  • 和一些磁盘 IO。

在某种程度上并行完成这不是很好吗?不幸的是,我的阅读和解析功能都不是“async-workflows-ready”。

我提出的第一个(不是很好)解决方案

任务

我做的第一件事是设置Task[] 并启动它们:

let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
            |> Seq.map createTask
            |> Seq.toArray

tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)

然后我按 CTRL+ESC 只是为了查看它使用了多少线程。 15, 17, ..., 35, ..., 170, ... 直到杀死应用程序!出了点问题。

平行

我做了几乎相同的事情,但使用了Parallel.ForEach(...),结果是一样的:很多很多很多线程。

一种可行的解决方案……有点

然后我决定只启动n线程、Task.WaitAll(of them),然后是其他n,直到没有更多可用任务为止。

这行得通,但问题是当它完成处理后,比如说,n-1 任务,它会等待,等待,等待由于大量网络延迟而坚持阻塞的最后一个该死的任务。这不好!

那么,您将如何解决这个问题?我很乐意查看不同的解决方案,包括异步工作流(以及在这种情况下如何调整我的非异步函数)、并行扩展、奇怪的并行模式等。

谢谢。

【问题讨论】:

标签: .net f# parallel-processing task-parallel-library parallel-extensions


【解决方案1】:

ParallelOptions.MaxDegreeOfParallelism 限制并行方法调用运行的并发操作数

【讨论】:

  • 这个设置对我的应用程序没有任何影响。此外,MSDN 没有明确说明“最大并行度”是什么意思。
【解决方案2】:

使用“异步”将使您能够在各种 I/O 调用处于“海上”状态时执行 I/O 密集型工作而不会烧毁线程,因此这是我的第一个建议。将代码转换为异步应该很简单,通常按照以下方式进行

  • 将每个函数体包裹在async{...}中,必要时添加return
  • 通过Async.FromBeginEnd创建库中尚未包含的任何 I/O 原语的异步版本
  • let r = Foo() 形式的调用切换为let! r = AsyncFoo()
  • 使用 Async.Parallel 将 5000 个异步对象转换为并行运行的单个异步

有各种教程可以做到这一点;一个这样的网络广播是here

【讨论】:

  • 布莱恩,精彩的网络直播。当我买了一本 MEAP 的《真实世界函数式编程》,一本很棒的书时,我就认识了 Petricek。这个网络广播同样好!谢谢!
【解决方案3】:

您确定您的个人任务按时完成了吗?我相信Parallel.ForEachTask 类都已经使用了.NET 线程池。任务通常应该是短期工作项,在这种情况下线程池只会产生少量实际线程,但如果您的任务没有取得进展并且还有其他任务排队,那么使用的线程数将稳步增加到最大值(在 .NET 2.0 SP1 中默认为 250/processor,但在不同版本的框架下有所不同)。还值得注意的是(至少在 .NET 2.0 SP1 中)新线程创建被限制为每秒 2 个新线程,因此达到您所看到的线程数表明任务没有在短时间内完成时间(因此将责任归咎于Parallel.ForEach 可能并不完全准确)。

我认为 Brian 建议使用 async 工作流是一个很好的建议,特别是如果长期任务的来源是 IO,因为async 会将您的线程返回到线程池,直到 IO 完成。另一种选择是简单地接受您的任务没有快速完成并允许产生许多线程(可以通过使用System.Threading.ThreadPool.SetMaxThreads 在一定程度上控制) - 根据您的情况,您可能没什么大不了的重新使用了很多线程。

【讨论】:

  • 太棒了!很好,这就是我要找的。是的,我的线程大约在 1 new thread per second(不是 2——.NET 4 beta 2)产生,并且它们在 WebRequests 上阻塞。我找了SetMaxThreads 之类的东西,但找不到,谢谢!最后,我没有“接受”这么多线程,因为应用程序崩溃了,我坚信这是由于线程数量造成的(从现在开始,唯一的区别 async 它起作用了)跨度>
【解决方案4】:

您始终可以使用ThreadPool

http://msdn.microsoft.com/en-us/library/system.threading.threadpool.aspx

基本上:

  1. 创建线程池
  2. 设置最大线程数
  3. 使用QueueUserWorkItem(WaitCallback)对所有任务进行排队

【讨论】:

    猜你喜欢
    • 2020-12-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-11-08
    • 1970-01-01
    相关资源
    最近更新 更多