【问题标题】:AKKA FileIO Stream Parsing to Newline & EOFAKKA FileIO 流解析为换行符和 EOF
【发布时间】:2017-05-15 17:09:20
【问题描述】:

我正在使用 Akka FileIO(在 scala 中)创建一个文件解析器,该解析器旨在从输入文件中读取每一行并应用一个简单的接收器。每一行都用换行符 ('\n') 分隔,但文件中的最后一行以 EOF 结尾。

如何处理换行符和 eof 定界,以便我可以可靠地读取最后一行,而不必依赖最终的 '/n' 字符?

    var rowNum = 0
    val simpleMsgSink: Sink[String, Future[Done]] =
      Sink.foreach {
        case row: String => {
          println(s"$rowNum: $row")
          rowNum = rowNum+1
        }
      }
    val source = FileIO.fromPath(file, 1 * 1024 * 1024 )
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
      .map(_.utf8String)
      .runWith(simpleMsgSink)

如果这是针对文件执行的(最后一行末尾没有换行符):

Sensor_ID,Location,Seqno,gwrx.time,Temp,Humidity,Noise,CO2,Water
A0890,"51.645368, 0.072211",1,42793.00278,16,48,36,325,0
A0891,"51.645370, 0.072300",1,42793.00278,15,41,34,353,3

输出是:

0: Sensor_ID,Location,Seqno,gwrx.time,Temp,Humidity,Noise,CO2,Water
1: A0890,"51.645368, 0.072211",1,42793.00278,16,48,36,325,0

我怎样才能提取最后一行?

【问题讨论】:

  • 感谢您的测试,@chunjef。我也在运行 2.4.16。 sn-p 是否返回 3 行输出?您确定没有在第 3 行末尾添加额外的 \n 吗?

标签: scala parsing akka newline eof


【解决方案1】:

如果你看一下Framing.delimiterscala doc,你会发现它实际上有第三个参数:allowTruncation,默认值为false。以下是 scaladoc 所说的:

如果false,那么当被解码的最后一帧不包含有效分隔符时,此流会导致流失败,而不是返回截断的帧。

所以你所要做的就是添加缺少的参数:

Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true)

【讨论】:

  • 太完美了!我已经开始将其标记为“假”并尝试捕获错误,但 allowTrunction = true 确实想要我想要的。谢谢!
猜你喜欢
  • 2011-08-07
  • 1970-01-01
  • 2012-03-03
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-07-11
  • 1970-01-01
相关资源
最近更新 更多