编辑:原始解决方案在 10 秒的时间间隔内丢弃了消息,正如 burmajam 建议的那样。修改提供了更合适的解决方案。
编辑
由于 GenServer 的 handle_* 函数实际上并不从队列中接收消息,而只是对其进行处理,因此我们不能利用模式匹配来选择性地每隔 10 秒从进程队列中接收消息。
因此,由于我们按照到达的顺序接收消息,因此我们需要一个内部队列作为 GenServer 状态的一部分。
defmodule RateLimited do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, %{queue: []})
end
def init(state) do
allow_work()
{:ok, state}
end
def handle_cast({:call_api, data}, %{"queue" => queue} = state) do
{:noreply, %{state | queue: queue ++ [data]}}
end
def handle_info(:work, %{"queue" => [data | queue]} = state) do
send_to_external_api(data)
allow_work()
{:noreply, %{state | queue: queue}}
end
defp allow_work() do
Process.send_after(self(), :work, 10000) # after 10s
end
defp send_to_external_api (data) do end
end
所以我们只是将消息从进程队列移动到状态队列,当我们向自己发出 10 秒过去的信号时,我们会处理头部。
但最终,我们实际上获得了与让进程休眠 10 秒相同的结果。您的解决方案似乎更简单,并且达到了相同的结果。
解决方案基于How to run some code every few hours in Phoenix framework?
首先,让您的 GenServer 在其状态中存储一个标志(工作 = true/false)。
然后让 GenServer 在可以工作时使用Process.send_after 向自己发出信号。您在handle_info 中收到信号,您将work 状态标志设置为true。
现在注意handle_cast 函数中状态的模式匹配:它只会在work 状态标志等于true 时接收消息。否则,消息将被放入队列等待。
在您将消息发送到外部服务后,您再次运行Process.send_after 以安排下一个信号并返回将work 标志设置为false 的状态以防止下一个消息被立即拾取。
defmodule RateLimited do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, %{work: false})
end
def init(state) do
allow_work()
{:ok, state}
end
def handle_cast({:call_api, data}, %{"work" => true} = state) do
send_to_external_api(data)
allow_work()
{:noreply, %{state | work = false}}
end
def handle_info(:work, state) do
{:noreply, %{state | work = true}}
end
defp allow_work() do
Process.send_after(self(), :work, 10000) # after 10s
end
end