【问题标题】:Detecting Elixir/OTP supervisor child spawn and termination(检测 Elixir/OTP supervisor 子进程的启动和终止)
【发布时间】:2016-01-16 04:25:17
【问题描述】:

我正在用 Elixir 编写一个工作队列作为学习练习。目前,我的 worker 必须在被创建时手动把自己注册到队列中(请参阅 MyQuestion.Worker.start_link)。

我希望由我的 supervisor 在 worker 被创建/重新启动时把可用的 worker 注册到队列中,因为这似乎有助于测试 worker 并最大限度地减少耦合。

有没有办法做我在MyQuestion.Supervisor下面的代码中描述的事情?

# Supervision tree sketch from the question: one job queue followed by two
# workers. The last two functions are NOT real Supervisor callbacks; they
# illustrate the hooks the author wishes existed.
defmodule MyQuestion.Supervisor do
  use Supervisor

  # Starts the supervisor and returns the result of Supervisor.start_link/2
  # unchanged (the former `supervisor =` binding was unused).
  def start_link do
    Supervisor.start_link(__MODULE__, :ok)
  end

  # Supervisor callback. The queue is listed first and the strategy is
  # :rest_for_one, so a queue restart also restarts the workers listed after it.
  def init(:ok) do
    children = [
      worker(MyQuestion.JobQueue, []),
      worker(MyQuestion.Worker, [], id: :worker_0),
      worker(MyQuestion.Worker, [], id: :worker_1)
    ]

    supervise(children, strategy: :rest_for_one)
  end

  # LOOKING FOR SOMETHING LIKE THIS
  # Hypothetical hook: on worker spawn, add the worker to the queue.
  def child_spawned(pid, {MyQuestion.Worker, _, _}) do
    MyQuestion.JobQueue.add_new_worker(pid)
  end

  # LOOKING FOR SOMETHING LIKE THIS
  # Hypothetical hook (imagine the callback existed). With this information
  # the job queue could mark the job associated with the pid as failed and
  # retry it, or extract the job id from the worker state, etc.
  # The `do` keyword was missing here, which made the whole module a syntax
  # error; `reason` and `state` are unused in this sketch, hence the underscores.
  def child_terminated(pid, _reason, _state) do
    MyQuestion.JobQueue.remove_worker(pid)
    MyQuestion.JobQueue.restart_job_for_failed_worker(pid)
  end
end

# Job queue sketch from the question. Only the outline is shown; the parts
# marked `<... snip ...>` were elided by the author, so this module is not
# compilable as written.
defmodule MyQuestion.JobQueue do
  # Starts an Agent registered under this module's name whose initial state
  # is an empty list.
  def start_link do
    Agent.start_link(fn -> [] end, name: __MODULE__)
  end

  # Placeholder for worker registration (no body yet).
  # NOTE(review): the supervisor and worker modules in this question call
  # `add_new_worker/1`, not `new_worker/1` - the names need to be reconciled.
  def new_worker(pid) do
    # register pid with agent state in available worker list, etc.
  end

  # Placeholder for job submission; the implementation is elided.
  def add_job(job_description) do
    # find idle worker and run job
    <... snip ...>
  end

  <... snip ...>
end

# Worker sketch from the question. The GenServer callbacks are elided
# (`<... snip ...>`), so only the self-registering start_link/0 is visible.
defmodule MyQuestion.Worker do
  use GenServer
  # Starts the worker, registers its pid with the job queue, and returns
  # `{:ok, pid}`. A start result other than `{:ok, pid}` fails the match below.
  def start_link do
    # start worker
    {:ok, worker} = GenServer.start_link(__MODULE__, [])

    # Now that we have the worker pid we can register it with the queue.
    # This coupling is what the question wants to move into the supervisor
    # (or elsewhere).
    MyQuestion.JobQueue.add_new_worker(worker)

    # must return the same shape as GenServer.start_link
    {:ok, worker}
  end

  <... snip ...>
end

【问题讨论】:

    标签: elixir erlang-otp


    【解决方案1】:

    关键在于把两件事结合起来:调用 Process.monitor(pid)(之后被监视的进程退出时,你会在 handle_info 中收到通知),以及手动调用 Supervisor.start_child,从而拿到子进程的 pid。

    我之前曾尝试使用 handle_info,但它始终没有被调用。Process.monitor(pid) 必须在希望接收通知的那个进程中调用,因此你必须在 handle_call 函数内部调用它,才能把监视器关联到你的服务器进程。也许存在某个函数可以让代码以另一个进程的身份运行(例如 run_from_process(job_queue_pid, fn -> Process.monitor(pid_to_monitor) end)),但我没有找到。

    下面附上的是一个非常简单的作业队列实现。我只用了一天 Elixir,所以代码既杂乱又不地道,但我还是把它附上,因为围绕这个主题的示例代码似乎很少。

    请看 HeavyIndustry.JobQueue 中的 handle_info 和 create_new_worker(在下面的代码中名为 start_new_worker)。这段代码有一个明显的问题:它能够在 worker 崩溃时重新启动它们,但无法在该处启动队列中的下一个作业(因为那需要在 handle_info 中调用 GenServer.call,会导致死锁)。我认为可以通过把启动作业的进程与跟踪作业的进程分开来解决这个问题。如果你运行示例代码,会注意到即使队列中还剩一个作业(:crash 作业),它最终也会停止运行作业。

    # Root supervisor for the example. It boots with no children; the job
    # queue is attached afterwards through create_children/2.
    defmodule HeavyIndustry.Supervisor do
      use Supervisor

      # Boots the supervisor; init/1 receives :ok.
      def start_link do
        Supervisor.start_link(__MODULE__, :ok)
      end

      # Supervisor callback: an empty one_for_one tree that children are
      # added to at runtime.
      def init(:ok) do
        no_children = []
        supervise(no_children, strategy: :one_for_one)
      end

      # Adds the job queue under `supervisor`. The queue is handed a single
      # start argument, the list `[supervisor, worker_count]`, and returns
      # whatever Supervisor.start_child/2 returns.
      def create_children(supervisor, worker_count) do
        queue_args = [supervisor, worker_count]
        queue_spec = worker(HeavyIndustry.JobQueue, [queue_args])
        Supervisor.start_child(supervisor, queue_spec)
      end
    end
    
    # Naive in-memory job queue, registered under its own module name.
    #
    # State is a map with :supervisor, :max_workers, :jobs (a list of job maps)
    # and :workers (%{idle: [pid], busy: [pid]}). A job is run by calling a
    # worker with GenServer.call/2 from a throwaway Task; worker exits are
    # observed through monitors and arrive in handle_info/2.
    #
    # Known limitation: after a worker crash the replacement worker is started
    # and its job is put back to :pending, but nothing dispatches the next job
    # from handle_info/2, so the queue can stall with work still pending.
    defmodule HeavyIndustry.JobQueue do
      use GenServer

      @job_queue_name __MODULE__

      ##
      # Client API
      ##

      # Starts the queue. `args` is `[supervisor, worker_count]`. The second
      # parameter is ignored; it defaults to `[]` so that the one-argument
      # child spec built by HeavyIndustry.Supervisor.create_children/2 can
      # start this process (with only start_link/2 defined, the child start
      # failed on an undefined start_link/1).
      def start_link(args, _opts \\ []) do
        GenServer.start_link(__MODULE__, args, name: @job_queue_name)
      end

      # Creates the initial workers and monitors them. Returns :ok.
      def setup() do
        # A monitor reports to the process that called Process.monitor/1, so
        # both steps are executed inside the server process via calls. The
        # first call hands back the whole state just to learn the new pids.
        state = GenServer.call(@job_queue_name, :setup)
        GenServer.call(@job_queue_name, {:monitor_pids, state.workers.idle})
      end

      # Enqueues `job` and tries to dispatch the next pending job. `from` is
      # the pid that receives `%{answer: result, job: job}` once the job ran.
      # Returns {:ok, :started} when this very job was dispatched,
      # {:ok, :pending} when it is waiting, or {:error, reason}.
      def add_job(from, job) do
        {:ok, our_job_id} =
          GenServer.call(@job_queue_name, {:create_job, %{job: job, reply_to: from}})

        case GenServer.call(@job_queue_name, :start_next_job) do
          # started our job
          {:ok, ^our_job_id} -> {:ok, :started}
          # started *a* job
          {:ok, _} -> {:ok, :pending}
          # could not start any job, but that is fine
          {:error, :no_idle_workers} -> {:ok, :pending}
          # something fell over...
          {:error, e} -> {:error, e}
          # defensive fallback kept from the original; not expected to match
          _ -> {:ok}
        end
      end

      # Asks the server to dispatch the next pending job, if any.
      # Returns {:ok, job_id}, {:error, :no_idle_workers} or
      # {:error, :no_more_jobs}.
      def start_next_job do
        GenServer.call(@job_queue_name, :start_next_job)
      end

      ##
      # Internal API
      ##

      def init([supervisor, n]) do
        # set some default state
        state = %{
          supervisor: supervisor,
          max_workers: n,
          jobs: [],
          workers: %{
            idle: [],
            busy: []
          }
        }

        {:ok, state}
      end

      # Starts `max_workers` workers, stores them as idle and replies with the
      # full state so the caller can request monitoring of the new pids.
      def handle_call(:setup, _from, state) do
        workers = start_workers(state.supervisor, state.max_workers)
        state = %{state | workers: %{state.workers | idle: workers}}
        {:reply, state, state}
      end

      # Monitors every pid in `list` from the server process.
      def handle_call({:monitor_pids, list}, _from, state) do
        Enum.each(list, &Process.monitor(&1))
        {:reply, :ok, state}
      end

      # Appends a new :pending job and replies with its id.
      def handle_call({:create_job, job}, _from, state) do
        job = %{
          job: job.job,
          reply_to: job.reply_to,
          # id for task
          id: :os.system_time(),
          # start pending, go active, then remove
          status: :pending,
          pid: nil
        }

        # add new job to jobs list
        state = %{state | jobs: state.jobs ++ [job]}
        {:reply, {:ok, job.id}, state}
      end

      def handle_call(:start_next_job, _from, state) do
        IO.puts "==> Start Next Job"
        IO.inspect state
        IO.puts "=================="

        # The updated state is returned explicitly: a `state` rebound inside a
        # `case` branch is not visible after the `case`, so the busy/active
        # bookkeeping would otherwise be thrown away.
        {reply, state} = dispatch_next_job(state)
        {:reply, reply, state}
      end

      # Moves `worker` from the busy list back to the idle list.
      def handle_call({:free_worker, worker}, _from, state) do
        idle = state.workers.idle ++ [worker]
        busy = state.workers.busy -- [worker]
        {:reply, :ok, %{state | workers: %{idle: idle, busy: busy}}}
      end

      # Drops a finished job from the job list.
      def handle_call({:remove_job, job}, _from, state) do
        jobs = state.jobs -- [job]
        {:reply, :ok, %{state | jobs: jobs}}
      end

      # A monitored worker went down: put its job (if it had one) back to
      # :pending, forget the dead pid and start + monitor a replacement.
      def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
        IO.puts "Worker collapsed: DOWN #{inspect pid}, clear and restart job"

        jobs = requeue_job_of(state.jobs, pid)

        # remove the dead worker from both lists
        idle = state.workers.idle -- [pid]
        busy = state.workers.busy -- [pid]

        # start the replacement; we are already inside the server process, so
        # the monitor can be set up directly
        {:ok, new_pid} = start_new_worker(state.supervisor)
        Process.monitor(new_pid)

        # build on the filtered idle list so the dead pid is really gone
        state = %{state | jobs: jobs, workers: %{idle: idle ++ [new_pid], busy: busy}}

        {:noreply, state}
      end

      ##
      # Private helpers
      ##

      # Starts `count` workers and returns their pids; a count of zero or less
      # starts nothing (a `0..(count - 1)` range would run twice for 0).
      defp start_workers(_supervisor, count) when count <= 0, do: []

      defp start_workers(supervisor, count) do
        Enum.map(1..count, fn _ ->
          {:ok, pid} = start_new_worker(supervisor)
          pid
        end)
      end

      # Starts one temporary worker under `supervisor`. The child id is a
      # reference rather than an interpolated atom: atoms are never garbage
      # collected and a timestamp-based name is not guaranteed to be unique.
      defp start_new_worker(supervisor) do
        spec = Supervisor.Spec.worker(HeavyIndustry.Worker, [], id: make_ref(), restart: :temporary)
        Supervisor.start_child(supervisor, spec)
      end

      # Pairs the first idle worker with the first pending job.
      # Returns `{reply, new_state}`.
      defp dispatch_next_job(state) do
        case {find_idle_worker(state.workers), find_next_job(state.jobs)} do
          {{:error, :no_idle_workers}, _} ->
            # no workers for job, does not matter if we have a job
            {{:error, :no_idle_workers}, state}

          {_, nil} ->
            # no job, does not matter if we have a worker
            {{:error, :no_more_jobs}, state}

          {{:ok, worker}, job} ->
            # have worker, have job: mark job active and worker busy
            active_job = %{job | status: :active, pid: worker}
            jobs = (state.jobs -- [job]) ++ [active_job]
            idle = state.workers.idle -- [worker]
            busy = state.workers.busy ++ [worker]

            {:ok, _task_pid} = Task.start(fn -> run_job(worker, active_job) end)

            {{:ok, active_job.id}, %{state | jobs: jobs, workers: %{idle: idle, busy: busy}}}
        end
      end

      # Runs inside the Task: executes the job on the worker, cleans up the
      # bookkeeping, reports the answer and tries to dispatch the next job.
      defp run_job(worker, job) do
        result = GenServer.call(worker, job.job)

        remove_job(job)
        free_worker(worker)

        send(job.reply_to, %{answer: result, job: job.job})

        start_next_job()
      end

      defp find_idle_worker(%{idle: []}), do: {:error, :no_idle_workers}
      defp find_idle_worker(%{idle: [worker | _]}), do: {:ok, worker}

      defp find_next_job(jobs) do
        Enum.find(jobs, &(&1.status == :pending))
      end

      defp free_worker(worker) do
        GenServer.call(@job_queue_name, {:free_worker, worker})
      end

      defp remove_job(job) do
        GenServer.call(@job_queue_name, {:remove_job, job})
      end

      # Resets the job that was running on `pid` to :pending and moves it to
      # the back of the list. A worker that died while idle has no job, in
      # which case the list is returned untouched.
      defp requeue_job_of(jobs, pid) do
        case Enum.find(jobs, &(&1.pid == pid)) do
          nil -> jobs
          job -> (jobs -- [job]) ++ [%{job | status: :pending, pid: nil}]
        end
      end
    end
    
    # Stateless worker process. Each job is a synchronous call; the reply is
    # the job's result and the process state is always nil.
    defmodule HeavyIndustry.Worker do
      use GenServer

      def start_link do
        GenServer.start_link(__MODULE__, :ok)
      end

      def init(:ok) do
        # workers have no persistent state
        # `self()` needs its parentheses: a bare `self` is read as a variable.
        IO.puts "==> Worker up! #{inspect self()}"
        {:ok, nil}
      end

      # Sums any enumerable of numbers. Enum.sum/1 returns 0 for an empty
      # enumerable, where the previous Enum.reduce/2 raised Enum.EmptyError.
      def handle_call({:sum, list}, _from, _state) do
        {:reply, Enum.sum(list), nil}
      end

      # Deliberately slow Fibonacci, used to keep a worker busy.
      def handle_call({:fib, n}, _from, _state) do
        {:reply, fib_calc(n), nil}
      end

      # Stops the worker with a custom reason after replying.
      def handle_call({:stop}, _from, state) do
        {:stop, "my-stop-reason", "my-stop-reply", state}
      end

      # Deliberately crashes the worker: `++` on a binary and an integer
      # raises at runtime.
      def handle_call({:crash}, _from, _state) do
        {:reply, "this will crash" ++ 1234, nil}
      end

      # Sleeps longer than the default 5 second GenServer.call timeout, so the
      # caller times out before the reply is produced.
      def handle_call({:timeout}, _from, _state) do
        :timer.sleep 10000
        {:reply, "this will timeout", nil}
      end

      # Slow fib. The guard makes negative or non-integer input fail fast with
      # a FunctionClauseError instead of recursing forever.
      defp fib_calc(0), do: 0
      defp fib_calc(1), do: 1
      defp fib_calc(n) when is_integer(n) and n > 1, do: fib_calc(n - 1) + fib_calc(n - 2)
    end
    
    # Demo driver: boots the supervisor and queue, submits a batch of jobs and
    # then prints every message the calling process receives.
    defmodule Looper do
      # Entry point. Never returns, because loop/0 recurses forever.
      def start do
        {:ok, pid} = HeavyIndustry.Supervisor.start_link()
        {:ok, _job_queue} = HeavyIndustry.Supervisor.create_children(pid, 2)
        HeavyIndustry.JobQueue.setup()
        # Zero-arity local calls carry parentheses: without them the names are
        # read as variables.
        add_jobs()
        loop()
      end

      # Submits the sample jobs, with the calling process as reply target, and
      # prints how each one was accepted.
      def add_jobs do
        jobs = [
          {:sum, [100, 200, 300]},
          {:crash},
          {:fib, 35},
          {:fib, 35},
          {:sum, [88, 88, 99]},
          {:fib, 35},
          {:fib, 35},
          {:fib, 35},
          {:sum, 0..100},
          # {:stop}, # stop not really a failure

          {:sum, [88, 88, 99]},
          # {:timeout},
          {:sum, [-1]}
        ]

        Enum.each(jobs, fn job ->
          IO.puts "~~~~> Add job: #{inspect job}"

          case HeavyIndustry.JobQueue.add_job(self(), job) do
            {:ok, :started} -> IO.puts "~~~~> Started job immediately"
            {:ok, :pending} -> IO.puts "~~~~> Job in queue"
            val -> IO.puts "~~~~> ... val: #{inspect val}"
          end
        end)
      end

      # Prints every incoming message, forever.
      def loop do
        receive do
          value ->
            IO.puts "~~~~> Received: #{inspect value}"
            loop()
        end
      end
    end

    Looper.start()
    

    【讨论】:

      猜你喜欢
      • 2011-09-04
      • 2017-09-19
      • 2014-12-09
      • 1970-01-01
      • 2016-01-23
      • 2012-11-16
      • 2023-03-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多