【问题标题】:Right design in akka. - Message deliveryakka 中的正确设计。 - 消息传递
【发布时间】:2013-05-29 10:56:54
【问题描述】:

我浏览了一些关于 akka 如何以及为什么不保证消息传递的帖子。 documentationdiscussion 和其他群里的讨论都很好解释。

我对 akka 还很陌生,想知道一个箱子的合适设计。例如,假设我有 3 个不同的演员都在不同的机器上。一个负责烹饪书,另一个负责历史,最后一个负责技术书籍。

我在另一台机器上有一个主角。假设如果我们有一些可用的书,那么有一个要搜索的主要演员的查询。主要参与者向 3 个远程参与者发送请求,并期待结果。所以我这样做:

  val scatter = system.actorOf(
        Props[SearchActor].withRouter(ScatterGatherFirstCompletedRouter(
              routees=someRoutees, within = 10 seconds)), "router")
  implicit val timeout = Timeout(10 seconds)
  val futureResult = scatter ?  Text("Concurrency in Practice")
  //      What should I do here?.
  //val result = Await.result(futureResult, timeout.duration) line(a)

简而言之,我已经向所有 3 个远程参与者发送了请求,并期望在 10 秒内得到结果。

应该采取什么行动?

  1. 假设我在 10 秒内没有得到结果,我是否应该再次向所有人发送新请求?
  2. 如果within 以上时间为时过早怎么办。但我不知道可能需要多长时间。
  3. 如果within 时间足够但消息被丢弃了怎么办。

如果我在within 时间内没有得到响应,请再次重新发送请求。像这样,它保持异步:

futureResult onComplete{
  case Success(i) => println("Result "+i)
  case Failure(e) => //send again
}

但是在查询太多的情况下,会不会调用线程太多,体积庞大?如果我取消注释line(a),它会变得同步并且在负载下可能会表现不佳。

假设我在 10 秒内没有得到响应。如果within 时间还为时过早,那么它会再次发生大量无用的计算。如果消息被丢弃,那么 10 几秒钟的宝贵时间就被浪费了。以防万一,假设我知道消息已送达,我可能会等待更长的时间而不会怀疑。

人们如何解决这些问题? ACK?但是我必须将状态存储在所有查询的参与者中。这一定很常见,我正在寻找合适的设计。

【问题讨论】:

    标签: java scala akka


    【解决方案1】:

    我将尝试为您回答其中的一些问题。我不会对所有事情都有具体的答案,但希望我能引导你朝着正确的方向前进。

    首先,您需要更改将请求传达给进行图书搜索的 3 个参与者的方式。在这里使用ScatterGatherFirstCompletedRouter 可能不是正确的方法。该路由器只会等待其中一个路由(第一个响应)的回答,因此您的结果集将不完整,因为它不包含来自其他 2 个路由的结果。还有一个BroadcastRouter,但这也不符合您的需求,因为它只处理tell (!) 而不是ask (?)。要执行您想做的事情,一种选择是将请求发送给每个接收者,获取 Futures 以获取响应,然后使用 Future.sequence 将它们组合成聚合 Future。一个简化的示例可能如下所示:

    case class SearchBooks(title:String)
    case class Book(id:Long, title:String)
    
    class BookSearcher extends Actor{
    
      def receive = {
        case req:SearchBooks =>
          val routees:List[ActorRef] = ...//Lookup routees here
          implicit val timeout = Timeout(10 seconds)
          implicit val ec = context.system.dispatcher
    
          val futures = routees.map(routee => (routee ? req).mapTo[List[Book]])
          val fut = Future.sequence(futures)
    
          val caller = sender //Important to not close over sender
          fut onComplete{
            case Success(books) => caller ! books.flatten
    
            case Failure(ex) => caller ! Status.Failure(ex)
          }
      }
    }
    

    现在这不会是我们的最终代码,但它是您的示例尝试执行的近似值。在这个例子中,如果任何一个下游路由失败/超时,我们将命中我们的Failure 块,调用者也会失败。如果它们都成功,调用者将获得Book 对象的聚合列表。

    现在回答您的问题。首先,如果您在超时时间内没有从其中一个路由中得到答案,您会询问是否应该再次向所有参与者发送请求。这个问题的答案真的取决于你。您会允许另一端的用户看到部分结果(即 3 个参与者中的 2 个的结果),还是每次都必须是完整的结果集?如果答案是肯定的,您可以将发送到路由的代码调整为如下所示:

    val futures = routees.map(routee => (routee ? req).mapTo[List[Book]].recover{
      case ex =>
        //probably log something here
        List()
    })
    

    使用此代码,如果任何路由因任何原因超时或失败,则会用一个空的“Book”列表代替响应而不是失败。现在,如果您不能忍受部分结果,那么您可以重新发送整个请求,但您必须记住,另一端可能有人在等待他们的图书结果,他们不想永远等待。

    对于第二个问题,您问如果您的超时时间过早怎么办?您选择的超时值将完全取决于您,但它很可能应该基于两个因素。第一个因素将来自测试搜索的调用时间。找出平均需要多长时间,并在此基础上选择一个值,并稍加缓冲以确保安全。第二个因素是另一端的人愿意等待他们的结果多长时间。您可以在超时时非常保守,为了安全起见将其设置为 60 秒,但如果另一端确实有人在等待结果,他们愿意等待多长时间?我宁愿得到一个失败响应,表明我应该再试一次,而不是永远等待。因此,将这两个因素考虑在内,您应该选择一个值,该值可以让您在非常高的时间内获得响应,同时又不会让另一端的调用者等待太久。

    对于问题 3,您询问如果消息被丢弃会发生什么。在这种情况下,我猜测接收该消息的人的未来将超时,因为它不会得到响应,因为接收者参与者永远不会收到要响应的消息。 Akka 不是 JMS;它没有确认模式,如果收件人没有收到并确认消息,则可以多次重发消息。

    另外,从我的示例中可以看出,我同意不使用 Await 阻止聚合 Future。我更喜欢使用非阻塞回调。在接收函数中阻塞并不理想,因为 Actor 实例将停止处理其邮箱,直到阻塞操作完成。通过使用非阻塞回调,您可以释放该实例以返回处理其邮箱,并允许处理结果只是在ExecutionContext 中执行的另一个作业,与处理其邮箱的参与者分离。

    现在,如果您真的不想在网络不可靠时浪费通信,您可以查看 Akka 2.2 中提供的Reliable Proxy。如果你不想走这条路,你可以通过定期向路由发送ping 类型的消息来自己滚动。如果没有及时响应,则将其标记为已关闭并且不要向其发送消息,直到您可以从它那里获得可靠的(在很短的时间内)ping,有点像每个路由的 FSM。如果您绝对需要此行为,则其中任何一个都可以工作,但您需要记住,这些解决方案会增加复杂性,并且只有在您绝对需要此行为时才应使用。如果您正在开发银行软件并且您绝对需要有保证的交付语义,否则会导致不良的财务影响,请务必采用这种方法。在决定是否需要这样的东西时要明智,因为我打赌 90% 的时间你不需要。在您的模型中,唯一可能因等待您可能已经知道不会成功的事情而受到影响的人是另一端的呼叫者。通过在actor中使用非阻塞回调,它不会因为某些事情可能需要很长时间而停止;它已经移入下一条消息。如果您决定在失败时重新提交,您也需要小心。您不想淹没接收演员的邮箱。如果您决定重新发送,请将其设置为固定的次数。

    如果您需要这些有保证的语义,另一种可能的方法可能是查看 Akka 的 Clustering Model。如果您对下游路由进行了集群化,并且其中一台服务器出现故障,那么所有流量都将被路由到仍在运行的节点,直到其他节点恢复为止。

    【讨论】:

    • 感谢您的详细解答。这值得我的一些赏金:)。另外,您能否回答我在 3 个问题之后提到的其他问题。考虑到可能会有一些消息下拉列表,我正在尝试解决这种行为。
    • 添加了更多信息,但仍不确定我 100% 回答了您的问题。希望它有所帮助。
    • 好的。所以总结一下,我应该避免考虑消息下拉菜单,而是关注within 时间。如果超过within时间,则可以认为是消息丢弃(在大多数情况下),可以采取后续措施。唯一的问题可能是within 时间很大(比如图像处理任务),在这种情况下我可以使用其他替代方案,或者说有ack。实践中的一般性问题:在连接良好的地方,消息多久掉线一次?
    • 我同意你的第一句话;只有在情况确实需要时才采取更极端的措施,对我来说,搜索操作(读取操作)并不能真正保证在其他情况下可能会这样做。如果你有一个良好稳定的连接,我会想象一个分数下降率,远低于 1%。
    • @LaloInDublin,在Actor 上,sender 是一个可变的ActorRef var,表示谁是正在处理的当前消息的发件人。由于onComplete 回调是异步的,这意味着sender 可能会在您更改以将响应发送回它之前从您下方更改。在更专业的术语中,这称为关闭可变状态,在这种情况下应该避免。在进入回调之前将其捕获在单独的 val 中,然后使用该 val 来确保您可以将响应路由回原始发件人。
    猜你喜欢
    • 1970-01-01
    • 2015-08-05
    • 1970-01-01
    • 1970-01-01
    • 2011-09-06
    • 2021-06-19
    • 1970-01-01
    • 2018-02-16
    • 1970-01-01
    相关资源
    最近更新 更多