【问题标题】:IO and parallel async in FsharpFsharp 中的 IO 和并行异步
【发布时间】:2013-03-05 14:54:10
【问题描述】:

我有一些计算密集型任务,现在只在 1 个内核上运行,因此是我机器容量的 1/8。在每个任务结束时,我都会在文件中写入日志。

使用并行任务处理此 IO 的最优雅方式是什么?

让我的写入本身是异步的?

向按顺序处理写入的代理发送消息?

[<Fact>]
let boom () = 
    let tasks = [1 .. 10]
                |> Seq.map (fun components ->  async {  //do compute intensive stuff
                                                        use writer = new StreamWriter("samefile")
                                                        writer.WriteLine "toto" }
           )
    tasks |> Async.Parallel  |> Async.RunSynchronously

编辑

我最终这样做了,并通过对代理的同步调用将异步中的 new Stream 替换为代码。

[<Fact>]
let pasBoom () = 
    let tasks = [2 .. 2 .. 17]
                |> Seq.map (fun components ->  async {  //do compute intensive stuff
                                                        //use writer = new StreamWriter("samefile")
                                                        use writerhanlde = repoFileHandle.PostAndReply(fun replyChannel -> GetFile(@"samefile", replyChannel))  
                                                        printfn "%A" (writerhanlde.getWriter().ToString())
                                                        writerhanlde.getWriter().WriteLine "toto" }
           )
    tasks |> Async.Parallel  |> Async.RunSynchronously

和代理(可能有bug请小心,我自己需要一些快速的东西)

type IDisposableWriter = 
    inherit IDisposable
    abstract getWriter : unit -> StreamWriter


type StreamMessage = | GetFile of string * AsyncReplyChannel<IDisposableWriter>

let repoFileHandle =
    let writerCount = new Dictionary<string, int>()
    let writerRepo  = new Dictionary<string, StreamWriter> ()

    Agent.Start(fun inbox ->
        async { while true do
                    let! msg = inbox.Receive()
                    match msg with
                    | GetFile(filename, reply) -> 
                        if not (writerRepo.ContainsKey(filename)) then
                            writerRepo.[filename]  <- new StreamWriter(filename,true)
                            writerCount.[filename] <- 0
                        writerCount.[filename] <- writerCount.[filename] + 1

                        let obj = {new IDisposableWriter with 
                                    member this.getWriter () = writerRepo.[filename] 
                                    member IDisposable.Dispose() =  
                                        writerCount.[filename] <- writerCount.[filename] - 1                                                                        
                                        if writerCount.[filename] = 0 then
                                            writerRepo.[filename].Dispose()
                                            writerRepo.Remove(filename) |> ignore
                                }
                        reply.Reply(obj) })

并避免并发写入

  type WriteToStreamMessage = | WriteToStream of string * string

  let fileWriterAgent =
        Agent.Start(fun inbox ->
            async { while true do
                        let! msg = inbox.Receive()
                        match msg with
                        | WriteToStream(filename, content) -> 
                            use writerhanlde = repoFileHandle.PostAndReply(fun replyChannel -> GetFile(filename, replyChannel))
                            writerhanlde.getWriter().WriteLine content
    })

【问题讨论】:

  • 我的措辞模棱两可。我的意思是异步,它只会组成延续。我对TPL了解不多
  • async 不会也有助于在不同的内核上分发吗?
  • 你能包含你的代码吗?
  • 在这里。 IO 资源存在一些竞争条件。我想代理会是一个答案,但我不记得所有可用的选项。

标签: asynchronous f# io


【解决方案1】:

您能否更改计算以返回要记录的消息而不是将其写入文件?然后你可以在 PowerPack 中使用PSeq,它是 TPL 上的一个瘦包装器:

open Microsoft.FSharp.Collections

let work n = sprintf "running task %d" n
let msgs = PSeq.init 10 work |> PSeq.toList
use writer = System.IO.StreamWriter(@"C:\out.log")
msgs |> List.iter writer.WriteLine

【讨论】:

  • 不幸的是,我有很多地方可以写在不同的文件中,所以我正在寻找一种通用的云纹方式。谢谢你的想法
  • IO 未同步,因此您可以将其卸载到“排队 IO 代理”,以便对写入进行序列化。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2010-10-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2011-03-21
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多