【问题标题】:How to tell if an async computation is sent to the thread pool?如何判断异步计算是否发送到线程池?
【发布时间】:2021-06-11 20:24:24
【问题描述】:

我最近被告知在

async {
    return! async { return "hi" } }
|> Async.RunSynchronously
|> printfn "%s"

嵌套的Async<'T> (async { return 1 }) 不会被发送到线程池进行评估,而在

async {
    use ms = new MemoryStream [| 0x68uy; 0x69uy |]
    use sr = new StreamReader (ms)
    return! sr.ReadToEndAsync () |> Async.AwaitTask }
|> Async.RunSynchronously
|> printfn "%s"

嵌套的Async<'T> (sr.ReadToEndAsync () |> Async.AwaitTask) 将是。当let!return! 等异步操作执行时,Async<'T> 决定是否将其发送到线程池是什么?特别是,您将如何定义发送到线程池的线程池?您必须在 async 块中或在传递给 Async.FromContinuations 的 lambda 中包含哪些代码?

【问题讨论】:

  • ReadToEndAsync () 在任务池中排队一个 IO 任务。 Async 只是在捕捉它的延续。
  • @Asti 谢谢。 ReadToEndAsync 是如何做到的?有没有像QueueIOTaskOnTaskPool 这样的方法的类?

标签: asynchronous f# threadpool


【解决方案1】:

TL;DR: 不是这样的。 async 本身不会向线程池“发送”任何内容。它所做的只是继续运行直到它们停止。如果其中一个延续决定在新线程上继续 - 好吧,那就是线程切换发生的时候。


让我们建立一个小例子来说明会发生什么:

let log str = printfn $"{str}: thread = {Thread.CurrentThread.ManagedThreadId}"

let f = async {
  log "1"
  let! x = async { log "2"; return 42 }
  log "3"
  do! Async.Sleep(TimeSpan.FromSeconds(3.0))
  log "4"
}

log "starting"
f |> Async.StartImmediate
log "started"
Console.ReadLine()

如果你运行这个脚本,它将打印,starting,然后是123,然后是started,然后等待 3 秒,然后打印 4,以及所有除了4 之外,它们将具有相同的线程 ID。您可以看到直到Async.Sleep 之前的所有内容都在同一线程上同步执行,但之后async 执行停止并且主程序继续执行,打印started 然后阻塞ReadLine。当Async.Sleep 醒来并想继续执行时,原来的线程已经在ReadLine 上被阻塞了,所以异步计算可以继续在新的线程上运行。

这里发生了什么?这个功能如何?

首先,异步计算的结构在"continuation-passing style" 中。这是一种技术,每个函数都不会将其结果返回给调用者,而是调用另一个函数,并将结果作为参数传递。

让我用一个例子来说明:

// "Normal" style:
let f x = x + 5
let g x = x * 2
printfn "%d" (f (g 3)) // prints 11

// Continuation-passing style:
let f x next = next (x + 5)
let g x next = next (x * 2)
g 3 (fun res1 -> f res1 (fun res2 -> printfn "%d" res2))

这被称为“延续传递”,因为next 参数被称为“延续” - 即它们是表达程序在调用fg 之后如何继续 的函数.是的,这正是Async.FromContinuations 的意思。

表面上看起来非常愚蠢和迂回,这让我们可以做的是让每个函数决定何时、如何,甚至如果它的延续发生。例如,我们上面的 f 函数可能会做一些异步的事情,而不是简单地返回结果:

let f x next = httpPost "http://calculator.com/add5" x next

以连续传递风格对其进行编码将允许此类函数在对calculator.com 的请求进行时不阻塞当前线程。你问阻塞线程有什么问题?我会把你推荐给the original answer,它首先提示了你的问题。


第二,当你编写那些async { ... } 块时,编译器会给你一点帮助。它采用看起来像一步一步的命令式程序并将其“展开”成一系列连续传递调用。这种展开的“断裂”点是所有以爆炸结尾的结构 - let!do!return!

例如,上面的async 块看起来像这样(F#-ish 伪代码):

let return42 onDone = 
  log "2"
  onDone 42

let f onDone =
  log "1"
  return42 (fun x ->
    log "3"
    Async.Sleep (3 seconds) (fun () ->
      log "4"
      onDone ()
    )
  )

在这里,您可以清楚地看到return42 函数只是立即调用它的延续,从而使从log "1"log "3" 的整个事情完全同步,而Async.Sleep 函数没有调用它的延续马上,而是安排它稍后(在 3 秒内)在线程池上运行。这就是线程切换发生的地方。

最后,这里是您问题的答案:为了让async 计算跳转线程,您传递给Async.FromContinuations 的回调应该做任何事情,除了立即调用成功继续。


需要进一步调查的几点说明

  1. 上述示例中的onDone 技术在技术上称为"monadic bind",实际上在真正的F# 程序中它由async.Bind 方法表示。 This answer 也可能有助于理解这个概念。
  2. 以上内容有点过于简单化了。 In reality async 的执行比这要复杂一些。在内部,它使用了一种称为"trampoline" 的技术,简单来说,它只是一个循环,每轮都运行一个 thunk,但至关重要的是,正在运行的 thunk 也可以“要求”它运行另一个 thunk,如果确实如此,循环会这样做,以此类推,直到下一个 thunk 不再要求运行另一个 thunk,然后整个事情最终停止。
  3. 在我的示例中,我专门使用Async.StartImmediate 来启动计算,因为Async.StartImmediate 会按照它在锡上所说的那样做:它会立即开始运行计算,就在那里。这就是为什么一切都与主程序在同一个线程上运行的原因。有many alternative starting functions in the Async module。例如,Async.Start 将在线程池上开始计算。从log "1"log "3" 的行仍将全部同步发生,它们之间没有线程切换,但它将发生在与log "start"log "starting" 不同的线程上。在这种情况下,线程切换将在 async 计算甚至开始之前发生,因此不计算在内。

【讨论】:

  • 使线程切换发生的代码在哪里?在Async.Sleep?在AsyncBuilder?
  • 正如我在答案中解释的那样:“... Async.Sleep 函数不会立即调用它的延续,而是安排它稍后运行 ...” Here's the precise line 如果你坚持的话。
猜你喜欢
  • 2011-02-09
  • 1970-01-01
  • 2022-11-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-11-21
  • 2011-04-26
相关资源
最近更新 更多