【发布时间】:2020-01-20 11:44:46
【问题描述】:
我正在尝试从 String 的 Seq 动态地将列添加到 DataFrame。
这是一个例子:源数据框是这样的:
+-----+---+----+---+---+
|id | A | B | C | D |
+-----+---+----+---+---+
|1 |toto|tata|titi| |
|2 |bla |blo | | |
|3 |b | c | a | d |
+-----+---+----+---+---+
我还有一个字符串序列,其中包含我要添加的列的名称。如果源 DataFrame 中已经存在一个列,它必须做一些如下的区别:
序列看起来像:
val columns = Seq("A", "B", "F", "G", "H")
期望是:
+-----+---+----+---+---+---+---+---+
|id | A | B | C | D | F | G | H |
+-----+---+----+---+---+---+---+---+
|1 |toto|tata|titi|tutu|null|null|null
|2 |bla |blo | | |null|null|null|
|3 |b | c | a | d |null|null|null|
+-----+---+----+---+---+---+---+---+
到目前为止我所做的是这样的:
val difference = columns diff sourceDF.columns
val finalDF = difference.foldLeft(sourceDF)((df, field) => if (!sourceDF.columns.contains(field)) df.withColumn(field, lit(null))) else df)
.select(columns.head, columns.tail:_*)
但我不知道如何以更简单、更容易阅读的方式有效地使用 Spark...
提前致谢
【问题讨论】:
-
如何使用
difference.foreach(x->df=df.withColumn(x,lit(null)))
标签: scala apache-spark apache-spark-sql