【发布时间】:2020-11-29 15:58:43
【问题描述】:
鉴于 akka 文档,我希望流在第 7 个/第 8 个元素之后停止。为什么不停止?它一直持续到最后一个元素(第 20 个)。
我想要实现的是,在系统终止时,流停止请求新元素并且系统等待终止,直到流中的所有元素都被完全处理(到达接收器)
object StreamKillSwitch extends App {
implicit val system = ActorSystem(Behaviors.ignore, "sks")
implicit val ec: ExecutionContext = system.executionContext
val (killStream, done) =
Source(1 to 20)
.viaMat(KillSwitches.single)(Keep.right)
.map(i => {
system.log.info(s"Start task $i")
Thread.sleep(100)
system.log.info(s"End task $i")
i
})
.toMat(Sink.foreach(println))(Keep.both)
.run()
CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") {
() => Future(killStream.shutdown()).map(_ => Done)
}
CoordinatedShutdown(system)
.addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") {
() => done
}
Thread.sleep(720)
system.terminate()
Await.ready(system.whenTerminated, 5.seconds)
}
【问题讨论】:
标签: scala akka akka-stream