【问题标题】:How to await multiple tasks in Elixir?如何在 Elixir 中等待多个任务?
【发布时间】:2017-07-08 21:32:55
【问题描述】:

我想同时执行多项任务。在 Javascript 中,我会这样做:

async function cook_an_egg() {}

async function take_shower() {}

async function call_mum() {}

await Promise.all([cook_an_egg(), take_shower(), call_mum()])

如何在 Elixir 任务模块中实现Promise.all? 从documentation,看来你只能await 1 个任务;在每个 task 中定义 1 个函数;并仅将相同的功能应用于具有 async_stream 的多个项目。

【问题讨论】:

标签: erlang async-await task elixir


【解决方案1】:

对于 Elixir v1.11.0 及更高版本

Task.await_many 正是为此而设计的。它正确地处理了整个超时,并且在面对退出、超时等时应该做最不令人惊讶的事情。

tasks = [
  Task.async(fn -> cook_an_egg(:medium) end),
  Task.async(fn -> take_shower(10) end),
  Task.async(fn -> call_mum() end),
]

Task.await_many(tasks)

对于旧版本

Task.await 更防弹的解决方案是Task.yield_many。不幸的是,它有点冗长,因为它让我们自己负责处理超时和死任务。如果我们想模仿async/await的行为并在出现问题时退出,它会是这样的:

tasks = [
  Task.async(fn -> cook_an_egg(:medium) end),
  Task.async(fn -> take_shower(10) end),
  Task.async(fn -> call_mum() end),
]

Task.yield_many(tasks)
|> Enum.map(fn {task, result} ->
  case result do
    nil ->
      Task.shutdown(task, :brutal_kill)
      exit(:timeout)
    {:exit, reason} ->
      exit(reason)
    {:ok, result} ->
      result
  end
end)

为什么不使用await

使用Task.await 可以在简单的情况下工作,但如果您关心超时,您可能会遇到麻烦。跨列表的映射是按顺序进行的,这意味着每个 Task.await 在给出结果之前将阻塞到指定的超时时间,此时我们移动到列表中的下一项并阻塞 再次 for up到完全超时。

我们可以通过创建一个休眠 1-8 秒的任务列表来演示这种行为。默认超时时间为 5 秒,其中一些任务在直接使用 await 调用时会被终止,但是当我们遍历列表时,情况并非如此:

for ms <- [2_000, 4_000, 6_000] do
  Task.async(fn -> Process.sleep(ms); ms end)
end
|> Enum.map(&Task.await/1)

# Blocks for 6 seconds
# => [2000, 4000, 6000]

# Each `await` picks up after the previous one finishes with a fresh 5s timeout.
# Since each one blocks for 2s before finishing, no timeout is triggered
# but the total run time runs over.
 
# async(2s)--await(2s)-->(2s)
# async(4s)                  --await(2s)-->(4s)
# async(6s)                                    --await(2s)-->(6s)

如果我们修改它以使用Task.yield_many,我们可以获得所需的行为:

for ms <- [2_000, 4_000, 6_000] do
  Task.async(fn -> Process.sleep(ms); ms end)
end
|> Task.yield_many(5000)
|> Enum.map(fn {t, res} -> res || Task.shutdown(t, :brutal_kill) end)

# Blocks for 5 seconds
# => [{:ok, 2000}, {:ok, 4000}, nil]

【讨论】:

  • 这看起来确实更加健壮和详尽。谢谢!
  • 不错!您能否详细说明为什么在第一个示例中它等待块 10 秒而不是默认的 5 秒
  • 我更改了示例并添加了更多解释,希望能更好地解释为什么超时会以它们的方式工作。我还意识到我应该添加一个指向已添加到 Elixir 的新 await_many 函数的链接,因为我提出它正是为了帮助解决这个问题?。
【解决方案2】:

自从提出这个问题后,Elixir 的任务模块就萌生了新的力量。

Task.await_many/2Task.yield_many/2 都有,听起来像。

回答原问题中的例子:

cook_an_egg = Task.async(fn -> end)
take_shower = Task.async(fn -> end)
call_mum = Task.async(fn -> end)
Task.await_many([cook_an_egg, take_shower, call_mum])

没有与Promise.any 类似的东西,但您可以使用Task.yield_many/2 轻松编写一个

【讨论】:

    【解决方案3】:

    您可以将await 函数映射到任务引用列表。 类似的东西

    tasks = Enum.reduce(0..9, [], fn _, acc -> 
      [Task.async(&any_job/0) | acc]
    end)
    
    Enum.map(tasks, &Task.await/1)
    

    【讨论】:

    • 有趣的是,我希望 map 函数中的第一个 Task.await 会阻止进程调用下一个 await 直到它完成。 Task.await 没有按设计阻止任何“枚举”方法,我说得对吗?
    • 这有关系吗?假设第一个进程花费的时间最长 - 所以你等待它,然后当第一个等待完成时,所有其他进程都会在他们的进程完成后立即完成。
    • 好吧,当我不明白它的行为方式时,这很重要。请您看一下这段代码:elixirplayground.com?gist=4682af479dbb1a4510b7deef373ca51e。我真的不明白日志"done async 2" 是如何出现在第二个"starting new task" 之前的。
    • 您的两个函数(async_1 和 _2)都以异步方式写入 IO 设备,因此您无需等待它们即可看到该副作用,但在您需要时它可以正常工作这两个函数的结果 ([1,2])
    • 每个Task.await 都会阻塞直到完成,如果我们想强制超时,这肯定很重要。我添加了一个答案来演示这种方法的问题,并使用Task.yield_many 提供了一个更强大(虽然也更冗长)的解决方案。
    猜你喜欢
    • 1970-01-01
    • 2016-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-03-16
    • 1970-01-01
    • 2016-05-01
    相关资源
    最近更新 更多