【问题标题】:How to implement the lifecycle PostStop?如何实现生命周期 PostStop?
【发布时间】:2019-06-27 20:12:56
【问题描述】:

我有一个演员,其实现如下所示:

  def create(fsm: ActorRef[ServerHealth], cancel: Option[Cancellable]): Behavior[ServerHealthStreamer] =
    Behaviors.setup { context =>
      implicit val system = context.system
      implicit val materializer = ActorMaterializer()
      implicit val dispatcher = materializer.executionContext

      val kafkaServer = system
        .settings
        .config
        .getConfig("kafka")
        .getString("servers")

      val sink: Sink[ServerHealth, NotUsed] = ActorSink.actorRefWithAck[ServerHealth, ServerHealthStreamer, Ack](
        ref = context.self,
        onCompleteMessage = Complete,
        onFailureMessage = Fail.apply,
        messageAdapter = Message.apply,
        onInitMessage = Init.apply,
        ackMessage = Ack)

      val cancel = Source.tick(1.seconds, 15.seconds, NotUsed)
        .flatMapConcat(_ => Source.fromFuture(health(kafkaServer)))
        .map {
          case true =>
            KafkaActive
          case false =>
            KafkaInactive
        }
        .to(sink)
        .run()

      Behaviors.receiveMessage {
        case Init(ackTo) =>
          ackTo ! Ack
          Behaviors.same
        case Message(ackTo, msg) =>
          fsm ! msg
          ackTo ! Ack
          create(fsm, Some(cancel))
        case Complete =>
          Behaviors.same
        case Fail(_) =>
          fsm ! KafkaInactive
          Behaviors.same
      }
    }

如您所见,我使用的是 akka typed,我对实现 PostStop 生命周期有疑问。

如何在akka typed 中实现生命周期钩子?

我要存档的是,当actor收到消息PostStop时,流将被取消。

【问题讨论】:

    标签: scala akka akka-typed


    【解决方案1】:

    Akka Typed 中的参与者被通知 actor lifecycle events 作为“信号”,在本例中为 PostStop 信号。

    【讨论】:

      猜你喜欢
      • 2023-04-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-12-18
      • 1970-01-01
      • 2020-11-05
      相关资源
      最近更新 更多