【问题标题】:How do I make a GenServer that processes messages at a specific rate? (every n seconds)如何制作以特定速率处理消息的 GenServer? (每 n 秒)
【发布时间】:2017-01-30 05:37:21
【问题描述】:

我的一个服务与速率受限的外部 API 通信,因此我想确保每 10 秒发送的调用不超过 1 个。

我的天真的方法是拥有一个长时间运行的 API 服务,并在每次调用后将其超时:

def handle_cast({:call_api, data}, state) do
  send_to_external_api(data)
  :timer.sleep(10000)
  {:noreply, state}
end

我不确定是否有合适的方法来做到这一点。

【问题讨论】:

  • 您还可以利用Process.send_after。看看这个答案:stackoverflow.com/questions/32085258/…
  • @edmz 嗨,这并不是一个真正的周期性任务,因为它是每 10 秒执行一次的任务。而是一个可以被其他进程调用的进程,但它必须最多以 10 秒的间隔执行。 -- 目的仅仅是将 API 调用限制在阈值以下。
  • 因为这是handle_cast,这种方法对我来说看起来不错。如果您想发送“每 10 秒最多 1 个请求”而不是“请求之间的 10 秒间隔”,您可能需要从 10000 中减去 send_to_external_api(data) 花费的时间
  • 您是想在这 10 秒内丢弃其他请求,还是要将它们排队,以便在 10 秒后发送?

标签: elixir gen-server


【解决方案1】:

编辑:原始解决方案在 10 秒的时间间隔内丢弃了消息,正如 burmajam 建议的那样。修改提供了更合适的解决方案。


编辑

由于 GenServer 的 handle_* 函数实际上并不从队列中接收消息,而只是对其进行处理,因此我们不能利用模式匹配来选择性地每隔 10 秒从进程队列中接收消息。

因此,由于我们按照到达的顺序接收消息,因此我们需要一个内部队列作为 GenServer 状态的一部分。

defmodule RateLimited do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{queue: []})
  end

  def init(state) do
    allow_work()
    {:ok, state}
  end

  def handle_cast({:call_api, data}, %{"queue" => queue} = state) do
    {:noreply, %{state | queue: queue ++ [data]}}
  end

  def handle_info(:work, %{"queue" => [data | queue]} = state) do
      send_to_external_api(data)
    allow_work()

    {:noreply, %{state | queue: queue}}
  end

  defp allow_work() do
    Process.send_after(self(), :work, 10000) # after 10s
  end

  defp send_to_external_api (data) do end
end

所以我们只是将消息从进程队列移动到状态队列,当我们向自己发出 10 秒过去的信号时,我们会处理头部。

但最终,我们实际上获得了与让进程休眠 10 秒相同的结果。您的解决方案似乎更简单,并且达到了相同的结果。


解决方案基于How to run some code every few hours in Phoenix framework?

首先,让您的 GenServer 在其状态中存储一个标志(工作 = true/false)。

然后让 GenServer 在可以工作时使用Process.send_after 向自己发出信号。您在handle_info 中收到信号,您将work 状态标志设置为true

现在注意handle_cast 函数中状态的模式匹配:它只会在work 状态标志等于true 时接收消息。否则,消息将被放入队列等待。

在您将消息发送到外部服务后,您再次运行Process.send_after 以安排下一个信号并返回将work 标志设置为false 的状态以防止下一个消息被立即拾取。

defmodule RateLimited do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{work: false})
  end

  def init(state) do
    allow_work()
    {:ok, state}
  end

  def handle_cast({:call_api, data}, %{"work" => true} = state) do
    send_to_external_api(data)
    allow_work()
    {:noreply, %{state | work = false}}
  end

  def handle_info(:work, state) do
    {:noreply, %{state | work = true}}
  end

  defp allow_work() do
    Process.send_after(self(), :work, 10000) # after 10s
  end
end

【讨论】:

  • 如果他想在这 10 秒内放弃请求,这是一个很好的解决方案。但是,当工作为假时,您需要 handle_cast :)
  • 先生,您是正确的 :) 我已经编辑了我的答案以包含更合适的解决方案。
  • 将此标记为正确,因为我的解决方案虽然更简单,但存在调用者必须等到超时结束才能接收 {:noreply} 元组的问题,这对于长时间超时是有问题的,因为它块。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-11-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-02-09
  • 2020-02-03
相关资源
最近更新 更多