【问题标题】:Need help regarding Async and fsi需要有关异步和 fsi 的帮助
【发布时间】:2011-02-08 14:25:29
【问题描述】:

我想编写一些代码来运行一系列 F# 脚本 (.fsx)。问题是我可以拥有数百个脚本,如果我这样做:

let shellExecute program args =
    let startInfo = new ProcessStartInfo()
    do startInfo.FileName        <- program
    do startInfo.Arguments       <- args
    do startInfo.UseShellExecute <- true
    do startInfo.WindowStyle     <- ProcessWindowStyle.Hidden

    //do printfn "%s" startInfo.Arguments 
    let proc = Process.Start(startInfo)
    ()

scripts
|> Seq.iter (shellExecute "fsi")

可能对我的 2GB 系统造成太大压力。无论如何,我想按n批次运行脚本,这似乎也是学习Async的一个很好的练习(我想这是要走的路)。

我已经开始为此编写一些代码,但不幸的是它不起作用:

open System.Diagnostics

let p = shellExecute "fsi" @"C:\Users\Stringer\foo.fsx"

async {
    let! exit = Async.AwaitEvent p.Exited
    do printfn "process has exited"
}
|> Async.StartImmediate

foo.fsx 只是一个 hello world 脚本。 解决这个问题最惯用的方法是什么?

我还想弄清楚是否可以为每个正在执行的脚本检索返回码,如果不可行,请找到另一种方法。谢谢!

编辑:

非常感谢您的见解和链接!我学到了很多东西。 我只想按照 Tomas 的建议添加一些代码以使用 Async.Parallel 并行运行批处理。如果我的cut 函数有更好的实现,请发表评论。

module Seq =
  /// Returns a sequence of sequences of N elements from the source sequence.
  /// If the length of the source sequence is not a multiple
  /// of N, last element of the returned sequence will have a length
  /// included between 1 and N-1.
  let cut (count : int) (source : seq<´T>) = 
    let rec aux s length = seq {
      if (length < count) then yield s
      else
        yield Seq.take count s
        if (length <> count) then
          yield! aux (Seq.skip count s) (length - count)
      }
    aux source (Seq.length source)

let batchCount = 2
let filesPerBatch =
  let q = (scripts.Length / batchCount)
  q + if scripts.Length % batchCount = 0 then 0 else 1

let batchs =
  scripts
  |> Seq.cut filesPerBatch
  |> Seq.map Seq.toList
  |> Seq.map loop

Async.RunSynchronously (Async.Parallel batchs) |> ignore

EDIT2:

所以我在让 Tomas 的保护代码工作时遇到了一些麻烦。我猜f 函数必须在AddHandler 方法中调用,否则我们将永远失去事件......这是代码:

module Event =
  let guard f (e:IEvent<´Del, ´Args>) = 
    let e = Event.map id e
    { new IEvent<´Args> with 
        member this.AddHandler(d) = e.AddHandler(d); f() //must call f here!
        member this.RemoveHandler(d) = e.RemoveHandler(d); f()
        member this.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

有趣的事情(正如 Tomas 所提到的)是,看起来Exited 事件在进程终止时存储在某处,即使该进程尚未启动,EnableRaisingEvents 设置为 true。 当此属性最终设置为 true 时,将触发该事件。

由于我不确定这是不是官方规范(也有点偏执),我找到了另一种解决方案,即在 guard 函数中启动进程,因此我们确保代码适用于任何一个情况:

let createStartInfo program args =
  new ProcessStartInfo
    (FileName = program, Arguments = args, UseShellExecute = false,
     WindowStyle = ProcessWindowStyle.Normal, 
     RedirectStandardOutput = true)

let createProcess info =
  let p = new Process()
  do p.StartInfo           <- info
  do p.EnableRaisingEvents <- true
  p

let rec loop scripts = async { 
  match scripts with 
  | [] -> printfn "FINISHED"
  | script::scripts ->
    let args = sprintf "\"%s\"" script
    let p = createStartInfo "notepad" args |> createProcess
    let! exit =
      p.Exited 
      |> Event.guard (fun () -> p.Start() |> ignore)
      |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    do printfn "\nPROCESSED: %s, CODE: %d, OUTPUT: %A"script p.ExitCode output
    return! loop scripts 
  }

请注意,我已将 fsi.exe 替换为 notepad.exe,因此我可以在调试器中逐步重播不同的场景,并自己明确控制进程的退出.

【问题讨论】:

  • 您应该从这一行中删除 f():member this.RemoveHandler(d) = e.RemoveHandler(d); f() 否则 p.Start() 将在事件处理程序被删除时被调用。

标签: f# sequence asynchronous f#-interactive


【解决方案1】:

您的方法对我来说看起来很棒,我真的很喜欢使用 AwaitEvent 将流程执行嵌入到异步工作流中的想法!

它不起作用的可能原因是您需要将ProcessEnableRisingEvents 属性设置为true,如果您希望它触发Exited 事件(不要问我为什么你必须这样做,这对我来说听起来很傻!)无论如何,我在测试时对你的代码做了一些其他更改,所以这里有一个对我有用的版本:

open System
open System.Diagnostics

let shellExecute program args = 
  // Configure process to redirect output (so that we can read it)
  let startInfo = 
    new ProcessStartInfo
      (FileName = program, Arguments = args, UseShellExecute = false,
       WindowStyle = ProcessWindowStyle.Hidden, 
       RedirectStandardOutput = true)

  // Start the process
  // Note: We must enable rising events explicitly here!
  Process.Start(startInfo, EnableRaisingEvents = true)

最重要的是,代码现在将EnableRaisingEvents 设置为true。我还更改了代码以使用在构造对象时指定对象属性的语法(使代码更简洁),并更改了一些属性,以便我可以读取输出(RedirectStandardOutput)。

现在,我们可以使用AwaitEvent 方法等待进程完成。我假设fsi 包含fsi.exe 的路径,而scripts 是FSX 脚本的列表。如果您想按顺序运行它们,可以使用使用递归实现的循环:

let rec loop scripts = async { 
  match scripts with 
  | [] -> printf "FINISHED"
  | script::scripts ->
    // Start the proces in background
    let p = shellExecute fsi script 
    // Wait until the process completes
    let! exit = Async.AwaitEvent p.Exited 
    // Read the output produced by the process, the exit code
    // is available in the `ExitCode` property of `Process`
    let output = p.StandardOutput.ReadToEnd()
    printfn "\nPROCESSED: %s, CODE: %d\n%A" script p.ExitCode output
    // Process the rest of the scripts
    return! loop scripts  } 

// This starts the workflow on background thread, so that we can
// do other things in the meantime. You need to add `ReadLine`, so that
// the console application doesn't quit immedeiately
loop scripts |> Async.Start
Console.ReadLine() |> ignore    

当然,您也可以并行运行这些进程(或者例如并行运行 2 组进程等)。为此,您可以使用 Async.Parallel(以通常的方式)。

无论如何,这是一个非常好的例子,它在我到目前为止还没有看到它们使用过的领域中使用异步工作流。非常有趣:-)

【讨论】:

  • 不错的答案。在调用 Async.AwaitEvent 之前进程是否存在退出的危险(这意味着在添加侦听器后不会引发事件)?
  • 是的,这是一场比赛,参见例如v2matveev.blogspot.com/2010/02/…
  • 好点...可悲的是,这让事情变得更加复杂:-(。应该有一些方法可以在开始该过程之前创建事件 - 但无论我们做什么,我们都可能会失去如果在后台线程上触发事件 - 他们是否至少保证它不会在我们设置 EnableRaisingEvents 之前完成?尽管如此,我们仍然需要像 Concurrent ML 中的 guard 组合器这样的东西(参见例如citeseerx.ist.psu.edu/viewdoc/…)。我想这是同步单线程方法的+1。
  • 我的回答能解决比赛条件,还是我遗漏了什么?
  • @Joel:恐怕你的方法行不通(详情在你的帖子下方)
【解决方案2】:

邮箱处理器呢?

【讨论】:

  • 听起来不错,但我更喜欢首先坚持使用异步工作流,因为我认为在我的 F# 代码中它们可以比 MailboxProcessor 更广泛地使用。另外,在我见过的 MailboxProcessor F# 代码中经常有异步代码。
【解决方案3】:

针对 Tomas 的回答,这对于启动进程然后订阅其 Exited 事件所涉及的竞争条件是否是可行的解决方案?

type Process with
    static member AsyncStart psi =
        let proc = new Process(StartInfo = psi, EnableRaisingEvents = true)
        let asyncExit = Async.AwaitEvent proc.Exited
        async {
            proc.Start() |> ignore
            let! args = asyncExit
            return proc
        }

除非我弄错了,否则这将在开始进程之前订阅事件,并将其全部打包为 Async&lt;Process&gt; 结果。

这将允许您像这样重写其余代码:

let shellExecute program args = 
  // Configure process to redirect output (so that we can read it)
  let startInfo = 
    new ProcessStartInfo(FileName = program, Arguments = args, 
        UseShellExecute = false,
        WindowStyle = ProcessWindowStyle.Hidden, 
        RedirectStandardOutput = true)

  // Start the process
  Process.AsyncStart(startInfo)

let fsi = "PATH TO FSI.EXE"

let rec loop scripts = async { 
    match scripts with 
    | [] -> printf "FINISHED"
    | script::scripts ->
        // Start the proces in background
        use! p = shellExecute fsi script 
        // Read the output produced by the process, the exit code
        // is available in the `ExitCode` property of `Process`
        let output = p.StandardOutput.ReadToEnd()
        printfn "\nPROCESSED: %s, CODE: %d\n%A" script p.ExitCode output
        // Process the rest of the scripts
        return! loop scripts 
} 

如果这样做了,那么需要担心的代码肯定比 Vladimir 的 Async.GetSubject 少得多。

【讨论】:

  • 我认为这行不通,但这种方法看起来很有希望。您在工作流之外运行的代码仅创建一个异步工作流 (asyncExit),一旦工作流启动 (let!),它将将处理程序附加到事件,因此其行为与我的原始版本相同。要修复它,您需要将处理程序附加到 proc.Exited 事件(在返回工作流之前)并在使用一些可变变量调用 AwaitEvent 之前处理事件触发的情况......我想我找到了另一种方法解决这个问题,所以我会发布它。
  • 哦,真令人失望。我希望获得基于 AwaitEvent 的异步工作流与订阅它是一样的。
【解决方案4】:

我做了一些实验,这是解决我帖子下方的 cmets 和 Joel 的回答中讨论的问题的一种方法(我认为目前不起作用,但可以修复)。

认为Process的规范是在我们将EnableRaisingEvents属性设置为true后,它可以触发Exited事件(并且即使进程被触发也会触发事件)在我们设置属性之前已经完成)。为了正确处理这种情况,我们需要在我们将处理程序附加到Exited 事件之后启用事件的引发。

这是一个问题,因为如果我们使用AwaitEvent,它将阻塞工作流,直到事件触发。在从工作流中调用AwaitEvent 之后,我们无法做任何事情(如果我们在调用AwaitEvent 之前设置了属性,那么我们就会进行比赛......)。 Vladimir's approach 是正确的,但我认为有一个更简单的方法来处理这个问题。

我将创建一个函数Event.guard 接受一个事件并返回一个事件,它允许我们指定将在处理程序附加到事件之后执行的某些函数。这意味着如果我们在这个函数内部做一些操作(进而触发事件),事件就会被处理。

要将它用于这里讨论的问题,我们需要将我原来的解决方案更改如下。首先,shellExecute 函数不能设置EnableRaisingEvents 属性(否则,我们可能会丢失事件!)。其次,等待代码应该是这样的:

let rec loop scripts = async { 
  match scripts with 
  | [] -> printf "FINISHED"
  | script::scripts ->
    let p = shellExecute fsi script 
    let! exit = 
      p.Exited 
        |> Event.guard (fun () -> p.EnableRaisingEvents <- true)
        |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    return! loop scripts  } 

注意Event.guard 函数的使用。粗略地说,在工作流将处理程序附加到 p.Exited 事件后,提供的 lambda 函数将运行(并将启用事件的引发)。但是,我们已经将处理程序附加到事件,所以如果这会立即导致事件,我们很好!

实现(EventObservable)如下所示:

module Event =
  let guard f (e:IEvent<'Del, 'Args>) = 
    let e = Event.map id e
    { new IEvent<'Args> with 
        member x.AddHandler(d) = e.AddHandler(d)
        member x.RemoveHandler(d) = e.RemoveHandler(d); f()
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

module Observable =
  let guard f (e:IObservable<'Args>) = 
    { new IObservable<'Args> with 
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

好在这段代码非常简单。

【讨论】:

  • 非常好!您介意在e.Subscribe(observer) in f() 中描述in 关键字的重要性吗?看起来它与冗长的语法有关,但我不熟悉“函数调用中的函数调用”的用法。
  • 当然。在冗长的语法中,您总是在let 的初始化表达式之后写in(例如let a = 1 in printf "...")。如果您使用 light 语法,编译器 认为 如果您以换行符结束 let,它就在那里。如果你想在单行上写let,然后是其他东西(在我的例子中,为了使代码更简洁),你需要在let之后添加in(以及在正常表达式之后添加;) .
  • 小(但很重要)错字:f() 调用应该在 AddHandler() 方法上而不是 RemoveHandler() 上。
【解决方案5】:

可以从博文中简化主题版本。 getSubject 可以返回工作流,而不是返回模拟事件。

结果工作流本身是具有两种状态的状态机 1. 事件尚未触发:所有挂起的监听器都应该被注册 2. 值已经设置,监听器立即服务 在代码中它将如下所示:

type SubjectState<'T> = Listen of ('T -> unit) list | Value of 'T

getSubject 实现很简单

let getSubject (e : IEvent<_, _>) = 
    let state = ref (Listen [])
    let switchState v = 
        let listeners = 
            lock state (fun () ->
                match !state with
                | Listen ls -> 
                    state := Value v 
                    ls
                | _ -> failwith "Value is set twice"
            )
        for l in listeners do l v

    Async.StartWithContinuations(
        Async.AwaitEvent e,
        switchState,
        ignore,
        ignore
    )

Async.FromContinuations(fun (cont, _, _) ->
    let ok, v = lock state (fun () ->
        match !state with
        | Listen ls ->
            state := Listen (cont::ls)
            false, Unchecked.defaultof<_>
        | Value v ->
            true, v
        )
    if ok then cont v
    )

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-07-11
    • 1970-01-01
    • 2021-02-04
    • 1970-01-01
    • 2015-06-23
    • 2014-12-26
    • 2013-08-12
    • 2014-06-11
    相关资源
    最近更新 更多