【问题标题】:Akka Streams File Handling and TerminationAkka Streams 文件处理和终止
【发布时间】:2018-01-29 16:51:10
【问题描述】:

我有以下 sn-p,它读取一个 CSV 文件并将一些内容打印到控制台:

def readUsingAkkaStreams = {

    import java.io.File
    import akka.stream.scaladsl._
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import java.security.MessageDigest


    implicit val system = ActorSystem("Sys")
    implicit val materializer = ActorMaterializer()

    val file = new File("/path/to/csv/file.csv")

    val fileSource = FileIO.fromFile(file, 65536)

    val flow = fileSource.map(chunk => chunk.utf8String)

    flow.to(Sink.foreach(println(_))).run
  }

我现在对此有一些疑问:

  1. 块大小是以字节为单位的大小。内部如何处理?我的意思是我最终会遇到一个块可能只包含一行中的部分元素的情况吗?

  2. 此流如何终止?现在没有!我想让它知道它已经完全读取了文件并且应该触发停止信号!有没有一种机制可以做到这一点?

编辑 1:根据下面帖子的建议,我收到如屏幕截图所示的错误消息!

编辑 2:

通过设置 maximumFrameLength 以匹配最大块大小的大小(即 65536)来消除错误。

val file = new File("/path/to/csf/file.csv")
val chunkSize = 65536
val fileSource = FileIO.fromFile(file, chunkSize).via(Framing.delimiter(
  ByteString("\n"),
  maximumFrameLength = chunkSize,
  allowTruncation = true))

【问题讨论】:

标签: scala akka-stream


【解决方案1】:

1.根据docs:

发出的元素是 chunkSize 大小的 ByteString 元素,最终元素除外,它的大小将最大为 chunkSize。

FileIO 源将换行视为任何其他字符。所以是的,您可能会在一个块中看到 CSV 行的第一部分,而在另一个块中看到第二部分。如果这不是您想要的,您可以使用Framing.delimiter 重组ByteString 流的分块方式(有关更多信息,请参阅docs)。

附带说明,FileIO.fromFile 已被弃用,最好使用FileIO.fromPath

一个例子是:

val fileSource = FileIO.fromPath(...)
  .via(Framing.delimiter(
    ByteString("\n"),
    maximumFrameLength = 256,
    allowTruncation = true))

2. 接收器具体化为Future,当流终止时,您可以映射到做某事:

val result: Future[IOResult] = flow.runWith(Sink.foreach(println(_)))

result.onComplete(...)

【讨论】:

  • 流没有终止。此外,您的解决方案意味着存在换行符,但如果换行符位于任意位置怎么办?看看我编辑的问题。
  • 我修改了 maximumFrameLength 以匹配最大块大小的大小,即 65536。这显然消除了错误,但不确定这是否正确。
  • 所以要终止流,在上面的示例中,我必须执行 System.exit(0),这不是我想要的
  • 另外,我可以使用 ActorSystem Terminate 终止流,但这也是我不想做的事情!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2023-04-03
  • 2016-10-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-27
  • 1970-01-01
相关资源
最近更新 更多