【问题标题】:Dataframe: Fetching count of records present in table A but not in table B. table B has 29 million records. (pyspark)数据框:获取表 A 中存在但表 B 中不存在的记录数。表 B 有 2900 万条记录。 (pyspark)
【发布时间】:2020-06-14 21:38:30
【问题描述】:
  • 表 A -- 是一个只有 1 列和 47000 条记录的 DataFrame 不同的应用了 14000 条记录。
  • 表 B -- 是一个 DataFrame,只有 1 列和 2900 万条记录,所有不同的值。

我需要获取表 A 中但不在表 B 中的记录数。但不知何故,在 PySpark shell(本地模式)中运行以下查询时,我遇到了以下错误。

Table_A         Table_B
123450          
123451
123452          123452
123453          123453
123454          123454
                123455
                123456
                123457

预期输出:- 2

DB_accnum=spark.sql("select org_acctnum from Table_A where 'some filter conditions'")

ACC_repository=spark.sql("select account_num from Table_B")

DB_accnum_d=DB_accnum.select('org_acctnum').distinct()
DB_accnum_d.persist()
broadcast(DB_accnum_d)
DB_accnum_d.count()

R_join= ACC_repository.join(DB_accnum_d,ACC_repository.account_num == 
DB_accnum.org_acctnum,how='rightouter')
R_join.count()

在此之后我得到以下错误:-

R_join.count()
20/06/14 22:03:15 WARN TaskMemoryManager: Failed to allocate a page (1073741824 bytes), try again.
20/06/14 22:03:16 WARN TaskMemoryManager: Failed to allocate a page (1073741824 bytes), try again.
  • 内连接工作正常。(尝试 count(),在内连接的 df 上显示 (n=5))
  • 在获得加入计数后,我打算过滤不存在​​于 表 B,然后获取 new_DF 的计数,但中间出现错误。

谁能告诉我这是正确的方法还是我做错了什么?

【问题讨论】:

  • 您是否已经在不广播表 DB_accnum_d 的情况下运行?
  • 是的,我在没有广播的情况下运行,遇到了同样的错误。 @Kafels

标签: dataframe apache-spark pyspark apache-spark-sql


【解决方案1】:

您可以通过运行减号查询或在数据框 API 中使用 exceptAll 来做到这一点

只需确保列别名相同,并且两个数据框都包含相同的架构。


DB_accnum = spark.sql("select org_acctnum from Table_A where 'some filter conditions'")

ACC_repository = spark.sql("select account_num as Id from Table_B")

DB_accnum_d = DB_accnum.select(col('org_acctnum').alias('Id').distinct()

difference = DB_accnum_d.exceptAll(ACC_repository)

#difference will contain the account number present in DB_accnum_d and not present in ACC_repository
difference.count()
difference.show()

【讨论】:

  • 你的 spark 版本是什么
  • difference=DB_accnum_d.exceptAll(ACC_repository) 试过了。 Got error "AttributeError: 'DataFrame' object has no attribute 'exceptAll' " 然后尝试 difference=DB_accnum_d.subtract(ACC_repository) 它被编译但是当我运行 count() difference.count() got the same error " WARN TaskMemoryManager: Failed to allocate a page (1073741824 bytes), try again." @shubham 火花版本是 -- 版本 2.3.0.2.6.5.0-292
  • Spark 版本 -- 2.3.0.2.6.5.0-292
  • exceptAll 是 2.4 独有的。我将删除我的答案
  • 您的集群大小是多少?此外,您只是收到一条警告,让它继续运行,看看您是否得到结果
猜你喜欢
  • 2013-05-26
  • 1970-01-01
  • 2022-01-09
  • 1970-01-01
  • 2020-02-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多