【发布时间】:2018-04-22 05:31:51
【问题描述】:
想象一下下面的架构。 akka 中有一个 actor 通过 websocket 接收推送消息。它们有一个时间戳,这些时间戳之间的间隔为 1 分钟。尽管具有相同时间戳的消息可以通过 websocket 多次到达。然后这个消息被广播给作为例子的另外三个演员(ma)。他们计算指标并将消息进一步推送给一个参与者(c)。
对于 ma,我定义了一个 TimeSeriesBuffer,仅当实体具有后续时间戳时才允许写入缓冲区。成功推送到缓冲区 ma 的发射指标后,该指标转到 c。 c 只有在拥有所有三个指标时才能更改其状态。因此,我定义了一个特征 Synchronizable,然后定义了一个具有“主从”架构的 SynchronizableTimeSeriesBuffer。
每次推送到每个缓冲区时都会触发检查,以了解所有三个 SynchronizableTimeSeriesBuffer 的缓冲区中是否有新元素具有相同的时间戳,可以作为单个进一步发送到 c消息。
以下是问题:
1) 解决方案是否过于复杂?
2) 在 scala 和 akka 方面有没有更好的方法?
3) 为什么当系统中的消息而不是“一个接一个”地从数据库中大量加载并馈送到系统以对指标进行回测时,它的速度不是那么快,也不是那么并行。 (其中一个缓冲区的填充速度比其他缓冲区快得多,而另一个缓冲区的长度为 0)。我假设它与 akka 关于调度/邮箱的设置有关。
我创建了一个关于代码的要点: https://gist.github.com/ifif14/18b5f85cd638af7023462227cd595a2f
我非常感谢社区在解决这个重要案件方面的帮助。
提前致谢
伊戈尔
【问题讨论】: