【问题标题】:Akka synchronizing timestamped messages from several actorsAkka 同步来自多个参与者的带时间戳的消息
【发布时间】:2018-04-22 05:31:51
【问题描述】:

想象一下下面的架构。 akka 中有一个 actor 通过 websocket 接收推送消息。它们有一个时间戳,这些时间戳之间的间隔为 1 分钟。尽管具有相同时间戳的消息可以通过 websocket 多次到达。然后这个消息被广播给作为例子的另外三个演员(ma)。他们计算指标并将消息进一步推送给一个参与者(c)。

对于 ma,我定义了一个 TimeSeriesBuffer,仅当实体具有后续时间戳时才允许写入缓冲区。成功推送到缓冲区 ma 的发射指标后,该指标转到 cc 只有在拥有所有三个指标时才能更改其状态。因此,我定义了一个特征 Synchronizable,然后定义了一个具有“主从”架构的 SynchronizableTimeSeriesBuffer。

每次推送到每个缓冲区时都会触发检查,以了解所有三个 SynchronizableTimeSeriesBuffer 的缓冲区中是否有新元素具有相同的时间戳,可以作为单个进一步发送到 c消息。

以下是问题:

1) 解决方案是否过于复杂?

2) 在 scala 和 akka 方面有没有更好的方法?

3) 为什么当系统中的消息而不是“一个接一个”地从数据库中大量加载并馈送到系统以对指标进行回测时,它的速度不是那么快,也不是那么并行。 (其中一个缓冲区的填充速度比其他缓冲区快得多,而另一个缓冲区的长度为 0)。我假设它与 akka 关于调度/邮箱的设置有关。

我创建了一个关于代码的要点: https://gist.github.com/ifif14/18b5f85cd638af7023462227cd595a2f

我非常感谢社区在解决这个重要案件方面的帮助。

提前致谢

伊戈尔

【问题讨论】:

    标签: scala akka buffer actor


    【解决方案1】:

    非常感谢您的回答。切断的部分是我在 Synchronizable Trait 中也实现的。

      //clean up slaves. if their queue is behind masters latest element
      master_last_timestamp match {
        case Some(ts) => {
          slaves.foreach { s =>
            while ( s.queue.length > 0 && s.getElementTimestamp(s.queue.front) < ts ) {
              s.dequeue()
            }
            // val els = s.dequeueAll { queue_el => s.getElementTimestamp(queue_el) < ts }
          }
        }
        case _ => Unit
      }
    

    我开始实现缓冲区的原因是因为我觉得我会在系统中大量使用它,而且我不认为为我将使用的每个演员编写这部分。有一个蓝图来做这件事似乎更容易。

    但更重要的原因是,由于某种原因,一个缓冲区的填充速度要慢得多,或者根本不填充。虽然他们是由同一个演员填补的!! (只是不同的实例,计算时间应该几乎相同)然后在其他两个参与者发出所有从数据库“传递”的消息之后,第三个参与者开始接收它。我觉得这个演员只是没有得到处理器时间。所以我认为这是调度员的设置会影响这一点。你熟悉这个吗?

    此外,我希望调度程序的工作更像循环,给每个进程一点执行时间,但它最终只为有限数量的参与者提供服务,然后跳转到下一个参与者。尽管由于有广播公司,他们必须同时接收初始消息。

    我阅读了关于调度程序和邮箱的 akka 文档,但我仍然不明白如何去做。

    谢谢

    伊戈尔

    【讨论】:

      【解决方案2】:

      简化

      您的大部分架构似乎都旨在确保您的消息按时间顺序排列。为什么不在开头添加一个简单的Actor 来过滤掉重复的消息?那么系统的其余部分可能相对简单。

      举个例子;给定带有时间戳的消息

      type Payload = ???
      
      case class Message(timestamp : Long, payload : Payload)
      

      你可以写过滤器Actor:

      class FilterActor(ma : Iterable[ActorRef]) extends Actor {
      
        var currentMaxTime = 0L
      
        override def receive = {
          case m : Message if m.timestamp > currentMaxTime => ma foreach (_ ! m)
          case _ =>
        }     
      }
      

      现在您可以消除所有“TimeSeriesBuffer”和“Synchronizable”逻辑,因为您知道 ma 和 c 只会接收按时间排序的消息。

      批处理

      批处理不那么并发的可能原因是您的ma Actor 的邮箱正在被数据库查询填满,并且它正在执行的任何处理都比c 的处理慢。所以ma的邮箱继续积累消息,而c的邮箱相对空。

      【讨论】:

      • 我想评论你的答案,但太长了,所以我不得不自己写一个答案。如果你能看看就好了。谢谢
      • @IgorFialko 我并没有真正理解您在“答案”中所说的话。如果不完整地查看系统,将很难调试您的问题...
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-03-24
      • 2014-07-29
      • 1970-01-01
      • 2021-12-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多