【发布时间】:2019-06-25 21:34:36
【问题描述】:
我正在尝试使用 akka 流读取多个文件并将结果放入列表中。 我可以毫无问题地读取一个文件。返回类型是 Future[Seq[String]]。问题是处理 Future 中的序列必须进入 onComplete{}。
我正在尝试以下代码,但显然它不起作用。 onComplete 之外的列表 acc 是空的。但在 inComplete 中保存值。我理解这个问题,但我不知道如何解决这个问题。
// works fine
def readStream(path: String, date: String): Future[Seq[String]] = {
implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)
val result: Future[Seq[String]] =
FileIO.fromPath(Paths.get(path + "transactions_" + date +
".data"))
.via(Framing.delimiter(ByteString("\n"), 256, true))
.map(_.utf8String)
.toMat(Sink.seq)(Keep.right)
.run()
var aa: List[scala.Array[String]] = Nil
result.onComplete(x => {
aa = x.get.map(line => line.split('|')).toList
})
result
}
//this won't work
def concatFiles(path : String, date : String, numberOfDays : Int) :
List[scala.Array[String]] = {
val formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
val formattedDate = LocalDate.parse(date, formatter);
var acc = List[scala.Array[String]]()
for( a <- 0 to numberOfDays){
val date = formattedDate.minusDays(a).toString().replace("-", "")
val transactions = readStream(path , date)
var result: List[scala.Array[String]] = Nil
transactions.onComplete(x => {
result = x.get.map(line => line.split('|')).toList
acc= acc ++ result })
}
acc}
【问题讨论】:
标签: scala akka akka-stream