【问题标题】:Spark SQL - data comparisonSpark SQL - 数据比较
【发布时间】:2019-07-17 13:10:37
【问题描述】:

比较具有相同架构和主键列的两个 csv 文件(数百万行)并打印出差异的最佳方法是什么。例如,

CSV1

Id  name   zip   
1   name1  07112  
2   name2  07234  
3   name3  10290  

CSV2

Id  name    zip   
1   name1   07112  
2   name21  07234  
4   name4   10290  

比较修改后的文件 CSV2 和原始数据 CSV1,

输出应该是

Id name    zip   
2  name21  07234 Modified  
3  name3   10290 Deleted  
4  name4   10290 Added  

Spark SQL 新手,我正在考虑将数据导入 Hive 表,然后运行 ​​Spark SQL 来识别更改。

1) 是否有任何行修改方法可用于识别行是否已修改,而不是比较每列中的值? 2) 有没有更好的方法可以使用 Spark 或其他 HDFS 工具来实现?

感谢反馈

【问题讨论】:

  • 投反对票:你试过什么?
  • 如果它有效,请接受答案,但它确实......
  • @thebluephantom 是的.. 它有效!我还将使用哈希码(以识别修改的行)针对自定义 C#/python 代码运行一些基准测试。希望这种火花方法会表现得更好。感谢您的帮助!

标签: apache-spark hive apache-spark-sql hdfs bigdata


【解决方案1】:

存在许多方法;这是一个可以并行完成的事情:

import org.apache.spark.sql.functions._
import sqlContext.implicits._

val origDF = sc.parallelize(Seq(
  ("1", "a", "b"),
  ("2", "c", "d"),
  ("3", "e", "f")
)).toDF("k", "v1", "v2")

val newDF = sc.parallelize(Seq(
  ("1", "a", "b"),
  ("2", "c2", "d"),
  ("4", "g", "h")
)).toDF("k", "v1", "v2")

val df1 = origDF.except(newDF) // if k not exists in df2, then deleted
//df1.show(false)
val df2 = newDF.except(origDF) // if k not exists in df1, then added
//df2.show(false)

                           // if no occurrence in both dfs, then the same
                           // if k exists in both, then k in df2 = modified

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

val df3 = spark.sql("""SELECT df1.k, df1.v1, df1.v2, "deleted" as operation
                         FROM  df1 
                        WHERE NOT EXISTS (SELECT df2.k 
                                            FROM df2
                                            WHERE df2.k = df1.k)
                          UNION
                       SELECT df2.k, df2.v1, df2.v2, "added" as operation
                         FROM  df2 
                         WHERE NOT EXISTS (SELECT df1.k 
                                             FROM df1
                                            WHERE df1.k = df2.k)
                          UNION
                       SELECT df2.k, df2.v1, df2.v2, "modified" as operation
                         FROM  df2 
                        WHERE EXISTS (SELECT df1.k 
                                        FROM df1
                                        WHERE df1.k = df2.k)

                   """)

df3.show(false)

返回:

+---+---+---+---------+
|k  |v1 |v2 |operation|
+---+---+---+---------+
|4  |g  |h  |added    |
|2  |c2 |d  |modified |
|3  |e  |f  |deleted  |
+---+---+---+---------+

没那么难,没有标准实用程序。

【讨论】:

    猜你喜欢
    • 2020-06-07
    • 1970-01-01
    • 1970-01-01
    • 2018-06-01
    • 2018-01-15
    • 2023-03-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多