【发布时间】:2013-03-05 14:54:10
【问题描述】:
我有一些计算密集型任务,现在只在 1 个内核上运行,因此是我机器容量的 1/8。在每个任务结束时,我都会在文件中写入日志。
使用并行任务处理此 IO 的最优雅方式是什么?
让我的写入本身是异步的?
向按顺序处理写入的代理发送消息?
[<Fact>]
let boom () =
let tasks = [1 .. 10]
|> Seq.map (fun components -> async { //do compute intensive stuff
use writer = new StreamWriter("samefile")
writer.WriteLine "toto" }
)
tasks |> Async.Parallel |> Async.RunSynchronously
编辑
我最终这样做了,并通过对代理的同步调用将异步中的 new Stream 替换为代码。
[<Fact>]
let pasBoom () =
let tasks = [2 .. 2 .. 17]
|> Seq.map (fun components -> async { //do compute intensive stuff
//use writer = new StreamWriter("samefile")
use writerhanlde = repoFileHandle.PostAndReply(fun replyChannel -> GetFile(@"samefile", replyChannel))
printfn "%A" (writerhanlde.getWriter().ToString())
writerhanlde.getWriter().WriteLine "toto" }
)
tasks |> Async.Parallel |> Async.RunSynchronously
和代理(可能有bug请小心,我自己需要一些快速的东西)
type IDisposableWriter =
inherit IDisposable
abstract getWriter : unit -> StreamWriter
type StreamMessage = | GetFile of string * AsyncReplyChannel<IDisposableWriter>
let repoFileHandle =
let writerCount = new Dictionary<string, int>()
let writerRepo = new Dictionary<string, StreamWriter> ()
Agent.Start(fun inbox ->
async { while true do
let! msg = inbox.Receive()
match msg with
| GetFile(filename, reply) ->
if not (writerRepo.ContainsKey(filename)) then
writerRepo.[filename] <- new StreamWriter(filename,true)
writerCount.[filename] <- 0
writerCount.[filename] <- writerCount.[filename] + 1
let obj = {new IDisposableWriter with
member this.getWriter () = writerRepo.[filename]
member IDisposable.Dispose() =
writerCount.[filename] <- writerCount.[filename] - 1
if writerCount.[filename] = 0 then
writerRepo.[filename].Dispose()
writerRepo.Remove(filename) |> ignore
}
reply.Reply(obj) })
并避免并发写入
type WriteToStreamMessage = | WriteToStream of string * string
let fileWriterAgent =
Agent.Start(fun inbox ->
async { while true do
let! msg = inbox.Receive()
match msg with
| WriteToStream(filename, content) ->
use writerhanlde = repoFileHandle.PostAndReply(fun replyChannel -> GetFile(filename, replyChannel))
writerhanlde.getWriter().WriteLine content
})
【问题讨论】:
-
我的措辞模棱两可。我的意思是异步,它只会组成延续。我对TPL了解不多
-
async 不会也有助于在不同的内核上分发吗?
-
你能包含你的代码吗?
-
在这里。 IO 资源存在一些竞争条件。我想代理会是一个答案,但我不记得所有可用的选项。
标签: asynchronous f# io