[Title]: Spark data frame compare and show only values that are different
[Posted]: 2020-08-26 05:33:36
[Question]:

I have two data frames to compare. I am using `except` to show the rows that exist in the first dataset but are missing from the second, and that works fine. However, I only want to show the values that differ rather than the entire row, so that it is easy for someone to identify which fields have differences.

Below is the code snippet:

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.types._

  val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test6").getOrCreate()

  val schemaOrig = List( StructField("key",StringType,true)
    ,StructField("name",StringType,true)
    ,StructField("start_ts",TimestampType,true)
    ,StructField("txn_dt",StringType,true))

  val df =  spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("1","john",java.sql.Timestamp.valueOf("2018-10-16 00:00:00"),"2020-02-14")))
    ,StructType(schemaOrig))

  val df2 =  spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row("1","andrew",java.sql.Timestamp.valueOf("2017-10-16 00:00:00"),"2020-02-14")))
    ,StructType(schemaOrig))

  df.except(df2).show(false)

+---+----+-------------------+----------+
|key|name|           start_ts|    txn_dt|
+---+----+-------------------+----------+
|  1|john|2018-10-16 00:00:00|2020-02-14|
+---+----+-------------------+----------+

Expected output:

+---+-------------+------------------------+
|key|diff columns |diff values             |
+---+-------------+------------------------+
|1  |name,start_ts|john,2018-10-16 00:00:00|
+---+-------------+------------------------+

[Comments]:

  • Can you add the expected output?
  • Updated with the expected output. Basically I have a huge dataset with 100+ columns, so showing the entire row in an email is cumbersome. I just want to limit the data that is displayed.
  • If you are sending the results by email and have that many columns, instead of the horizontal layout shown in the expected output above, you could send the differing columns and their values in this format:

    +---+--------+-------------------+
    |key|column  |value              |
    +---+--------+-------------------+
    |1  |name    |john               |
    |1  |start_ts|2018-10-16 00:00:00|
    +---+--------+-------------------+
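Following the format suggested in the comment, the same column-by-column comparison can be pivoted into a long key|column|value layout. This is only a sketch, assuming two frames `dfa` and `dfb` with the schema from the question; the `diffs` name and the struct fields are illustrative, not from the original post:

```scala
import org.apache.spark.sql.functions._

// One (column, value) struct per non-key column; null when the values match.
// !(a <=> b) uses null-safe equality, so a null on only one side also counts
// as a difference, unlike a =!= b which yields null in that case.
val diffs = dfa.columns.filterNot(_ == "key").map { c =>
  when(!(dfa(c) <=> dfb(c)),
    struct(lit(c).as("column"), dfa(c).cast("string").as("value")))
}

dfa.join(dfb, dfa("key") === dfb("key"), "full")
  .select(dfa("key"), explode(array(diffs: _*)).as("diff"))
  .where(col("diff").isNotNull)              // drop the columns that matched
  .select(col("key"), col("diff.column"), col("diff.value"))
  .show(false)
```

With the sample data this would produce one row per differing column (name and start_ts) instead of one wide row per key.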

Tags: dataframe apache-spark apache-spark-sql apache-spark-dataset


[Solution 1]:

Use a full outer join and extract the columns that do not match.

Please check the code below.

scala> dfa.printSchema
root
 |-- key: string (nullable = true)
 |-- name: string (nullable = true)
 |-- start_ts: timestamp (nullable = true)
 |-- txn_dt: string (nullable = true)


scala> dfa.show(false)
+---+----+-------------------+----------+
|key|name|start_ts           |txn_dt    |
+---+----+-------------------+----------+
|1  |john|2018-10-16 00:00:00|2020-02-14|
+---+----+-------------------+----------+


scala> dfb.printSchema
root
 |-- key: string (nullable = true)
 |-- name: string (nullable = true)
 |-- start_ts: timestamp (nullable = true)
 |-- txn_dt: string (nullable = true)


scala> dfb.show(false)
+---+------+-------------------+----------+
|key|name  |start_ts           |txn_dt    |
+---+------+-------------------+----------+
|1  |andrew|2017-10-16 00:00:00|2020-02-14|
+---+------+-------------------+----------+


scala> val diff_cols = dfa.columns.filterNot(_ == "key").map(c => when(dfa(c) =!= dfb(c),c))
diff_cols: Array[org.apache.spark.sql.Column] = Array(CASE WHEN (NOT (name = name)) THEN name END, CASE WHEN (NOT (start_ts = start_ts)) THEN start_ts END, CASE WHEN (NOT (txn_dt = txn_dt)) THEN txn_dt END)

scala> val diff_values = dfa.columns.filterNot(_ == "key").map(c => when(dfa(c) =!= dfb(c),dfa(c)))
diff_values: Array[org.apache.spark.sql.Column] = Array(CASE WHEN (NOT (name = name)) THEN name END, CASE WHEN (NOT (start_ts = start_ts)) THEN start_ts END, CASE WHEN (NOT (txn_dt = txn_dt)) THEN txn_dt END)

scala> dfa.join(dfb,dfa("key") === dfb("key"),"full").select(dfa("key"),concat_ws(",",diff_cols:_*).as("diff_columns"),concat_ws(",",diff_values:_*).as("diff_values")).show(false) // using full join & taking diff columns & values.
+---+-------------+------------------------+
|key|diff_columns |diff_values             |
+---+-------------+------------------------+
|1  |name,start_ts|john,2018-10-16 00:00:00|
+---+-------------+------------------------+


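One caveat worth noting about the approach above: `=!=` evaluates to null (not true) when either side is null, so a value that is null in only one frame would not be reported as a difference. If that matters for your data, the same maps can be built with Spark's null-safe equality operator `<=>` negated. A sketch under the same `dfa`/`dfb` setup:

```scala
import org.apache.spark.sql.functions._

// !(a <=> b) is true whenever the values differ, including when exactly one
// side is null; a =!= b would yield null there and when() would skip it.
val diff_cols   = dfa.columns.filterNot(_ == "key").map(c => when(!(dfa(c) <=> dfb(c)), c))
val diff_values = dfa.columns.filterNot(_ == "key").map(c => when(!(dfa(c) <=> dfb(c)), dfa(c)))

dfa.join(dfb, dfa("key") === dfb("key"), "full")
  .select(dfa("key"),
    concat_ws(",", diff_cols: _*).as("diff_columns"),
    concat_ws(",", diff_values: _*).as("diff_values"))
  .show(false)
```

For the non-null sample data in the question this yields the same result as the accepted answer.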

[Comments]:

  • Great, I will give it a try.