【Question title】: How to handle timeouts in poolboy?
【Posted】: 2016-12-04 11:53:55
【Question description】:

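I have a time-consuming migration that I want to run in parallel (it is safe to parallelize). The migration fetches every record in the database and performs a time- and resource-consuming operation on each one.

Sometimes the migration of an individual record hangs, so I give each one 10 minutes to finish. If it does not finish in time, I want it to shut down gracefully, without any exception (see below).

I am also using the poolboy Erlang package to parallelize the work, because the migration consumes resources as well as time. The problem is that I don't know how to handle the error when a timeout occurs and the code is about to break. My supervision tree is: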

defmodule MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2 do
  use Ecto.Migration

  alias MyReelty.Repo
  alias MyReelty.Review
  alias MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.Migrator

  # parallel nature of migration force us to disable transaction
  @disable_ddl_transaction true

  @migrator_waiting_time 10 * 60 * 1000 # timeout
  @poolboy_waiting_time @migrator_waiting_time + 10 * 1000 # give a time for graceful shutdown

  @pool_name :migrator
  @pool_size 3
  @pool_config [
    { :name, { :local, @pool_name }},
    { :worker_module, Migrator },
    { :size, @pool_size },
    { :max_overflow, 0 },
    { :strategy, :fifo }
  ]

  def up do
    children = [
      :poolboy.child_spec(@pool_name, @pool_config)
    ]
    opts = [strategy: :one_for_one, name: MyReelty.Supervisor]
    Supervisor.start_link(children, opts)

    rows = Review |> Repo.all

    IO.puts "Total amount of reviews is: #{length(rows)}"

    parallel_migrations(rows)
  end

  def parallel_migrations(rows) do
    Enum.map(rows, fn(row) ->
      pooled_migration(@pool_name, row)
    end)
  end

  def pooled_migration(pool, x) do
    :poolboy.transaction(
      pool,
      (fn(pid) -> Migrator.move(pid, { x, @migrator_waiting_time }) end),
      @poolboy_waiting_time
    )
  end

  defmodule Migrator do
    alias MyReelty.Repo
    alias MyReelty.Review

    use GenServer

    def start_link(_) do
      GenServer.start_link(__MODULE__, nil, [])
    end

    def move(server, { params, waiting_time }) do
      GenServer.call(server, { :move, params }, waiting_time)
    end

    def handle_call({ :move, result }, _from, state) do
      # placeholder for the actual time- and resource-consuming work
      big_time_and_resource_consuming_task_here
      {:reply, %{}, state}
    end
  end
end

If migrating some record in the database takes longer than 10 minutes, I get this kind of exception:

20:18:16.917 [error] Task #PID<0.282.0> started from #PID<0.70.0> terminating
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000)
    ** (EXIT) time out
    (elixir) lib/gen_server.ex:604: GenServer.call/3
    (poolboy) src/poolboy.erl:76: :poolboy.transaction/3
    (elixir) lib/task/supervised.ex:94: Task.Supervised.do_apply/2
    (elixir) lib/task/supervised.ex:45: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<5.53617785/0 in MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.parallel_migrations/1>
    Args: []

20:18:16.918 [error] GenServer MyReelty.Repo terminating
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000)
    ** (EXIT) time out
Last message: {:EXIT, #PID<0.70.0>, {:timeout, {GenServer, :call, [#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000]}}}
State: {:state, {:local, MyReelty.Repo}, :one_for_one, [{:child, #PID<0.231.0>, DBConnection.Poolboy, {:poolboy, :start_link, [[name: {:local, MyReelty.Repo.Pool}, strategy: :fifo, size: 1, max_overflow: 0, worker_module: DBConnection.Poolboy.Worker], {Postgrex.Protocol, [types: true, username: "adik", types: true, name: MyReelty.Repo.Pool, otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}, {Ecto.Adapters.Postgres.DateTime, []}, {Postgrex.Extensions.JSON, [library: Poison]}], pool_size: 1, pool_timeout: 5000, timeout: 15000, adapter: Ecto.Adapters.Postgres, database: "my_dev", hostname: "localhost", pool_size: 10, pool: DBConnection.Poolboy, port: 5432]}]}, :permanent, 5000, :worker, [:poolboy]}], :undefined, 3, 5, [], 0, Ecto.Repo.Supervisor, {MyReelty.Repo, :my_reelty, Ecto.Adapters.Postgres, [otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}], pool_size: 1]}}

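I tried adding terminate/2 or handle_info/2 to Migrator and experimenting with them, but those functions are never even invoked. How can I handle timeouts and prevent them from breaking my migration?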

UPDATE

I used @johlo's hint, but I still get timeouts. My function is:

def init(_) do
  Process.flag(:trap_exit, true)
  {:ok, %{}}
end

【Question comments】:

    Tags: erlang elixir actor gen-server


    【Answer 1】:

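    When the Migrator.move/2 function (i.e. the GenServer.call) times out, it crashes the entire MoveVideosFromVimeoToB2 process, because that is the process actually making the GenServer call.

    The solution is to catch the timeout inside the anonymous function in pooled_migration, something like this (I'm not very familiar with Elixir syntax, so it may not compile, but you should get the idea):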

    def pooled_migration(pool, x) do
      :poolboy.transaction(
        pool,
        fn pid ->
          try do
            Migrator.move(pid, { x, @migrator_waiting_time })
          catch
            :exit, _reason ->
              # Ignore the error, log it, or handle it some other way
              :ok
          end
        end,
        @poolboy_waiting_time
      )
    end
    

    It is not the Migrator process that times out; it is the GenServer call to the Migrator that does, and that call is what we need to wrap in try/catch.

    Also note that the Migrator process is not killed; it keeps running. See the timeouts section of the GenServer call documentation.
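    The two points above can be reproduced without poolboy. The following is a minimal sketch, not the code from the question: `SlowWorker` and `TimeoutDemo` are hypothetical names, and `Process.sleep/1` stands in for the real migration work. It assumes only the standard `GenServer` behaviour, whose call timeout exits the caller with `{:timeout, {GenServer, :call, _}}`, as the log in the question shows.

```elixir
defmodule SlowWorker do
  # Hypothetical stand-in for the Migrator worker from the question.
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil, [])

  def move(server, {params, waiting_time}) do
    GenServer.call(server, {:move, params}, waiting_time)
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:move, work_ms}, _from, state) do
    # Simulates the long-running job; the real work would go here.
    Process.sleep(work_ms)
    {:reply, :done, state}
  end
end

defmodule TimeoutDemo do
  # Wraps the call so that a timeout becomes a return value
  # instead of an exit that crashes the calling process.
  def safe_move(pid, work_ms, waiting_time) do
    SlowWorker.move(pid, {work_ms, waiting_time})
  catch
    :exit, {:timeout, _call} -> {:error, :timeout}
  end
end

{:ok, pid} = SlowWorker.start_link(nil)

# The job takes 300 ms but the caller only waits 50 ms.
result = TimeoutDemo.safe_move(pid, 300, 50)
IO.inspect(result, label: "result")

# The worker was not stopped by the timeout; it is still busy with the job.
IO.inspect(Process.alive?(pid), label: "worker alive after timeout")
```

    The timeout only ends the caller's wait. The worker keeps processing the request, so a pool built on such workers can still have every worker occupied by jobs whose callers have already given up.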

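    UPDATE: As @asiniy mentions in the comments, @poolboy_waiting_time should be set to :infinity so that :poolboy.transaction does not raise a timeout error while waiting for a free Migrator worker process. This is safe because the Migrator will eventually finish.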

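    【Comments】:

    • It seems to work! By the way, how can I kill the Migrator process here?
    • When you catch the :exit, you can send a kill signal using Process.exit/2 with :kill as the reason. Alternatively, you can send it a normal GenServer message telling it to stop, but that message will only be handled after the current move task has finished.
    • Give me a couple of days to test it and award the bounty, okay?
    • Please add "remove the timeout from poolboy, because the timeout is handled in the Migrator class" and I'll accept your answer ;)
    • @asiniy If you remove it, poolboy will only wait 5 seconds for a free worker process and then exit. In that case, wouldn't you rather set the timeout to :infinity, since you know the Migrator process will eventually finish?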
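
    The Process.exit/2 suggestion from the comments can be sketched as follows. This is an illustration under assumptions, not the thread's code: `StuckWorker` and `KillOnTimeout` are hypothetical names, and the worker is started with `GenServer.start/2` (unlinked) so that killing it does not also take down the calling process. Under poolboy the workers are owned by the pool, which is expected to replace a dead worker, but that behaviour is not shown here.

```elixir
defmodule StuckWorker do
  # Hypothetical worker whose job takes far longer than the caller will wait.
  use GenServer

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:move, work_ms}, _from, state) do
    Process.sleep(work_ms)
    {:reply, :done, state}
  end
end

defmodule KillOnTimeout do
  # Calls the worker and, if the call times out, kills the worker
  # instead of letting it keep running the abandoned job.
  def move_or_kill(pid, work_ms, waiting_time) do
    GenServer.call(pid, {:move, work_ms}, waiting_time)
  catch
    :exit, {:timeout, _call} ->
      Process.exit(pid, :kill)
      {:error, :killed_after_timeout}
  end
end

# Unlinked start: a linked caller would receive the :killed exit signal too.
{:ok, pid} = GenServer.start(StuckWorker, nil)
ref = Process.monitor(pid)

outcome = KillOnTimeout.move_or_kill(pid, 5_000, 50)

# Wait for the monitor to confirm that the worker is gone.
down_reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :no_down_message
  end

IO.inspect(outcome, label: "outcome")
IO.inspect(down_reason, label: "worker exit reason")
```

    A `:kill` signal cannot be trapped, so the worker gets no chance to clean up in terminate/2. The gentler alternative mentioned in the comments, a stop message, is only processed once the current job returns.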