【发布时间】:2010-01-04 22:51:42
【问题描述】:
问题
虽然我将在这里讨论的代码是用 F# 编写的,但它基于 .NET 4 框架,并不具体取决于 F# 的任何特殊性(至少看起来如此!)。
我的磁盘上有一些数据,我应该从网络更新,将最新版本保存到磁盘:
type MyData =
{ field1 : int;
field2 : float }
type MyDataGroup =
{ Data : MyData[];
Id : int }
// load : int -> MyDataGroup
let load dataId =
let data = ... // reads from disk
{ Data = data;
Id = dataId }
// update : MyDataGroup -> MyDataGroup
let update dg =
let newData = ... // reads from the network and process
// newData : MyData[]
{ dg with Data = dg.Data
|> Seq.ofArray
|> Seq.append newData
|> processDataSomehow
|> Seq.toArray }
// save : MyDataGroup -> unit
let save dg = ... // writes to the disk
let loadAndSaveAndUpdate = load >> update >> save
问题是loadAndSaveAndUpdate我所有的数据,我必须执行函数很多次:
{1 .. 5000} |> loadAndSaveAndUpdate
每一步都可以
- 一些磁盘 IO,
- 一些数据处理,
- 一些网络 IO(可能存在大量延迟),
- 更多数据处理,
- 和一些磁盘 IO。
在某种程度上并行完成这不是很好吗?不幸的是,我的阅读和解析功能都不是“async-workflows-ready”。
我提出的第一个(不是很好)解决方案
任务
我做的第一件事是设置Task[] 并启动它们:
let createTask id = new Task(fun _ -> loadAndUpdateAndSave id)
let tasks = {1 .. 5000}
|> Seq.map createTask
|> Seq.toArray
tasks |> Array.iter (fun x -> x.Start())
Task.WaitAll(tasks)
然后我按 CTRL+ESC 只是为了查看它使用了多少线程。 15, 17, ..., 35, ..., 170, ... 直到杀死应用程序!出了点问题。
平行
我做了几乎相同的事情,但使用了Parallel.ForEach(...),结果是一样的:很多很多很多线程。
一种可行的解决方案……有点
然后我决定只启动n线程、Task.WaitAll(of them),然后是其他n,直到没有更多可用任务为止。
这行得通,但问题是当它完成处理后,比如说,n-1 任务,它会等待,等待,等待由于大量网络延迟而坚持阻塞的最后一个该死的任务。这不好!
那么,您将如何解决这个问题?我很乐意查看不同的解决方案,包括异步工作流(以及在这种情况下如何调整我的非异步函数)、并行扩展、奇怪的并行模式等。
谢谢。
【问题讨论】:
-
你问这个问题很久了,我才看到你的问题。这是我做的类似的事情:stackoverflow.com/a/6339923
标签: .net f# parallel-processing task-parallel-library parallel-extensions