【问题标题】:Is there a way to make an asynchronous call to a gen_server in Erlang?有没有办法在 Erlang 中对 gen_server 进行异步调用?
【发布时间】:2015-04-18 15:34:30
【问题描述】:

例如假设我有一个实现 gen_server 行为的模块,它有

handle_call({foo, Foo}, _From, State) ->
  {reply, result(Foo), State}
;

我可以通过从其他进程执行 gen_server:call(Server, {foo, Foo}) 来访问此处理程序(我猜如果 gen_server 尝试 gen_server:call 本身,它会死锁)。但是 gen_server:call 响应(或超时)阻塞。如果我不想阻止响应怎么办?

虚构的用例:假设我有 5 个这样的 gen_server,并且来自其中任何 2 个的响应对我来说就足够了。我想做的是这样的:

OnResponse -> fun(Response) ->
  % blah
end,
lists:foreach(
  fun(S) ->
    gen_server:async_call(S, {foo, Foo}, OnResponse)
  end, 
  Servers),
Result = wait_for_two_responses(Timeout),
lol_i_dunno()

我知道 gen_server 有强制转换,但是强制转换无法提供任何响应,所以我认为这不是我想要的。此外,调用者是否希望同步处理响应(使用 gen_server:call)或异步(似乎不存在?)似乎不应该是 gen_server 的关注点。

此外,允许服务器提供异步响应,方法是让 handle_call 返回 no_reply,然后调用 gen_server:reply。那么为什么不在另一边也支持异步处理响应呢?还是确实存在,只是我没找到??

【问题讨论】:

  • 这不是handle_cast 的用途吗?
  • 哦,我看到你在问题中提到了 cast,但那是异步方式。如果其他进程需要响应,则服务器需要发送另一条异步消息。
  • 如果你打算通过 handle_cast 异步发送响应,那么整个 gen_server 似乎是没用的,因为你最终需要在 cast 中发送 sender。 handle_call 不应该关心其他进程是否阻塞响应。我的意思是,我可以启动一个新进程将同步调用变为异步调用,但这似乎很浪费。
  • 我认为您不需要一个新进程来使其异步。看起来您已经在传递一个回调函数。我没有足够的 erlang 经验来提供更多帮助,但我知道 handle_call 用于同步方法,handle_cast 用于异步。将消息发送回发件人是进行异步的完美方式。不要与之抗争。

标签: erlang erlang-otp


【解决方案1】:

gen_server 在客户端没有异步调用的概念。如何始终如一地实施并非易事,因为gen_server:call 是监视服务器进程、发送请求消息并等待应答或监视关闭或超时的组合。如果你做了你提到的事情,你将需要以某种方式处理来自服务器的 DOWN 消息......所以假设async_call 应该返回一些用于 yeld 的密钥,并且还有一个内部监视器参考,用于处理来自其他进程的 DONW 消息...并且不想将其与 yeld 错误混为一谈。

不是很好但可能的替代方法是使用rpc:async_call(gen_server, call, [....])

但是这种方法在调用过程中有一个限制,它将是一个短暂的 rex 子进程,因此如果您的 gen 服务器以某种方式使用调用者 pid 而不是发送它,那么回复逻辑将会被破坏。

【讨论】:

    【解决方案2】:

    gen_server:call 基本上是一个序列

    send a message to the server (with identifier)
    wait for the response of that particular message
    

    封装在一个函数中。

    对于您的示例,您可以分两步分解行为:将gen_server:cast(Server,{Message,UniqueID,self()} 与所有服务器一起使用的循环,然后是等待至少2 个{UniqueID,Answer} 形式的答案的接收循环。但是您必须注意在某个时间点清空您的邮箱。更好的解决方案应该是将此委托给一个单独的进程,该进程在收到所需数量的答案时会简单地死亡:

    [edit] 在代码中进行一些更正,现在它应该可以工作了 :o)

    get_n_answers(Msg,ServerList,N) when N =< length(ServerList) ->
        spawn(?MODULE,get_n_answers,[Msg,ServerList,N,[],self()]).
    
    get_n_answers(_Msg,[],0,Rep,Pid) -> 
        Pid ! {Pid,Rep};
    get_n_answers(_Msg,[],N,Rep,Pid) -> 
        NewRep = receive
            Answ -> [Answ|Rep]
        end,
        get_n_answers(_Msg,[],N-1,NewRep,Pid);
    get_n_answers(Msg,[H|T],N,Rep,Pid) -> 
        %gen_server:cast(H,{Msg,Pid}),
        H ! {Msg,self()},
        get_n_answers(Msg,T,N,Rep,Pid).
    

    你可以像这样使用它:

    ID = get_n_answers(Msg,ServerList,2),
    % insert some code here
    Answer = receive
        {ID,A} -> A % tagged with ID to do not catch another message in the mailbox
    end
    

    【讨论】:

      【解决方案3】:

      gen_sever:call 到进程本身肯定会阻塞直到超时。要理解原因,应该知道gen_server 框架实际上将您的特定代码组合到一个模块中,而gen_server:call 将被“翻译”为“pid !Msg”形式。

      那么想象一下这段代码是如何生效的,进程实际上一直在循环中保持接收消息,而当控制流运行到一个处理函数时,接收进程会暂时中断,所以如果你调用gen_server:call来进程本身,因为它是一个同步函数,所以它等待响应,但是直到处理函数返回之前它永远不会进来,以便进程可以继续接收消息,因此代码处于死锁状态。

      【讨论】:

        【解决方案4】:

        您可以通过在单独的进程中发送每个调用并等待尽可能多的响应来轻松实现这一点(本质上这就是异步的意义,不是吗?:-)

        看看基于async_call from rpc library in OTPthis simple implementation of parallel call

        这就是它在简单英语中的工作方式。

        • 您需要进行 5 次调用,以便(在父进程中)生成 5 个 Erlang 子进程。
        • 每个进程向父进程返回一个包含其 PID 和调用结果的元组。
        • 只有在所需的调用完成后才能构造元组并返回。
        • 在父进程中,您在接收循环中循环响应。
        • 您可以等待所有响应,也可以等待开始的 5 个响应中的 2 个或 3 个。

        父进程(产生工作进程)最终将收到所有响应(我的意思是那些你想忽略的)。如果您不希望消息队列无限增长,则需要一种丢弃它们的方法。有两种选择:

        1. 父进程本身可以是一个临时进程,仅为调用生成其他 5 个子进程而创建。一旦收集到所需数量的响应,它就可以将响应发送回调用者并终止。发送到死亡进程的消息将被丢弃。

        2. 父进程在收到所需数量的响应后可以继续接收消息,然后将其丢弃。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2010-11-13
          • 1970-01-01
          • 1970-01-01
          • 2019-06-30
          • 1970-01-01
          • 1970-01-01
          • 2014-08-18
          • 2016-03-23
          相关资源
          最近更新 更多