【Question Title】: Filter a Spark dataframe based on another dataframe's columns by converting them into a list
【Posted】: 2020-09-04 16:22:22
【Question】:
df = spark.createDataFrame([("1gh","25g","36h"),("2gf","3ku","4we"),("12w","53v","c74"),("1a2","3d4","4c5"),("232","3df","4rt")], ["a","b","c"])


filter_df = spark.createDataFrame([("2gf","3ku"),("12w","53v"), ("12w","53v")], ["a","b"])

I took column "a" of "filter_df", created an RDD from it, and then built a list with the following code:

unique_list = filter_df.select("a").rdd.flatMap(lambda x: x).distinct().collect()

This gives me:

unique_list = [u'2gf', u'12w']

I converted the RDD to a list using a collect action, but filtering with that list gives me the allocation errors shown below:

final_df = df.filter(F.col("a").isin(unique_list))

118.255: [GC (Allocation Failure) [PSYoungGen: 1380832K->538097K(1772544K)] 2085158K->1573272K(3994112K), 0.0622847 secs] [Times: user=2.31 sys=1.76, real=0.06 secs]
122.540: [GC (Allocation Failure) [PSYoungGen: 1772529K->542497K(2028544K)] 2807704K->1581484K(4250112K), 0.3217980 secs] [Times: user=11.16 sys=13.15, real=0.33 secs]
127.071: [GC (Allocation Failure) [PSYoungGen: 1776929K->542721K(2411008K)] 2815916K->1582011K(4632576K), 0.8024852 secs] [Times: user=58.43 sys=4.85, real=0.80 secs]
133.284: [GC (Allocation Failure) [PSYoungGen: 2106881K->400752K(2446848K)] 3146171K->1583953K(4668416K), 0.4198589 secs] [Times: user=18.31 sys=12.58, real=0.42 secs]
139.050: [GC (Allocation Failure) [PSYoungGen: 1964912K->10304K(2993152K)] 3148113K->1584408K(5214720K), 0.0712454 secs] [Times: user=2.92 sys=0.88, real=0.08 secs]
146.638: [GC (Allocation Failure) [PSYoungGen: 2188864K->12768K(3036160K)] 3762968K->1588544K(5257728K), 0.1212116 secs] [Times: user=3.05 sys=3.74, real=0.12 secs]
154.153: [GC (Allocation Failure) [PSYoungGen: 2191328K->12128K(3691008K)] 3767104K->1590112K(5912576K), 0.1179030 secs] [Times: user=6.94 sys=0.11, real=0.12 secs]
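For reference, the collect-and-`isin` pattern itself is simple; the memory pressure comes from materializing all the distinct values on the driver. A minimal pure-Python sketch of what the `isin` filter computes, reusing the sample rows from the question (no Spark involved):

```python
# Pure-Python model of df.filter(F.col("a").isin(unique_list)):
# keep only rows whose "a" value (first field) appears in the collected list.
rows = [
    ("1gh", "25g", "36h"),
    ("2gf", "3ku", "4we"),
    ("12w", "53v", "c74"),
    ("1a2", "3d4", "4c5"),
    ("232", "3df", "4rt"),
]
unique_list = ["2gf", "12w"]  # the collected distinct values
lookup = set(unique_list)     # set membership is O(1) per row
final_rows = [r for r in rows if r[0] in lookup]
print(final_rows)  # [('2gf', '3ku', '4we'), ('12w', '53v', 'c74')]
```

This works for small lists, but since the whole list must fit on the driver (and be shipped to every executor), it does not scale to a large `filter_df`; a join keeps everything distributed.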

Required output:

final_df

+---+---+---+
|  a|  b|  c|
+---+---+---+
|2gf|3ku|4we|
|12w|53v|c74|
+---+---+---+

What is an efficient way to filter a Spark dataframe using another RDD, a list, or a different dataframe? The data above is just a sample; my real dataset is much larger.

【Comments】:

  • If you don't want any duplicates, do a left_semi join between df and filter_df. If you don't mind duplicates, use an inner join. If you still want a left_semi join but need to drop duplicate keys, call .distinct() on filter_df before joining.

Tags: apache-spark pyspark apache-spark-sql


【Solution 1】:

Use a left_semi join:

df.join(filter_df, ['a','b'],'left_semi')
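A left_semi join keeps each row of `df` whose join keys appear in `filter_df`, emitting it at most once and never pulling `filter_df` onto the driver. A pure-Python model of that semantics, using the sample rows from the question (a Spark-free sketch, not the actual execution plan):

```python
# Pure-Python model of df.join(filter_df, ['a','b'], 'left_semi'):
# keep each df row whose ("a","b") pair appears in filter_df,
# without duplicating output even when filter_df has duplicate keys.
df_rows = [
    ("1gh", "25g", "36h"),
    ("2gf", "3ku", "4we"),
    ("12w", "53v", "c74"),
    ("1a2", "3d4", "4c5"),
    ("232", "3df", "4rt"),
]
filter_rows = [("2gf", "3ku"), ("12w", "53v"), ("12w", "53v")]
keys = {(a, b) for a, b in filter_rows}  # duplicate keys collapse in the set
result = [r for r in df_rows if (r[0], r[1]) in keys]
print(result)  # [('2gf', '3ku', '4we'), ('12w', '53v', 'c74')]
```

This matches the required output exactly, including when `filter_df` contains the duplicate `("12w","53v")` row.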

【Comments】:

【Solution 2】:

You can use an inner join:

df.join(filter_df).where((df.a == filter_df.a) & (df.b == filter_df.b))
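Note that the comparisons must be parenthesized, because `&` binds tighter than `==` in Python. Also, unlike left_semi, an inner join emits one output row per matching pair, so duplicate keys in `filter_df` duplicate the output. A pure-Python model of that behavior on the question's sample data (a sketch of the semantics, not of Spark's execution):

```python
# Pure-Python model of the inner join: each df row is emitted once
# per matching filter_df row, so the duplicate ("12w","53v") key in
# filter_df duplicates the "12w" output row (unlike left_semi).
df_rows = [("2gf", "3ku", "4we"), ("12w", "53v", "c74")]
filter_rows = [("2gf", "3ku"), ("12w", "53v"), ("12w", "53v")]
joined = [
    d
    for d in df_rows
    for f in filter_rows
    if d[0] == f[0] and d[1] == f[1]
]
print(joined)
# [('2gf', '3ku', '4we'), ('12w', '53v', 'c74'), ('12w', '53v', 'c74')]
```

This is why the comment above suggests calling `.distinct()` on `filter_df` first if you want an inner join without duplicated output.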

【Comments】:
