【问题标题】:Accumulating message outputs in a specified order按指定顺序累积消息输出
【发布时间】:2016-03-26 01:08:31
【问题描述】:

我想按指定的顺序累积不同进程发送的消息的输出。

例如,我有一个 pid 列表 [pid0, pid1]。如果我先得到pid0,那很好。如果我先得到pid1,那么在我得到pid0 之前什么都不会发生。

我知道这可以通过在收到所有消息后使用地图或关键字查找和排序来解决。但是,这可以通过模式匹配来实现吗?

例如,写这样的东西来保证它只是尝试接收消息input_pid(列表中的第一条消息):

defp loop([input_pid | remaining_input_pids], acc) do
  receive do
    {:forward, ^input_pid, output} -> loop(remaining_input_pids, [output | acc])
  end
end

【问题讨论】:

    标签: elixir


    【解决方案1】:

    让我们先看一个不起作用的例子,然后找到解决它的方法:

    caller = self
    
    pids = for x <- 1..100 do
      spawn fn ->
        :random.seed(:erlang.monotonic_time)
        :timer.sleep(:random.uniform(1000))
        send(caller, x)
      end
    end
    
    for _ <- pids do
      receive do
        x -> IO.inspect(x)
      end
    end
    

    这会产生 100 个进程,其中每个进程会随机休眠一段时间,然后将 x 发送回调用方。之后,按照发回的顺序接收结果,这是随机的。

    如果我们想按照进程产生的顺序获得结果,我们需要给接收进程一个提示。正如您正确观察到的,我们可以使用生成的进程'pid 来做到这一点。我们在调用者中获取pid作为来自spawn的返回值,但我们也可以通过在进程内部调用self来获取衍生进程的pid,然后将其发送回调用者:

    caller = self
    
    pids = for x <- 1..100 do
      spawn fn ->
        :random.seed(:erlang.monotonic_time)
        :timer.sleep(:random.uniform(1000))
        send(caller, {self, x})
      end
    end
    
    for pid <- pids do
      receive do
        {^pid, x} -> IO.inspect(x)
      end
    end
    

    请注意,这可能会淹没调用者的进程收件箱。在最坏的情况下,最先产生的进程可能最后收到,然后所有其他消息将一直存在直到结束。

    现在,您是否需要更进一步取决于流程的数量,您应该针对您的具体情况对结果进行基准测试。大量进程的问题在于邮箱是如何处理的:每次调用receive,它都会循环遍历所有个消息,直到找到一个匹配。这意味着在最坏的情况下,您将迭代邮箱 n + (n-1) + (n-2) + ... + 1 = n*(n+1)/2 次(二次时间复杂度 O(n²)),这可能会成为瓶颈。

    在这种情况下,更好的选择可能是立即接收消息,将它们存储在地图中,然后以正确的顺序读出它们。这样做的缺点是您总是必须等到收到所有消息。使用第一种方法,一旦按顺序的下一个消息到达,消息就会被处理。这是一个如何做到这一点的例子。在这种情况下,我使用了 100,000 个进程,而其他方法已经变得非常慢:

    caller = self
    
    pids = for x <- 1..100_000 do
      spawn fn ->
        :random.seed(:erlang.monotonic_time)
        :timer.sleep(:random.uniform(1000))
        send(caller, {self, x})
      end
    end
    
    results = Enum.reduce pids, %{}, fn(_, results) ->
      receive do
        {pid, x} -> Map.put(results, pid, x)
      end
    end
    
    for pid <- pids do
      IO.inspect(results[pid])
    end
    

    【讨论】:

    • 感谢您的深入回答。我选择了一个使用直接模式匹配的不同版本。但是因为它太有趣了,所以我会把你的答案标记为正确的。
    【解决方案2】:

    @patrick-oscity 给出了一个非常深入的答案,我非常喜欢它。但是,我已经使用了一个版本,它可以通过对 pin 运算符使用模式匹配来做到这一点。

    对于上下文,我正在编写一个累加器,当所有输入都已收到时,它会将其所有输入的结果转发给执行器。我还希望作为列表的累积输出遵守最初发送其输出的进程的预定义顺序。

    完成此操作后,累加器再次开始等待消息。

    累加器如下所示:

    def start_link(actuator, actuator_pid) do
      Task.start_link(fn -> loop(actuator, actuator_pid, actuator.inputs, []) end)
    end
    
    defp loop(actuator, actuator_pid, [], acc) do
      result = acc |> Enum.reverse
      send actuator_pid, {:forward, self(), result}
      loop(actuator, actuator_pid, actuator.inputs, [])
    end
    
    defp loop(actuator, actuator_pid, [input | remaining_inputs], acc) do
      receive do
        {:forward, ^input, output} ->
          loop(actuator, actuator_pid, remaining_inputs, [output | acc])
      end
    end
    

    为了证明它符合我的要求,对此的测试如下:

    test "When I recieve the outputs from all my inputs, then I return the accumulated result in the order of my actuator's inputs" do
    
    {_, pid0} = Task.start_link(fn -> test_loop end)
    {_, pid1} = Task.start_link(fn -> test_loop end)
    {_, pid2} = Task.start_link(fn -> test_loop end)
    
    actuator = %Cerebrum.Actuator{
      inputs: [pid0, pid1, pid2]
    }
    
    actuator_pid = self()
    {_, pid} = start_link(actuator, actuator_pid)
    
    send pid, {:forward, pid0, 0}
    refute_receive {:forward, pid, [0]}
    
    send pid, {:forward, pid1, 1}
    refute_receive {:forward, pid, [0, 1]}
    
    send pid, {:forward, pid2, 2}
    assert_receive {:forward, pid, [0, 1, 2]}
    
    send pid, {:forward, pid0, 0}
    send pid, {:forward, pid1, 1}
    send pid, {:forward, pid2, 2}
    assert_receive {:forward, pid, [0, 1, 2]}
    
    send pid, {:forward, pid0, 0}
    send pid, {:forward, pid2, 2}
    send pid, {:forward, pid1, 1}
    assert_receive {:forward, pid, [0, 1, 2]}
    
    send pid, {:forward, pid1, 1}
    send pid, {:forward, pid0, 0}
    send pid, {:forward, pid2, 2}
    assert_receive {:forward, pid, [0, 1, 2]}
    
    send pid, {:forward, pid1, 1}
    send pid, {:forward, pid2, 2}
    send pid, {:forward, pid0, 0}
    assert_receive {:forward, pid, [0, 1, 2]}
    
    send pid, {:forward, pid2, 2}
    send pid, {:forward, pid1, 1}
    send pid, {:forward, pid0, 0}
    assert_receive {:forward, pid, [0, 1, 2]}
    
    send pid, {:forward, pid2, 2}
    send pid, {:forward, pid1, 1}
    send pid, {:forward, pid0, 0}
    assert_receive {:forward, pid, [0, 1, 2]}
    
    end
    
    defp test_loop do
      test_loop
    end
    

    【讨论】:

    • 我认为这基本上归结为我的第一种方法,从我的角度来看似乎很好!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-08-11
    • 1970-01-01
    • 2019-10-02
    • 1970-01-01
    • 1970-01-01
    • 2019-03-02
    • 2022-06-15
    相关资源
    最近更新 更多