【问题标题】:Reading multiple files with akka streams in scala在scala中使用akka流读取多个文件
【发布时间】:2019-06-25 21:34:36
【问题描述】:

我正在尝试使用 akka 流读取多个文件并将结果放入列表中。 我可以毫无问题地读取一个文件。返回类型是 Future[Seq[String]]。问题是处理 Future 中的序列必须进入 onComplete{}。

我正在尝试以下代码,但显然它不起作用。 onComplete 之外的列表 acc 是空的。但在 inComplete 中保存值。我理解这个问题,但我不知道如何解决这个问题。

// works fine  
def readStream(path: String, date: String): Future[Seq[String]] = {
implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)

val result: Future[Seq[String]] =
  FileIO.fromPath(Paths.get(path + "transactions_" + date + 
".data"))
    .via(Framing.delimiter(ByteString("\n"), 256, true))
    .map(_.utf8String)
    .toMat(Sink.seq)(Keep.right)
    .run()
 var aa: List[scala.Array[String]] = Nil
 result.onComplete(x => {
  aa = x.get.map(line => line.split('|')).toList
})
 result
}

//this won't work  
def concatFiles(path : String, date : String, numberOfDays : Int) : 
List[scala.Array[String]] = {
val formatter = DateTimeFormatter.ofPattern("yyyyMMdd");
val formattedDate = LocalDate.parse(date, formatter);
var acc = List[scala.Array[String]]()

for( a <- 0 to numberOfDays){
  val date = formattedDate.minusDays(a).toString().replace("-", "")


  val transactions = readStream(path , date)
  var result: List[scala.Array[String]] = Nil
  transactions.onComplete(x => {
    result = x.get.map(line => line.split('|')).toList 
    acc=  acc ++ result })
}
acc}

【问题讨论】:

标签: scala akka akka-stream


【解决方案1】:

一般解决方案

给定一个Paths 值的迭代器,可以通过组合FileIOflatMapConcat 来创建文件行的Source

val lineSourceFromPaths : (() => Iterator[Path]) => Source[String, _] = pathsIterator =>
  Source
    .fromIterator(pathsIterator)
    .flatMapConcat { path =>
      FileIO
        .fromPath(path)
        .via(Framing.delimiter(ByteString("\n"), 256, true))
        .map(_.utf8String)
    }

问题应用

List 为空的原因是因为 Future 值尚未完成,因此在函数返回列表之前不会更新您的可变列表。

问题中的代码批评

问题中代码的组织和风格暗示了与akkaFuture 相关的一些误解。我认为您正在尝试一个相当复杂的工作流程,而不了解您尝试使用的工具的基础知识。

1.您不应该在每次调用函数时都创建ActorSystem。每个应用程序通常有 1 个 ActorSystem,并且只创建一次。

implicit val system = ActorSystem("Sys")
val settings = ActorMaterializerSettings(system)
implicit val materializer = ActorMaterializer(settings)

def readStream(...

2.您应该尽量避免可变集合,而是使用具有相应功能的Iterator

def concatFiles(path : String, date : String, numberOfDays : Int) : List[scala.Array[String]] = {

  val formattedDate = LocalDate.parse(date, DateTimeFormatter.ofPattern("yyyyMMdd"))

  val pathsIterator : () => Iterator[Path] = () => 
    Iterator
      .range(0, numberOfDays+1)
      .map(formattedDate.minusDays)
      .map(_.String().replace("-", "")
      .map(path => Paths.get(path + "transactions_" + date + ".data")

  lineSourceFromPaths(pathsIterator)

3.由于您正在处理 Futures,因此您不应等待 Futures 完成,而应将 concateFiles 的返回类型更改为 Future[List[Array[String]]]

【讨论】:

    猜你喜欢
    • 2016-10-14
    • 1970-01-01
    • 2017-12-08
    • 1970-01-01
    • 2019-02-02
    • 2017-03-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多