【问题标题】:Select First Async Result选择第一个异步结果
【发布时间】:2016-12-31 21:48:26
【问题描述】:

是否有异步操作符来获取两个异步值首先返回的值(Async<_>)?

例如,给定两个 Async<_> 值,其中一个 A1 在 1 秒后返回,A2 在 2 秒后返回,那么我想要 A1 的结果。

原因是我想为异步序列实现一个交错函数,这样如果有两个异步序列像这样“定义”(空格表示时间,就像大理石图一样):

S1 = -+-----+------------+----+
S2 = ---+-------+----------+-----+

然后我想生成一个新的异步序列,行为如下:

S3 = -+-+---+---+--------+-+--+--+

交错 S1 S2 = S3

但是两个这样做,我可能需要一种异步选择运算符来选择选择值。

我认为这就像 Go 中的“选择”,您可以从两个通道中获取第一个可用值。

TPL 有一个名为 Task.WhenAny 的函数——我可能在这里需要类似的东西。

【问题讨论】:

  • 我不确定您实际上是否可以非常轻松地从异步选择构建异步交错(因为您所描述的选择必须运行 A1 和 A2 并丢弃 A2,而您想使用它)。你的异步序列是什么类型的?
  • 您能否将消费者实现为MailboxProcessor,并让 S1 和 S2 都向其发布消息? MailboxProcessor 将按 S3 顺序获取消息。
  • 类型在这里定义:tomasp.net/blog/async-sequences.aspx。你是对的,我不想丢弃其他计算,我只想按照完成的顺序使用它们的结果。
  • @JohnReynolds 可能。大概。不过,我对开销尽可能低的解决方案感兴趣。我也许可以在操作中隐藏代理,并且仍然在它们到达时异步提取值。我希望操作返回与其参数相同类型的值,以实现组合设计。

标签: asynchronous f# task-parallel-library sequence


【解决方案1】:

我认为该运算符在 F# 库中不可用。要结合现有操作,您可以使用Async.StartAsTask,然后使用现有的Task.WhenAny 运算符。但是,我不确定在取消方面会有什么表现。

另一个选项是使用在F# Snippets web site 上实现的Async.Choose 运算符。这不是特别优雅,但它应该可以解决问题!为了使答案独立,代码附在下面。

/// Creates an asynchronous workflow that non-deterministically returns the 
/// result of one of the two specified workflows (the one that completes
/// first). This is similar to Task.WaitAny.
static member Choose(a, b) : Async<'T> = 
    Async.FromContinuations(fun (cont, econt, ccont) ->
      // Results from the two 
      let result1 = ref (Choice1Of3())
      let result2 = ref (Choice1Of3())
      let handled = ref false
      let lockObj = new obj()
      let synchronized f = lock lockObj f

      // Called when one of the workflows completes
      let complete () = 
        let op =
          synchronized (fun () ->
            // If we already handled result (and called continuation)
            // then ignore. Otherwise, if the computation succeeds, then
            // run the continuation and mark state as handled.
            // Only throw if both workflows failed.
            match !handled, !result1, !result2 with 
            | true, _, _ -> ignore
            | false, (Choice2Of3 value), _ 
            | false, _, (Choice2Of3 value) -> 
                handled := true
                (fun () -> cont value)
            | false, Choice3Of3 e1, Choice3Of3 e2 -> 
                handled := true; 
                (fun () -> 
                    econt (new AggregateException
                                ("Both clauses of a choice failed.", [| e1; e2 |])))
            | false, Choice1Of3 _, Choice3Of3 _ 
            | false, Choice3Of3 _, Choice1Of3 _ 
            | false, Choice1Of3 _, Choice1Of3 _ -> ignore )
        op() 

      // Run a workflow and write result (or exception to a ref cell
      let run resCell workflow = async {
        try
          let! res = workflow
          synchronized (fun () -> resCell := Choice2Of3 res)
        with e ->
          synchronized (fun () -> resCell := Choice3Of3 e)
        complete() }

      // Start both work items in thread pool
      Async.Start(run result1 a)
      Async.Start(run result2 b) )

【讨论】:

    【解决方案2】:

    托马斯已经回答了确切的问题。但是,您可能有兴趣知道我的 F# 的 Hopac 库直接支持 Concurrent ML -style 一等高阶选择性事件,称为 alternatives,它直接提供 choose -combinator 并提供比 Go 的 select 语句更具表现力的并发抽象机制。

    关于交错两个异步序列的更具体问题,我最近开始尝试如何使用 Hopac 完成 Rx 风格的编程。我想出的一种可能的方法是定义一种短暂的事件流。你可以在这里找到实验代码:

    如您所见,为事件流定义的操作之一是merge。您正在寻找的内容可能在语义上略有不同,但使用 Hopac 风格的替代方案(或并发 ML 风格的事件)可能很容易实现。

    【讨论】:

    • 这看起来很有趣 Vesa。我将不得不对此进行试验。
    猜你喜欢
    • 2011-12-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-04
    • 2019-09-06
    • 1970-01-01
    • 2020-10-18
    相关资源
    最近更新 更多