【发布时间】:2018-09-04 05:04:52
【问题描述】:
最近我开始使用 Akka,我正在使用它创建一个使用 Akka HTTP 上传文件的 REST API。该文件可以有数百万条记录,对于每条记录,我都需要执行一些验证和业务逻辑。我对演员建模的方式是,根演员接收文件流,将字节转换为字符串,然后按行分隔符拆分记录。完成此操作后,它将流(逐个记录)发送到另一个参与者进行处理,该参与者又根据某些分组将记录分发给其他参与者。要将蒸汽从主要根演员发送到演员进行处理,我正在使用Sink.actorRefWithAck。
这对于一个小文件来说工作得很好,但是对于一个大文件,我观察到的是,我得到了多个块并且第一个块正在被处理。如果我根据负载添加Thread.sleep 几秒钟,那么它正在处理整个文件。我想知道是否有任何方法可以知道处理参与者是否完全消耗了流,这样我就不必处理Thread.sleep。这是我用过的代码sn-p:
val AckMessage = DefaultFileUploadProcessActor.Ack
val receiver = context.system.actorOf(
Props(new DefaultFileUploadProcessActor(uuid, sourceId)(self, ackWith = AckMessage)))
// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = DefaultFileUploadProcessActor.StreamInitialized
val OnCompleteMessage = DefaultFileUploadProcessActor.StreamCompleted
val onErrorMessage = (ex: Throwable) => DefaultFileUploadProcessActor.StreamFailure(ex)
val actorSink = Sink.actorRefWithAck(
receiver,
onInitMessage = InitMessage,
ackMessage = AckMessage,
onCompleteMessage = OnCompleteMessage,
onFailureMessage = onErrorMessage
)
val processStream =
fileStream
.map(byte => byte.utf8String.split(System.lineSeparator()))
.runWith(actorSink)
Thread.sleep(9000)
log.info(s"completed distribution of data to the actors")
sender() ! ActionPerformed(uuid, "Done")
对于我所采取的方法的任何专家建议将不胜感激。
【问题讨论】:
-
你可以使用 scala 的 Futures。您只需将文件的块发送给演员,并在该块的处理完成时将下一个发送给演员。使用 Future.onComplete 并在成功后进行询问电话。
-
嗨拉曼,感谢您对此的回复。当我在流程中使用 Sink.actorRefWithAck 时,我无法控制将单个块发送到处理参与者。你是说没有 Sink.actorRefWithAck 还是有办法用 Sink.actorRefWithAck 控制单个块
-
我是说在没有 Sink 的情况下执行此操作我的意思是您可以使用询问调用并行执行此操作,如果块的顺序无关紧要,那么您将必须按顺序执行,即在处理完一个块后发送下一个块。
标签: scala akka akka-stream akka-http