【问题标题】:F# - Need help converting this to use a threadpoolF# - 需要帮助将其转换为使用线程池
【发布时间】:2012-04-06 05:48:02
【问题描述】:

我是 F# 的新手,我从网上找到的各种示例中对下面的代码进行了 frankensteined,试图更好地了解如何使用它。目前,下面的代码从文件中读取机器列表并 ping 每台机器。我必须将文件中的初始数组划分为一个由 25 台机器组成的较小数组,以控制并发操作的数量,否则绘制机器列表需要很长时间。我希望能够使用线程池来管理线程,但我还没有找到让它工作的方法。任何指导都会很棒。我无法完成这项工作:

let creatework  = FileLines|> Seq.map (fun elem -> ThreadPool.QueueUserWorkItem(new WaitCallback(dowork), elem))

完整代码如下:

open System.Threading
open System
open System.IO

let filePath = "c:\qa\machines.txt"

let FileLines = File.ReadAllLines(filePath)

let count = FileLines.Length/25

type ProcessResult = { exitCode : int; stdout : string; stderr : string } 

let executeProcess (exe,cmdline) = 
    let psi = new System.Diagnostics.ProcessStartInfo(exe,cmdline) 
    psi.UseShellExecute <- false
    psi.RedirectStandardOutput <- true 
    psi.RedirectStandardError <- true 
    psi.CreateNoWindow <- true
    let p = System.Diagnostics.Process.Start(psi, EnableRaisingEvents = true) 
    let output = new System.Text.StringBuilder()
    let error = new System.Text.StringBuilder() 
    p.OutputDataReceived.Add(fun args -> output.AppendLine(args.Data)|> ignore) 
    p.ErrorDataReceived.Add(fun args -> error.AppendLine(args.Data) |> ignore) 
    p.BeginErrorReadLine() 
    p.BeginOutputReadLine()
    p.WaitForExit()
    { exitCode = p.ExitCode; stdout = output.ToString(); stderr = error.ToString() } 

let dowork machinename=
    async{
        let exeout = executeProcess(@"c:\windows\system32\ping.exe", "-n 1 " + machinename)
        let exelines = 
            if exeout.stdout.Contains("Reply from") then Console.WriteLine(machinename + " " + "REPLY")
            elif exeout.stdout.Contains("Request timed out.") then Console.WriteLine(machinename + " " + "RTO")
            elif exeout.stdout.Contains("Ping request could not find host") then Console.WriteLine(machinename + " " + "Unknown Host")
            else Console.WriteLine(machinename + " " + "ERROR")
        exelines
        }

printfn "%A" (System.DateTime.Now.ToString())

for i in 0..count do
    let x = i*25
    let y = if i = count then FileLines.Length-1 else (i+1)*25
    printfn "%s %d" "X equals: " x
    printfn "%s %d" "Y equals: " y
    let filesection = FileLines.[x..y]
    let creatework = filesection |> Seq.map dowork |> Async.Parallel |> Async.RunSynchronously|>ignore
    creatework

printfn "%A" (System.DateTime.Now.ToString())
printfn "finished"

更新: 下面的代码有效,并为我想要做的事情提供了一个框架。 Tomas Petricek 引用的链接确实包含使这项工作的代码位。我只需要弄清楚哪个例子是正确的。它在用 Java 编写的重复框架的 3 秒内,所以我认为我正朝着正确的方向前进。我希望下面的示例对尝试在 F# 中线程化各种可执行文件的其他人有用:

open System
open System.IO
open System.Diagnostics

let filePath = "c:\qa\machines.txt"

let FileLines = File.ReadAllLines(filePath)

type Process with
    static member AsyncStart psi =
        let proc = new Process(StartInfo = psi, EnableRaisingEvents = true)
        let asyncExit = Async.AwaitEvent proc.Exited
        async {
            proc.Start() |> ignore
            let! args = asyncExit
            return proc
        } 

let shellExecute(program : string, args : string) =
    let startInfo =
        new ProcessStartInfo(FileName = program, Arguments = args,
            UseShellExecute = false,
            CreateNoWindow = true,
            RedirectStandardError = true,
            RedirectStandardOutput = true)
    Process.AsyncStart(startInfo)

let dowork (machinename : string)=
    async{
        let nonbtstat = "NONE"
        use! pingout = shellExecute(@"c:\windows\system32\ping.exe", "-n 1 " + machinename)
        let pingRdToEnd = pingout.StandardOutput.ReadToEnd()
        let pingresults =
            if pingRdToEnd.ToString().Contains("Reply from") then (machinename + " " + "REPLY")
            elif pingRdToEnd.ToString().Contains("Request timed out.") then (machinename + " " + "RTO")
            elif pingRdToEnd.ToString().Contains("Ping request could not find host") then (machinename + " " + "Unknown Host")
            else (machinename + " " + "PING_ERROR")
        if pingresults.ToString().Contains("REPLY") then
            use! nbtstatout = shellExecute(@"c:\windows\system32\nbtstat.exe", "-a " + machinename)
            let nbtstatRdToEnd = nbtstatout.StandardOutput.ReadToEnd().Split('\n')
            let nbtstatline = Array.tryFind(fun elem -> elem.ToString().Contains("<00>  UNIQUE      Registered")) nbtstatRdToEnd
            return Console.WriteLine(pingresults + nbtstatline.Value.ToString())
        else return Console.WriteLine(pingresults + " " + nonbtstat)
        }

printfn "%A" (System.DateTime.Now.ToString())

let creatework = FileLines |> Seq.map dowork |> Async.Parallel |> Async.RunSynchronously|>ignore
creatework

printfn "%A" (System.DateTime.Now.ToString())
printfn "finished" 

【问题讨论】:

  • 您是否考虑过使用Ping Class 代替ping.exe?通过一些工作,您可以将 SendAsync 方法和 PingCompleted 事件集成到 F# 异步工作流中......
  • 我之所以没有这样做是因为运行 ping 只是为了测试。我对 F# 的兴趣在于能够通过并行运行线程来利用多个 CPU。我工作的环境使用超过 15 年的无法更新的可执行文件,我想找到一种方法来利用多个内核,同时仍然保持向后兼容。

标签: asynchronous f# threadpool


【解决方案1】:

您的代码的主要问题是executeProcess 是一个需要很长时间才能运行的同步函数(它运行ping.exe 进程并等待其结果)。一般规则是线程池中的任务不应该长时间阻塞(因为那样它们会阻塞线程池线程,这意味着线程池不能有效地调度其他工作)。

我认为您可以通过使executeProcess 异步来轻松解决这个问题。您可以使用Async.AwaitEvent 等待Exitted 事件,而不是调用WaitForExit(阻塞):

let executeProcess (exe,cmdline) = async {
    let psi = new System.Diagnostics.ProcessStartInfo(exe,cmdline)  
    psi.UseShellExecute <- false 
    // [Lots of stuff omitted]
    p.BeginOutputReadLine() 
    let! _ = Async.AwaitEvent p.Exited
    return { exitCode = p.ExitCode
             stdout = output.ToString(); stderr = error.ToString() } }

这应该会解除线程池中的线程阻塞,因此您可以在输入数组中的所有 URL 上使用 Async.Parallel,而无需任何手动调度。

编辑 正如@desco 在评论中指出的那样,如果进程在到达AwaitEvent 行之前退出(在它可能错过事件之前),则上述内容并不完全正确。要解决此问题,您需要使用Event.guard 函数,该问题已在此 SO 问题中讨论:

【讨论】:

  • Async.AwaitEvent p.Exited 如果进程很快退出(基本上在调用 AwaitEvent 之前)可能会永远挂起
  • 我进行了关于更改执行功能的建议更改,但返回似乎现在无法捕获命令的结果。转换为字符串的“exeout”的搜索似乎没有返回。它还运行得如此之快,我不相信它现在正在运行 ping.exe。
  • @desco - 之前在这里讨论过,但我找不到上一个问题。感谢您提醒我 - 我添加了一个指向答案的链接,它应该解释如何解决这种情况。
  • @gear_9 我认为您需要设置 EnableRaisingEvents 属性(请参阅我刚刚添加到答案中的相关 SO 问题)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-11-24
  • 2019-12-23
  • 2021-11-06
  • 2016-12-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多