【发布时间】:2019-01-08 01:34:41
【问题描述】:
我这里有两个数据框: df1 在这里
+----------+------+---------+--------+------+
| OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341| 136| 9| 1| I|!||
|4295877342| 111| 4| 2| I|!||
|4295877343| 138| 2| 1| I|!||
|4295877344| 141| 4| 1| I|!||
|4295877345| 143| 2| 1| I|!||
|4295877346| 145| 14| 1| d|!||
+----------+------+---------+--------+------+
df2 在这里:
+----------+------+---------+--------+------+
| OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341| 136| 4| 1| I|!||
|4295877342| 136| 4| 1| I|!||
|4295877343| 900| 2| 1| K|!||
|4295877344| 141| 4| 1| D|!||
|4295877345| 111| 2| 1| I|!||
|4295877346| 145| 14| 1| I|!||
|4295877347| 145| 14| 1| I|!||
+----------+------+---------+--------+------+
如果 df1 不在 df2 中,我只需要所有列值。 像下面...
4295877341|^|segmentId=9,segmentId=4|^|1|^|I|!|
4295877342|^|ItemId=111,ItemId=136|^|Sequence=2,Sequence=1|^|I|!|
每行以此类推...
这里 OrgId 是我的两个 dataframe 的主键。
所以基本上对于每个 OrgId 我都需要收集两个版本,只是列更改了值。
这是我到目前为止所尝试的。
val columns = df1.schema.fields.map(_.name)
val selectiveDifferences = columns.map(col =>
df1.select(col).except(df2.select(col)))
selectiveDifferences.map(diff => {if(diff.count > 0) diff.show})
但它一次只给我一个例外输出。
问候, 苏达山
【问题讨论】:
-
它似乎也没有产生预期的输出——如果你在两个数据框中为两个不同的
OrgIds 的 Y 列有值 X 怎么办——这些不会出现(因为 @ 987654326@ 将删除 X),但它们出现在不同的OrgIds 中。 -
@TzachZohar 抱歉,我已经编辑了我的问题……我想我必须想办法……
-
预期结果的schema是什么? DataFrame 中的行必须具有相同的结构,不能有 N 列的行和 N+1 列的行。您是否仍希望有与输入类似的单独列,
nulls 没有差异?还是您想将所有列“合并”为一个数组/映射列?请定义所需输出的确切结构。 -
@TzachZohar 如果列有变化,那么它应该出现,如果没有变化,那么它应该被隐藏......将所有列合并到一个 arrry/map 中也可以......跨度>
-
请准确一点 - 预期结果的 schema 是什么?什么是列和列类型?再一次,一列不能“出现”在一条记录中,也不能出现在另一条记录中 - 整个 DataFrame 必须具有相同的架构。
标签: scala apache-spark dataframe apache-spark-sql