【问题标题】:Compare each columns of two data frames and output only diff columns比较两个数据帧的每一列,只输出差异列
【发布时间】:2019-01-08 01:34:41
【问题描述】:

我这里有两个数据框: df1 在这里

+----------+------+---------+--------+------+
|     OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|   136|        9|       1|  I|!||
|4295877342|   111|        4|       2|  I|!||
|4295877343|   138|        2|       1|  I|!||
|4295877344|   141|        4|       1|  I|!||
|4295877345|   143|        2|       1|  I|!||
|4295877346|   145|       14|       1|  d|!||
+----------+------+---------+--------+------+

df2 在这里:

+----------+------+---------+--------+------+
|     OrgId|ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|   136|        4|       1|  I|!||
|4295877342|   136|        4|       1|  I|!||
|4295877343|   900|        2|       1|  K|!||
|4295877344|   141|        4|       1|  D|!||
|4295877345|   111|        2|       1|  I|!||
|4295877346|   145|       14|       1|  I|!||
|4295877347|   145|       14|       1|  I|!||
+----------+------+---------+--------+------+

如果 df1 不在 df2 中,我只需要所有列值。 像下面...

4295877341|^|segmentId=9,segmentId=4|^|1|^|I|!|
4295877342|^|ItemId=111,ItemId=136|^|Sequence=2,Sequence=1|^|I|!|

每行以此类推...

这里 OrgId 是我的两个 dataframe 的主键。

所以基本上对于每个 OrgId 我都需要收集两个版本,只是列更改了值。

这是我到目前为止所尝试的。

val columns = df1.schema.fields.map(_.name)
val selectiveDifferences = columns.map(col => 
df1.select(col).except(df2.select(col)))
selectiveDifferences.map(diff => {if(diff.count > 0) diff.show})

但它一次只给我一个例外输出。

问候, 苏达山

【问题讨论】:

  • 它似乎也没有产生预期的输出——如果你在两个数据框中为两个不同的OrgIds 的 Y 列有值 X 怎么办——这些不会出现(因为 @ 987654326@ 将删除 X),但它们出现在不同的 OrgIds 中。
  • @TzachZohar 抱歉,我已经编辑了我的问题……我想我必须想办法……
  • 预期结果的schema是什么? DataFrame 中的行必须具有相同的结构,不能有 N 列的行和 N+1 列的行。您是否仍希望有与输入类似的单独列,nulls 没有差异?还是您想将所有列“合并”为一个数组/映射列?请定义所需输出的确切结构。
  • @TzachZohar 如果列有变化,那么它应该出现,如果没有变化,那么它应该被隐藏......将所有列合并到一个 arrry/map 中也可以......跨度>
  • 请准确一点 - 预期结果的 schema 是什么?什么是列和列类型?再一次,一列不能“出现”在一条记录中,也不能出现在另一条记录中 - 整个 DataFrame 必须具有相同的架构。

标签: scala apache-spark dataframe apache-spark-sql


【解决方案1】:

您没有为输出定义所需的结构,因此我假设将列分开,每列包含不同值的 arraynull 如果它们匹配就足够了:

// list of columns to compare
val cols = df1.columns.filter(_ != "OrgId").toList

// function to create an expression that results in null for similar values,
// and with a two-item array with the differing values otherwise
def mapDiffs(name: String) = when($"l.$name" === $"r.$name", null)
  .otherwise(array($"l.$name", $"r.$name"))
  .as(name)

// joining the two DFs on OrgId
val result = df1.as("l")
  .join(df2.as("r"), "OrgId")
  .select($"OrgId" :: cols.map(mapDiffs): _*)

result.show()
// +----------+----------+---------+--------+------------+
// |     OrgId|    ItemId|segmentId|Sequence|      Action|
// +----------+----------+---------+--------+------------+
// |4295877341|      null|   [9, 4]|    null|        null|
// |4295877342|[111, 136]|     null|  [2, 1]|        null|
// |4295877343|[138, 900]|     null|    null|[I|!|, K|!|]|
// |4295877344|      null|     null|    null|[I|!|, D|!|]|
// |4295877345|[143, 111]|     null|    null|        null|
// |4295877346|      null|     null|    null|[d|!|, I|!|]|
// +----------+----------+---------+--------+------------+

【讨论】:

  • 这是我需要的,但我们不能用一些空格替换 null 吗?
  • 并非如此 - 因为列必须具有单一类型,并且这些列的类型为 Array[Int]Array[String] - “空格”不是数组。更重要的是 - 你应该确保你知道空白是否(以及为什么)会更好 - 我根本不知道它会如何使用。
  • 我们可以先使用except函数,然后将第一个数据框与生成的数据框连接起来吗? val cols = DF1.columns.filter(_ != "emp_id").toList val DF3 = DF1.except(DF2) def mapDiffs(name: String) = when($"l.$name" === $"r .$name", null).otherwise(array($"l.$name", $"r.$name")).as(name) val result = DF2.as("l").join(DF3. as("r"), "emp_id").select($"emp_id" :: cols.map(mapDiffs): _*) 这样我们就看不到其中没有变化的行,除此之外我们的加入会更便宜
猜你喜欢
  • 2013-04-25
  • 1970-01-01
  • 1970-01-01
  • 2019-06-29
  • 2022-07-08
  • 1970-01-01
  • 2023-03-19
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多