【问题标题】:Using a response from an actor in current actor process在当前参与者进程中使用参与者的响应
【发布时间】:2021-10-24 06:08:03
【问题描述】:

我对如何以非阻塞方式解决这种情况感到困惑。

考虑两个演员 Actor1Actor2

Actor1

Map<Int, Int> foo() {
     List<String> finalList = foo_2();
     Map<Int, Int> finalMap = // do stuff with finalList to get Map<Int, Int>;

     return finalMap;
}

List<String> foo_2() {
    
     CompletableFuture<List<String>> Querylist = ask(Actor2)
     Querylist.get();
     
     return QueryList;
}

当前在 foo_2 中,Querylist.get() 是一个阻塞调用。我想以某种非阻塞方式解决这个问题。我在Actor1 内部为Actor2 创建了一个消息适配器,因此Actor2 发送的任何消息都将由Actor1 处理。

我使用下面的方法来修改阻塞调用

Map<Int, Int> foo() {
     CompletionStage<List<String>> finalList = foo_2();
     finalList.whenComplete(
        // what to do here? 
     )
     // Map<Int, Int> finalMap = // do stuff with finalList to get Map<Int, Int>;

     return finalMap;
}

CompletionStage<List<String>> foo_2() {
    
     CompletionStage<List<String>> Querylist = ask(Actor2)
     
     
     return QueryList;
}

我不确定如何正确使用 CompletionStage 构造来获得与阻塞 futures.get() 调用相同的结果。

【问题讨论】:

    标签: java akka completable-future akka-cluster akka-typed


    【解决方案1】:

    如果您使用的是 Akka Typed(从标签中暗示),则根本不需要 future 或消息适配器。只需使用ActorContext.ask

    请参阅documentation for request-response with ask between two actors

    一般来说,您将摆脱foofoo_2 方法,将您设置的消息适配器移动到ActorContext.ask 调用中,并替换您之前调用foo 的实例致电ActorContext.ask。如果您的参与者对导致请求的消息的回复取决于对请求的响应,那么一个好的做法是将所需的状态部分嵌入到适配器生成的消息中。

    【讨论】:

    • 感谢您的回答,这就是我最终要做的。你知道做 .tell() 和使用 ActorContext.ask() 有什么区别吗?您在 ActorContext.ask() 中明确传递“处理程序”的区别是什么?
    • ask 最终为您完成了管理请求-响应交互的所有工作:它建立在 tell 之上(毕竟 Akka 中的所有内容或多或少都是)。从广义上讲,ask 会生成一个参与者,并注入对该参与者的引用作为要发送的消息中的回复地址。当那个actor接收到消息时,它执行适配器并将调整后的响应转发给发送请求的actor;该actor还会在超时到期后为自己安排一条消息,并将其转换为超时消息。
    • 因为它会生成一个actor来接收回复,所以它的效率比注册一个消息适配器、执行一个通知和为自己安排一个超时消息的效率要低一些。但是,您可以在运行中使用不同的适配器进行任意多个请求:使用tell + adapter,您可能会达到每种类型一个适配器的限制(为同一类型注册不同的适配器可能是不可预测的,并且继承也可能对这种方法造成严重破坏)。 tell + 适配器方法确实具有允许多个回复的优点。
    • 这清楚了很多。您是否大致知道开销有多大?
    • 你能帮我解决另一个问题吗?-stackoverflow.com/questions/69725512/…
    【解决方案2】:

    您可以使用pipeToSelf(参见https://doc.akka.io/docs/akka/current/typed/interaction-patterns.html#send-future-result-to-self)将请求的结果发送给参与者本身。与其尝试直接在 foo() 中获取 finalList 的值(这只能通过阻塞 get 实现),不如将 foo() 的结果发送给 actor 本身,在这种情况下,您可以像处理任何其他消息。最好为此创建特定的消息类型。

    您还应该看看CompletionStage 方法,最重要的是thenApply (https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html#thenApply-java.util.function.Function-) 可以转换结果,例如从finalList 创建一个Map,例如来自MapMapMessage。然后,您将像处理 actor 中的任何其他消息一样处理 MapMessage

    【讨论】:

    • 您好,感谢您的回答。最终我选择了接受的答案,因为它看起来更惯用。
    猜你喜欢
    • 1970-01-01
    • 2015-11-11
    • 2011-06-04
    • 1970-01-01
    • 2015-09-04
    • 2011-03-24
    • 1970-01-01
    • 2019-03-05
    • 1970-01-01
    相关资源
    最近更新 更多