【问题标题】:Comparing two large dataframes w/ pySpark使用 pySpark 比较两个大型数据帧
【发布时间】:2020-06-05 00:52:05
【问题描述】:

这个post 提到了对称差异和利用代码df1.except(df2).union(df2.except(df1)) 和/或df1.unionAll(df2).except(df1.intersect(df2)),但是我在使用except 时遇到了语法错误。

我正在尝试比较两个最多可以包含 50 或 50+ 列的数据框。我有下面的工作代码,但需要避免硬编码列。

示例代码和示例

# Create the two dataframes
df1 = sqlContext.createDataFrame([(11,'Sam',1000,'ind','IT','2/11/2019'),(22,'Tom',2000,'usa','HR','2/11/2019'),
                                 (33,'Kom',3500,'uk','IT','2/11/2019'),(44,'Nom',4000,'can','HR','2/11/2019'),
                                 (55,'Vom',5000,'mex','IT','2/11/2019'),(66,'XYZ',5000,'mex','IT','2/11/2019')],
                                 ['No','Name','Sal','Address','Dept','Join_Date']) 
df2 = sqlContext.createDataFrame([(11,'Sam',1000,'ind','IT','2/11/2019'),(22,'Tom',2000,'usa','HR','2/11/2019'),
                                  (33,'Kom',3000,'uk','IT','2/11/2019'),(44,'Nom',4000,'can','HR','2/11/2019'),
                                  (55,'Xom',5000,'mex','IT','2/11/2019'),(77,'XYZ',5000,'mex','IT','2/11/2019')],
                                  ['No','Name','Sal','Address','Dept','Join_Date']) 
df1 = df1.withColumn('FLAG',lit('DF1'))
df2 = df2.withColumn('FLAG',lit('DF2'))

# Concatenate the two DataFrames, to create one big dataframe.
df = df1.union(df2)


#Use window function to check if the count of same rows is more than 1 and if it indeed is, then mark column FLAG as SAME, else keep it the way it is. Finally, drop the duplicates.

my_window = Window.partitionBy('No','Name','Sal','Address','Dept','Join_Date').rowsBetween(-sys.maxsize, sys.maxsize)
df = df.withColumn('FLAG', when((count('*').over(my_window) > 1),'SAME').otherwise(col('FLAG'))).dropDuplicates()
df.show()

【问题讨论】:

  • 你必须自己编码还是可以使用工具来完成它?
  • @Saša Zejnilović,我可以使用包和帮助库。我还没有在 pySpark 中找到任何东西
  • github.com/AbsaOSS/hermes 有一个 DatasetComparison 模块。一个独立的火花工作。在 scala 中,它也可以用作库。

标签: python dataframe apache-spark pyspark


【解决方案1】:

您可以从df 获取所有列名,并将该列表用作Window 函数的参数:

cols = df.columns
cols.remove('FLAG')
my_window = Window.partitionBy(cols).rowsBetween(-sys.maxsize, sys.maxsize)

其余代码保持不变。

【讨论】:

  • 为什么要使用cols.remove('FLAG')
  • FLAG 是包含行是最初来自 df1 还是 df2 的信息的列。您不希望此列成为窗口定义的一部分。
  • 我使用相同的代码,但得到不同的输出,stackoverflow.com/questions/62272046/…@werner
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2021-05-16
  • 1970-01-01
  • 2019-07-21
  • 2020-06-02
  • 2020-09-30
  • 1970-01-01
  • 2018-06-08
相关资源
最近更新 更多