【问题标题】:akka stream alpakka csv: skip exception and parse next rowsakka stream alpakka csv:跳过异常并解析下一行
【发布时间】:2018-11-29 16:16:28
【问题描述】:

我正在使用 Alpakka 解析 csv 文件。版本“com.lightbend.akka”%%“akka-stream-alpakka-csv”%0.20 我有带有未封闭报价的 csv 文件。

email
test@emample.com
"test@emample.com
test@emample.com
test@emample.com

我想跳过错误的行并继续下一步,但我的信息流正在下降。

我正在使用 supervisorStrategy Supervision.Resume,但它不起作用。

找到未关闭的引用时流失败。

有什么办法可以解决吗?

我的代码:

implicit val system = ActorSystem("QuickStart")
implicit val materializer = ActorMaterializer()

def hdfsSource(csv: String): Source[ByteString, Future[IOResult]] =
  Source
    .single(csv)
    .map(ByteString.apply)
    .mapMaterializedValue(_ => Future.successful(IOResult(1, Success(Done))))

val csv = """email,country,name
            |"test,test,test
            |test,test,test
            |test,test,test
            |""".stripMargin

val source = hdfsSource(csv)

val decider: Supervision.Decider = {
  case _ ⇒ Supervision.Resume
}

val result = source
  .via(CsvParsing.lineScanner())
  .via(CsvToMap.toMapAsStrings())
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .runForeach(println)

【问题讨论】:

    标签: scala akka akka-stream alpakka


    【解决方案1】:

    目前CsvParsing.lineScanner() 不支持监管策略。您可以选择另一个符号作为行扫描器CsvParsing.lineScanner(quoteChar = '\'') 的引号字符。然后你会得到未闭合的双引号作为解析结果的一部分:

    Map(email -> "test, country -> test, name -> test) Map(email -> test, country -> test, name -> test) Map(email -> test, country -> test, name -> test)

    【讨论】:

      猜你喜欢
      • 2020-05-19
      • 1970-01-01
      • 2014-07-15
      • 1970-01-01
      • 2020-11-05
      • 1970-01-01
      • 2020-07-20
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多