【问题标题】:Ask vs Tell or forward for Actors using Akka StreamsAsk vs Tell or forward for Actors using Akka Streams
【发布时间】:2019-04-19 09:03:35
【问题描述】:

您好,我正在与akka streamsakka-stream-kafka 一起工作。我正在使用以下设置设置流:

Source (Kafka) --> | Akka Actor Flow | --> Sink (MongoDB)

Actor Flow 基本上由将处理数据的Actors,下面是层次结构:

                                      System
                                         | 
                                     Master Actor  
                                      /       \
                          URLTypeHandler     SerializedTypeHandler
                             /       \                   |
                     Type1Handler   Type2Handler     SomeOtherHandler

所以Kafka有消息,我写了消费者并在atMostOnceSource配置中运行它并使用

Consumer.Control control =
            Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics(TOPIC))
                    .mapAsyncUnordered(10, record -> processAccessLog(rootHandler, record.value()))
                    .to(Sink.foreach(it -> System.out.println("FinalReturnedString--> " + it)))
                    .run(materializer);

我最初使用打印作为接收器,只是为了让流程运行。

processAccessLog 定义为:

private static CompletionStage<String> processAccessLog(ActorRef handler, byte[] value) {

    handler.tell(value, ActorRef.noSender());

    return CompletableFuture.completedFuture("");
}

现在,从定义ask 必须在演员期待响应时使用,在这种情况下是有意义的,因为我想返回要写入接收器的值。

但是每个人(包括文档),提到避免ask 而使用tellforward,一个惊人的博客写在上面Don't Ask, Tell

在他提到的博客中,在嵌套actors的情况下,使用tell作为第一条消息,然后使用forward作为消息到达目的地,然后在处理后直接将消息发送回根actor。

现在问题来了,

  1. 如何将消息从 D 发送回 A,这样我仍然可以使用接收器。
  2. 开放式流是一种好的做法吗?例如Sink 无关紧要的流,因为演员已经完成了这项工作。 (我认为不建议这样做,似乎有缺陷)。

【问题讨论】:

    标签: java akka akka-stream


    【解决方案1】:

    ask 仍然是正确的模式

    从链接的博客文章中,ask 的一个“缺点”是:

    阻塞一个actor本身,它不能选择任何新消息,直到 响应到达,处理结束。

    但是,在akka-stream 中,这正是我们正在寻找的功能,也就是“背压”。如果FlowSink 处理数据需要很长时间,那么我们希望Source 放慢速度。

    作为旁注,我认为在博客文章中声称额外的侦听器 Actor 导致实现“重几十倍”是夸大其词。显然,中间 Actor 会增加一些延迟开销,但不会增加 12x 更多。

    消除背压

    您正在寻找的任何实施都将有效地消除背压。仅使用tell 的中间流将不断地将需求传播回源,无论处理程序 Actors 中的处理逻辑是否以与源生成数据相同的速度完成其计算。 考虑一个极端的例子:如果你的 Source 每秒可以产生 100 万条消息,但通过 tell 接收这些消息的 Actor 每秒只能处理 1 条消息。那个 Actor 的邮箱会发生什么?

    using the ask pattern in an intermediate Flow 有意将处理程序的速度与源生成数据的速度联系起来。

    如果您愿意移除从 Sink 到 Source 的背压信号,那么您最好不要一开始就使用 akka-stream。您可以使用背压或非阻塞消息传递,但不能同时使用。

    【讨论】:

    • 感谢您的精彩解释,我最终做的是使用.mapAsyncUnordered(10, record -&gt; ask(ActorA, record.value() 处理我的流,然后是来自ActorA -&gt; ActorB -&gt; ActorCforward 和ActorC 使用getSender().tell(myResponse, ActorRef.noSender()); 回复流。然后将Sink 用于所有向外的数据存储应该仍然传播背压,对吧? (据我了解)
    • @iam.Carrot 欢迎您。只要涉及到询问,那么是的,就会传播背压。
    • 只是一件快速的事情,如果您有一个完整的演员系统来处理Sink,那么在Sink 上执行ActorRef.ask() 是否相关?或者即使我们有tell,它是否会提供背压
    【解决方案2】:

    Ramon J Romero y Vigil 是对的,但我会尝试扩大回应。

    1) 我认为“不问,不说”的教条主要适用于 Actor 系统架构。在这里您需要返回一个 Future 以便流可以解析处理后的结果,您有两种选择:

    • 使用询问
    • 为每个事件创建一个actor并传递给它们Promise,这样当这个actor接收到数据时Future就会完成(你可以使用getSender方法,这样D可以将响应发送给A)。无法在消息中发送 Promise 或 Future(不可序列化),因此无法避免创建这种短暂的演员。

    最后你做的大多是一样的......

    2) 使用空的 Sink 来完成流是完全可以的(事实上,akka 提供了Sink.ignore() 方法来做到这一点)。

    似乎您错过了使用流的原因,它们是很酷的抽象,可提供可组合性、并发性和背压。另一方面,演员不能作曲,难以应对背压。如果您不需要此功能并且您的演员可以轻松完成工作,那么您首先不应该使用 akka-streams。

    【讨论】:

    • 非常感谢您的回答,尤其是第二点,因为我很担心。 + 1
    猜你喜欢
    • 2017-03-20
    • 1970-01-01
    • 1970-01-01
    • 2018-01-19
    • 1970-01-01
    • 2019-04-13
    • 1970-01-01
    • 2012-04-24
    • 1970-01-01
    相关资源
    最近更新 更多