【问题标题】:Reading a multiline CSV file in Spark在 Spark 中读取多行 CSV 文件
【发布时间】:2021-03-30 06:31:29
【问题描述】:

我正在尝试在 spark 中读取多行 csv 文件。我的模式是:ID、名称和标记。 我的输入和实际输出如下所示。我没有得到预期的输出。有人可以帮助我在我的代码中缺少什么。

代码:

val myMarkDF =   spark
                .read
                .format("csv")
                .option("path","mypath\\marks.csv")
                .option("inferSchema","true")
                .option("multiLine","true")
                .option("delimiter",",")
                .load

输入:

1,A,
97,,
1,A,98
1,A,
99,,
2,B,100
2,B,95

实际输出:

+---+----+----+
|_c0| _c1| _c2|
+---+----+----+
|  1|   A|null|
| 97|null|null|
|  1|   A|  98|
|  1|   A|null|
| 99|null|null|
|  2|   B| 100|
|  2|   B|  95|
+---+----+----+

预期输出:

+---+----+----+
|_c0| _c1| _c2|
+---+----+----+
|  1|   A|  97|
|  1|   A|  98|
|  1|   A|  99|
|  2|   B| 100|
|  2|   B|  95|
+---+----+----+

谢谢!

【问题讨论】:

  • 也许你需要额外的选项here
  • 感谢您调查我的问题。但不幸的是,在添加了 parserlib 选项后,它仍然没有给出预期的 o/p。
  • 这是一个猜测。我有一段时间没有使用火花了。如果有必要在阅读器之外将多行 csv 转换为单行,我不会为此使用 spark,因为相邻的行可能不在同一个工作人员上。基本上,您在数据帧/rdd 中失去了邻接关系。
  • 绝对正确,我只是在试验。
  • 3 个问题:同一条记录可以显示在 3 行上,一行可以包含多于 1 条记录吗?您的样本记录总是在第 3 列被破坏,这是否会发生在第 2 列,例如,第一行是 1,, 而第二行是 A,97,?请添加所有可能的案例。

标签: scala csv apache-spark


【解决方案1】:

编辑:一种更好的解决方案,可以处理更多类型的损坏记录(在第 2 列或第 3 列损坏)。重要的部分是计算非空条目的累积和,它将应该在同一记录中的行组合在一起。

val df = spark.read.csv("file.csv")
df.show
+---+----+----+
|_c0| _c1| _c2|
+---+----+----+
|  1|   A|null|
| 97|null|null|
|  1|   A|  98|
|  1|null|null|   <-- note that I intentionally changed these two rows
|  A|  99|null|   <-- to demonstrate how to handle two types of broken records
|  2|   B| 100|
|  2|   B|  95|
+---+----+----+
val df2 = df.withColumn(
    "id", monotonically_increasing_id()
).withColumn(
    "notnulls",
    $"_c0".isNotNull.cast("int") + $"_c1".isNotNull.cast("int") + $"_c2".isNotNull.cast("int")
).withColumn(
    "notnulls",
    ceil(sum($"notnulls").over(Window.orderBy("id")) / 3)
).groupBy("notnulls").agg(
    filter(
        flatten(collect_list(array("_c0","_c1","_c2"))),
        x => x.isNotNull
    ).alias("array")
).select(
    $"array"(0).alias("c0"),
    $"array"(1).alias("c1"),
    $"array"(2).alias("c2")
)

df2.show
+---+---+---+
| c0| c1| c2|
+---+---+---+
|  1|  A| 97|
|  1|  A| 98|
|  1|  A| 99|
|  2|  B|100|
|  2|  B| 95|
+---+---+---+

效果不佳的旧答案:

不是解析 csv 的最佳方式,但至少是您的用例的 MVP:

val df = sc.wholeTextFiles("marks.csv").map(
    row => row._2.replace(",,\n", "\n").replace(",\n", ",").split("\n")
).toDF(
    "value"
).select(
    explode($"value")
).select(
    split($"col", ",").as("col")
).select(
    $"col"(0), $"col"(1), $"col"(2)
)

df.show
+------+------+------+
|col[0]|col[1]|col[2]|
+------+------+------+
|     1|     A|    97|
|     1|     A|    98|
|     1|     A|    99|
|     2|     B|   100|
|     2|     B|    95|
+------+------+------+

【讨论】:

  • 感谢 mck!,文档说如果记录在 csv 文件中蔓延多行,只需将 multiLine 选项启用为 true,就可以读取记录而不会丢失数据。就我而言,它不起作用。你能帮忙吗?
  • @user3103957 跨越多行的记录意味着类似于a,b,c\nd,而不是类似于a,b,\nc,, 的文件格式。您的文件格式中有 2 个虚假逗号。
  • @user3103957 我添加了另一种解析数据框的方法 - 看看编辑后的答案是否有帮助
猜你喜欢
  • 2017-03-09
  • 2016-01-03
  • 2020-11-12
  • 1970-01-01
  • 2014-12-10
  • 2021-10-11
  • 2019-02-08
  • 1970-01-01
  • 2023-03-26
相关资源
最近更新 更多