【问题标题】:How to wait for file upload stream to complete in Akka actor如何在 Akka actor 中等待文件上传流完成
【发布时间】:2018-09-04 05:04:52
【问题描述】:

最近我开始使用 Akka,我正在使用它创建一个使用 Akka HTTP 上传文件的 REST API。该文件可以有数百万条记录,对于每条记录,我都需要执行一些验证和业务逻辑。我对演员建模的方式是,根演员接收文件流,将字节转换为字符串,然后按行分隔符拆分记录。完成此操作后,它将流(逐个记录)发送到另一个参与者进行处理,该参与者又根据某些分组将记录分发给其他参与者。要将蒸汽从主要根演员发送到演员进行处理,我正在使用Sink.actorRefWithAck

这对于一个小文件来说工作得很好,但是对于一个大文件,我观察到的是,我得到了多个块并且第一个块正在被处理。如果我根据负载添加Thread.sleep 几秒钟,那么它正在处理整个文件。我想知道是否有任何方法可以知道处理参与者是否完全消耗了流,这样我就不必处理Thread.sleep。这是我用过的代码sn-p:

val AckMessage = DefaultFileUploadProcessActor.Ack
val receiver = context.system.actorOf(
  Props(new DefaultFileUploadProcessActor(uuid, sourceId)(self, ackWith = AckMessage)))
// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = DefaultFileUploadProcessActor.StreamInitialized
val OnCompleteMessage = DefaultFileUploadProcessActor.StreamCompleted
val onErrorMessage = (ex: Throwable) => DefaultFileUploadProcessActor.StreamFailure(ex)

val actorSink = Sink.actorRefWithAck(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage
)

val processStream =
  fileStream
    .map(byte => byte.utf8String.split(System.lineSeparator()))
    .runWith(actorSink)

Thread.sleep(9000)
log.info(s"completed distribution of data to the actors")
sender() ! ActionPerformed(uuid, "Done")

对于我所采取的方法的任何专家建议将不胜感激。

【问题讨论】:

  • 你可以使用 scala 的 Futures。您只需将文件的块发送给演员,并在该块的处理完成时将下一个发送给演员。使用 Future.onComplete 并在成功后进行询问电话。
  • 嗨拉曼,感谢您对此的回复。当我在流程中使用 Sink.actorRefWithAck 时,我无法控制将单个块发送到处理参与者。你是说没有 Sink.actorRefWithAck 还是有办法用 Sink.actorRefWithAck 控制单个块
  • 我是说在没有 Sink 的情况下执行此操作我的意思是您可以使用询问调用并行执行此操作,如果块的顺序无关紧要,那么您将必须按顺序执行,即在处理完一个块后发送下一个块。

标签: scala akka akka-stream akka-http


【解决方案1】:

如果您的 Source 只有一个文件,您可以通过等待从 runWith 方法返回的 Future 来等待流完成。

如果你有多个文件的来源,你应该这样写:

filesSource
  .mapAsync(1)(data => (receiver ? data).mapTo[ProcessingResult])
  .mapAsync(1)(processingResult => (resultListener ? processingResult).mapTo[ListenerResponse])
  .runWith(Sink.ignore)

【讨论】:

  • 感谢您的回复。我只有一个来源。当我在示例中使用 runWith(actorSink) 时,它不会返回等待未来。当我使用 Sink.ignore 而不是使用 Sink.actorRefWithAck 创建的 actorSink 时,我正在等待未来。当我使用 Sink.actorRefWithAck 时,我没有选择使用“?”发送消息明确地。我错过了什么吗?
  • Jeffrey Chung 已经写出了正确的答案。关键是通过流传递正确的物化价值。或者你可以为接收者添加一些 OnCompleteMessage 事件的监听器注册逻辑。
【解决方案2】:

假设fileStreamSource[ByteString, Future[IOResult],一个想法是保留源的具体化值,然后在这个具体化值完成后回复sender()

val processStream: Future[IOResult] =
  fileStream
    .map(_.utf8String.split(System.lineSeparator()))
    .to(actorSink)
    .run()

processStream.onComplete {
  case Success(_) =>
    log.info("completed distribution of data to the actors")
    sender() ! ActionPerformed(uuid, "Done")
  case Failure(t) =>
    // ...
}

上述方法可确保在通知发送者之前消耗整个文件。

请注意,Akka Streams 有一个 Framing 对象,可以解析来自 ByteString 流的行:

val processStream: Future[IOResult] =
  fileStream
    .via(Framing.delimiter(
      ByteString(System.lineSeparator()),
      maximumFrameLenght = 256,
      allowTruncation = true))
    .map(_.ut8String)
    .to(actorSink) // the actor will have to expect String, not Array[String], messages
    .run()

【讨论】:

  • 感谢 Jeffrey Chung,fileStream .map(_.utf8String.split(System.lineSeparator())) .to(actorSink) .run() - 这是返回 IOResult,而不是 Future[IOResult]。难道我做错了什么?与框架对象相关的另一件事是,它期望最大帧长度。在我的文件中,行是动态的。在某些情况下,我的角色很少,而在某些情况下,我的角色更多。最初我尝试使用 256,然后我不得不根据文件中的数据进行调整。因此,我想采用另一种方法。与此相关的任何最佳实践
【解决方案3】:

receiveractor 将在流成功完成或失败时收到OnCompleteMessageonErrorMessage,因此您应该在接收器DefaultFileUploadProcessActoractor 的receive 块中处理这些消息。

【讨论】:

    猜你喜欢
    • 2021-02-12
    • 1970-01-01
    • 2017-07-11
    • 2013-02-06
    • 2016-09-10
    • 2020-06-29
    • 2018-04-08
    • 2020-02-12
    • 1970-01-01
    相关资源
    最近更新 更多