【问题标题】:Elixir lang processing lists in parallelElixir lang 并行处理列表
【发布时间】:2015-06-26 17:59:29
【问题描述】:

我目前在 Elixir 中并行处理列表时遇到问题。并行性的原因是我将结果保存到 API,如果我一次将它们全部炸毁,它会被 DDOS 攻击并关闭。

下面的代码应该分割SQL查询的结果,并在一个单独的任务中处理每一行,当所有任务都完成后,它应该终止。

第一个任务在触发消息后会导致脚本终止。我已经看到他们将接收放在一个函数中并且该函数一遍又一遍地调用自身的答案,但我觉得必须有另一种更好的方法来处理这个问题。

results = Enum.chunk(results, 500)

# Give this process a name
Process.register(self(), :core)

# Loop over the chunks making a process for each
Enum.each results, fn(result) ->
  task = Task.async(fn -> Person.App.process(result, "Test", "1") end)
end

# And listen for messages
receive do
  {:hello, msg} -> IO.inspect msg
  {:world, _} -> "won't match"
end

【问题讨论】:

  • 这段代码不再编译

标签: elixir


【解决方案1】:

当使用Task.async时,使用Task.await获取结果最方便:

results
|> Enum.map(fn result -> Task.async(fn -> Person.App.process(result, "Test", "1") end) end)
|> Enum.map(&Task.await/1)
|> Enum.each(&IO.inspect/1)

事实上,如果你不await 得到async 的结果,它仍然会被发送到调用async 的进程并存储在它的邮箱中,可能会导致内存泄漏!如果您的意图是创建一个您不关心结果的Task,请改用Task.startTask.start_link

【讨论】:

  • 当我将 Task.async 更改为 Task.start_link 时,主进程在行完成处理之前退出。在所有任务完成之前,如何让脚本保持活动状态?
  • 我认为您应该在答案中使用该方法-继续使用Task.async,然后对所有任务使用Task.await(这就是Enum.map(&Task.await/1) 所做的)。该行的结果将是收集在列表中的所有任务的结果 - 然后您可以将它们打印出来(答案中的Enum.each(&IO.inspect/1))或用它们做其他事情。
  • 好吧,这对我有用!我不得不更改任务等待超时,因为有时任务需要 10-20 秒 Enum.map(&Task.await(&1, 20000))
  • 对,默认超时时间是5000(5秒)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-02-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多