【问题标题】:Reading multiple Files Asynchronously using Akka Streams, Scala使用 Akka Streams、Scala 异步读取多个文件
【发布时间】:2017-08-18 07:16:29
【问题描述】:

我想异步读取文件夹中的许多 .CSV 文件并返回自定义案例类的 Iterable。

我可以使用 Akka Streams 和 How 来实现这一点吗?

*我试图根据文档以某种方式平衡工作,但管理起来有点困难......

或者

改用Actor是个好习惯吗?(父Actor有子,每个子读取一个文件,返回一个Iterable给父,然后父合并所有的Iterable?)

【问题讨论】:

  • 问题不是很清楚。 1. 您想为所有 CSV 文件返回一个自定义案例类的单个 Iterable,还是为每个 csv 文件返回一个? 2. 如果有数千个文件,你是想同时读取它们,还是只需要某种程度的并行性?

标签: scala file asynchronous stream akka


【解决方案1】:

与@paul 答案基本相同,但有一些小改进

def files = new java.io.File("").listFiles().map(_.getAbsolutePath).to[scala.collection.immutable.Iterable]

Source(files).flatMapConcat( filename => //you could use flatMapMerge if you don't bother about line ordering
    FileIO.fromPath(Paths.get(filename))
      .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true).map(_.utf8String))
  ).map { csvLine =>
    // parse csv here
    println(csvLine)
  }

【讨论】:

    【解决方案2】:

    首先,您需要阅读/了解 Akka 流的工作原理,包括 Source、Flow 和 Sink。然后就可以开始学习算子了。

    要并行执行多个操作,您可以使用运算符 mapAsync 在其中指定并行数。

      /**
        * Using mapAsync operator, we pass a function which return a Future, the number of parallel run futures will
        * be determine by the argument passed to the operator.
        */
      @Test def readAsync(): Unit = {
        Source(0 to 10)//-->Your files
          .mapAsync(5) { value => //-> It will run in parallel 5 reads
            implicit val ec: ExecutionContext = ActorSystem().dispatcher
            Future {
              //Here read your file
              Thread.sleep(500)
              println(s"Process in Thread:${Thread.currentThread().getName}")
              value
            }
          }
          .runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}")))
      }
    

    您可以在此处了解有关 akka 和 akka 流的更多信息https://github.com/politrons/Akka

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-08-07
      • 2021-09-08
      • 2016-10-14
      • 1970-01-01
      • 1970-01-01
      • 2016-11-17
      • 2016-06-01
      相关资源
      最近更新 更多