【问题标题】:f# mailboxprocessor - replying without waiting for deliveryf#mailboxprocessor - 无需等待投递即可回复
【发布时间】:2017-06-26 09:42:35
【问题描述】:

我正在使用代理 (MailboxProcessor) 来执行一些需要响应的有状态处理。

  • 来电者使用MailboxProcessor.PostAndAsyncReply 发布消息
  • 在代理中,以AsyncReplyChannel.Reply 给出响应

但是,通过查看 f# 源代码,我发现代理在响应传递之前不会处理下一条消息。总的来说,这是一件好事。但在我的情况下,代理更希望继续处理消息而不是等待响应传递。

做这样的事情来传递响应是否有问题? (或者有更好的选择吗?)

async { replyChannel.Reply response } |> Async.Start

我知道这种方法不能保证响应会按顺序传递。我没关系。

参考示例

// agent code
let doWork data =
    async { ... ; return response }

let rec loop ( inbox : MailboxProcessor<_> ) =
    async {
        let! msg = inbox.Receive()
        match msg with
        | None ->
            return ()

        | Some ( data, replyChannel ) ->
            let! response = doWork data
            replyChannel.Reply response (* waits for delivery, vs below *)
            // async { replyChannel.Reply response } |> Async.Start
            return! loop inbox
    }

let agent =
    MailboxProcessor.Start(loop)

// caller code
async {
    let! response =
        agent.PostAndAsyncReply(fun replyChannel -> Some (data, replyChannel))
    ...
}

【问题讨论】:

  • 您尝试做的事情有点违背PostAndAsyncReply 的观点,那么为什么要使用它呢?让客户传入一个可观察的主题,然后将您的回复推送到其中。
  • 必须以任何一种方式(使用可观察或使用回复通道)支付同步成本以传递响应。问题中使用的方法没有添加额外的代码行,也没有引入新概念。但是,如果您可以确定其他权衡并提供一个示例,那将是一个很好的答案。
  • 您自己已经确定了权衡:回复渠道不能乱序工作。
  • 让我看看我是否正确理解了您的目标: 1. 您希望代理不要等待消息传递,以便在上一条消息在传输过程中继续处理消息 2. 您希望调用者同步处理消息?
  • @mydogisbox 1 是的。 2 否。调用者也在异步操作中,当提供响应时它会恢复。上一条评论中提到的“同步成本”与在后台协调线程的成本有关。 IE。 f# AsyncReplyChannel 在内部使用 ManualResetEvent 来表示结果准备就绪。所以当Reply 被调用时,它不会返回,直到响应被传递。这个有效的让代理等到调用者收到响应后再处理下一条消息。

标签: f# agent mailboxprocessor


【解决方案1】:

FSharp.Control.AsyncSeq 在邮箱处理器上添加了一张更友好的面孔。异步序列更容易遵循,但是默认实现映射并行具有与描述相同的问题,等待序列中的前一个元素被映射以保留顺序。

所以我创建了一个新函数,它只是原来的 AsyncSeq.mapAsyncParallel,经过修改使其不再是真正的映射,因为它是无序的,但它确实映射了所有内容,并且惰性 seq 会随着工作的完成而进展。

Full Source for AsyncSeq.mapAsyncParallelUnordered

let mapAsyncParallelUnordered (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
  use mb = MailboxProcessor.Start (fun _ -> async.Return())
  let! err =
    s 
    |> AsyncSeq.iterAsyncParallel (fun a -> async {
      let! b = f a
      mb.Post (Some b) })
    |> Async.map (fun _ -> mb.Post None)
    |> Async.StartChildAsTask
  yield! 
    AsyncSeq.replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
  }

下面是我如何在一个工具中使用它的示例,该工具使用免费的 SSLlabs 和非常慢的 api,很容易过载。 parallelProcessHost 返回一个懒惰的AsyncSeq,它是由 webapi 请求生成的,所以AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync 实际运行请求并允许控制台在进来时打印出结果,与发送的顺序无关。

Full Source

let! es = 
    hosts
    |> Seq.indexed
    |> AsyncSeq.ofSeq
    |> AsyncSeq.map parallelProcessHost
    |> AsyncSeq.mapAsyncParallelUnordered AsyncSeq.toListAsync
    |> AsyncSeq.indexed
    |> AsyncSeq.map (fun (i, tail) -> (consoleN "-- %d of %i --- %O --" (i+1L) totalHosts (DateTime.UtcNow - startTime)) :: tail )
    |> AsyncSeq.collect AsyncSeq.ofSeq
    |> AsyncSeq.map stdoutOrStatus //Write out to console
    |> AsyncSeq.fold (|||) ErrorStatus.Okay

【讨论】:

    猜你喜欢
    • 2017-12-15
    • 1970-01-01
    • 2013-05-01
    • 1970-01-01
    • 2014-11-04
    • 2015-02-19
    • 2015-08-25
    • 2012-07-12
    • 1970-01-01
    相关资源
    最近更新 更多