【发布时间】:2016-08-31 06:32:59
【问题描述】:
我想通过使用 akka Streams 的 ActorRefSource 来构建一个项目序列。所述源被连续馈送数据。计算完成后,Stream 会被 Poison Pill 终止。
下面的简化示例说明了我的意图:
val source = Source.actorRef[Int](1000, OverflowStrategy.fail)
.mapMaterializedValue{ ref =>
for(i <- 1 to 1000) {
ref ! i
}
ref ! PoisonPill
}
source.runWith(Sink.seq).foreach(s => println("count: "+s.size))
我期待 Stream 处理所有 1000 个元素,然后由于收到毒丸而终止。不幸的是,Stream 通常会更早终止。示例输出是:
count: 24
在发送毒丸之前等待一段时间,例如1000 毫秒会导致处理所有数字。
任何关于如何确保在收到毒丸之前所有物品都已处理的想法将不胜感激。
【问题讨论】:
标签: scala akka akka-stream