【问题标题】:This Akka stream sometimes doesn't finish此 Akka 流有时无法完成
【发布时间】:2018-10-26 21:00:03
【问题描述】:

我有一个图表,它从多个 gzip 文件中读取行并将这些行写入另一组 gzip 文件,根据每行中的某个值进行映射。

它适用于小型数据集,但无法终止大型数据。 (这可能不是数据大小的问题,因为我没有运行足够的次数来确定——这需要一段时间)。

def files: Source[File, NotUsed] =
  Source.fromIterator(
    () =>
      Files
        .fileTraverser()
        .breadthFirst(inDir)
        .asScala
        .filter(_.getName.endsWith(".gz"))
        .toIterator)

def extract =
  Flow[File]
    .mapConcat[String](unzip)
    .mapConcat(s =>
      (JsonMethods.parse(s) \ "tk").extract[Array[String]].map(_ -> s).to[collection.immutable.Iterable])
    .groupBy(1 << 16, _._1)
    .groupedWithin(1000, 1.second)
    .map { lines =>
      val w = writer(lines.head._1)
      w.println(lines.map(_._2).mkString("\n"))
      w.close()
      Done
    }
    .mergeSubstreams

def unzip(f: File) = {
  scala.io.Source
    .fromInputStream(new GZIPInputStream(new FileInputStream(f)))
    .getLines
    .toIterable
    .to[collection.immutable.Iterable]
}

def writer(tk: String): PrintWriter =
  new PrintWriter(
    new OutputStreamWriter(
      new GZIPOutputStream(
        new FileOutputStream(new File(outDir, s"$tk.json.gz"), true)
      ))
  )

val process = files.via(extract).toMat(Sink.ignore)(Keep.right).run()

Await.result(process, Duration.Inf)

线程转储显示进程是WAITINGAwait.result(process, Duration.Inf),没有其他任何事情发生。

OpenJDK v11 和 Akka v2.5.15

【问题讨论】:

    标签: scala akka akka-stream


    【解决方案1】:

    很可能它卡在groupBy 中,因为它用完了调度程序中的可用线程,无法将所有来源的项目收集到 2^16 个组中。

    所以,如果我是你,我可能会使用 statefulMapConcat 和可变 Map[KeyType, List[String]] 半手动地在 extract 中实现分组。或者先用groupedWithin 缓冲行,然后将它们分成组,您将写入Sink.foreach 中的不同文件。

    【讨论】:

    • 确实,我已经尝试过这种模式几次了。它不仅经常挂起,而且速度也很慢。我得出了和你一样的结论,并用缓冲方法重新实现了。
    猜你喜欢
    • 1970-01-01
    • 2018-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-02-12
    • 1970-01-01
    • 2018-12-24
    • 1970-01-01
    相关资源
    最近更新 更多