【问题标题】:Gracefully stopping an Akka Stream优雅地停止 Akka 流
【发布时间】:2020-11-29 15:58:43
【问题描述】:

鉴于 akka 文档,我希望流在第 7 个/第 8 个元素之后停止。为什么不停止?它一直持续到最后一个元素(第 20 个)。

我想要实现的是,在系统终止时,流停止请求新元素并且系统等待终止,直到流中的所有元素都被完全处理(到达接收器)

object StreamKillSwitch extends App {

  implicit val system = ActorSystem(Behaviors.ignore, "sks")
  implicit val ec: ExecutionContext = system.executionContext

  val (killStream, done) =
    Source(1 to 20)
      .viaMat(KillSwitches.single)(Keep.right)
      .map(i => {
        system.log.info(s"Start task $i")
        Thread.sleep(100)
        system.log.info(s"End task $i")
        i
      })
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") {
      () => Future(killStream.shutdown()).map(_ => Done)
    }

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") {
      () => done
    }

  Thread.sleep(720)

  system.terminate()
  Await.ready(system.whenTerminated, 5.seconds)
}

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    我能够“解决”问题,但我很难解释这种行为。

    由于某种原因,您对map(特别是Thread.sleep)的实现导致killStream.shutdown() 中的future/promise 回调传播速度不够快。我的猜测是它用阻塞的线程填充了主调度程序。

    您可以增加源元素的数量 Source(1 to 10000)Await 超时,以查看终止开关最终传播。

    但是,添加异步边界确实可以解决问题并获得预期的结果。

    将您的map 替换为以下mapAsync,它会按预期工作。

    .mapAsync(1) { i =>
      Future {
        system.log.info(s"Start task $i")
        Thread.sleep(100)
        system.log.info(s"Start task $i")
        i
      }
    }
    

    添加async 标记可以实现类似的结果。

    .map(i => {
      system.log.info(s"Start task $i")
      Thread.sleep(100)
      system.log.info(s"End task $i")
      i
    })
    .async
    

    【讨论】:

    • 我不确定是 Thread.sleep。只是 .map { i => system.log.info(s"Start task $i") } 仍然没有停止,并继续记录所有元素。 MapAsync 确实修复了它
    • 您报告了这个错误吗?如果没有,我可以做到
    • @AdrianS Thread.sleep 删除会有所帮助,但您需要将要处理的元素数量从 20 增加到更大的数量才能完成终止。
    • @AdrianS 我不确定这是否是一个错误。我认为这就是akka的设计方式。你不应该用 IO 或 Thread.sleep 阻塞主调度器
    • 在 Lightbend 论坛 discuss.lightbend.com/t/…987654321@ 上提出了问题
    猜你喜欢
    • 2018-03-01
    • 2010-11-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-09-27
    • 2019-11-10
    • 2010-12-15
    • 2018-09-08
    相关资源
    最近更新 更多