【发布时间】:2018-03-31 15:34:18
【问题描述】:
我正在努力将已知的工作 SQL 查询转换为在 pyspark 中工作,给定两个数据帧,使用以下方法:.join、.where、filter 等。
以下是有效的 SQL 查询示例(仅选择 r.id,我通常会选择更多列):
# "invalid" records, where there is a matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is not null;
# "valid" records, where there is no matching `record_id` for rv_df
SELECT DISTINCT(r.id) FROM core_record AS r LEFT OUTER JOIN core_recordvalidation rv ON r.id = rv.record_id WHERE r.job_id = 41 AND rv.record_id is not null;
我已接近 80/20,但无法理解最后几个步骤和/或如何最有效地执行此操作。
我有一个 Dataframe r_df 列 id 我想加入 Dataframe rv_df 列 record_id。作为输出,我只想要 distinct r.id,并且只想要来自 r_df 的列,没有来自 rv_df 的列。最后,我想要两个不同的调用,其中 匹配(对我来说什么是“无效”记录),以及 不是 匹配(我考虑“有效”记录)。
我有接近的 pyspark 查询,但不太清楚如何确保 r_df.id 是不同的,并且仅从 r_df 中选择列,从 rv_df 中不选择列。
任何帮助将不胜感激!
【问题讨论】:
-
您问题中的两个查询在我看来都是一样的。将其转换为 DataFrame 函数将是:
invalid_df = r_df.alias('r').join(rv_df.withColumn('record_id', f.col('id')).alias('rv'), on='id', how='left_outer').where('(r.job_id = 41) AND (rv.record_id is not null)').select('r.id').distinct()。基于docs forjoin:列必须存在于两边,这就是我在rv_df上创建id列的原因。
标签: pyspark pyspark-sql