【问题标题】:Comparing the values (list) of Pyspark dataframes比较 Pyspark 数据帧的值(列表)
【发布时间】:2020-01-30 14:26:18
【问题描述】:

我想比较 list_id 列上的两个 df1 df2 数据帧:

df1 = 
+---------+
|  list_id|
+---------+
|[1, 2, 3]|
|[4, 5, 6]|
|[7, 8, 9]|
+---------+
df2 =
+------------+
|     list_id|
+------------+
| [10, 3, 11]|
|[12, 13, 14]|
| [15, 6, 16]|
+------------+

想要的结果是:

df2 =
+-------------------+
|            list_id|
+-------------------+
| [1, 2, 3, 10, 11] |
| [4, 5, 6, 15, 16] |
| [7, 8, 9]         |
| [12, 13, 14]      |
+-------------------+

我的目标是连接它们的交集不为空的列表,并使其他列表与 pyspark 保持原样。

注意:我的数据框非常大,不可能使用 Spark Sql 的连接。

【问题讨论】:

  • 如果两个数据帧之间有多个匹配项怎么办?
  • 到目前为止你尝试了什么?
  • 我尝试使用列表列之间的交集函数进行完全连接,这会产生内存错误
  • 7,8,9 ?.. 这有效吗?
  • 什么版本的火花?

标签: python pyspark pyspark-dataframes


【解决方案1】:

我想出了一个无需任何连接操作即可工作的代码。 它有点混乱,考虑到我多次爆炸数组,我不知道它会如何表现内存。

import pyspark.sql.functions as F
from pyspark.sql.window import Window

df1 = (sc.parallelize([(1, 2, 3), (4, 5, 6), (7, 8, 9)])
         .toDF(('c1', 'c2', 'c3'))
         .select(F.array(F.col('c1'), F.col('c2'), F.col('c3')).alias('id_list'))
        )

df2 = (sc.parallelize([(10, 3, 11), (12, 13, 14), (15, 6, 16)])
         .toDF(('c1', 'c2', 'c3'))
         .select(F.array(F.col('c1'), F.col('c2'), F.col('c3')).alias('id_list'))
         )

out = (df1.union(df2)
         .withColumn('key1', F.explode('id_list'))
         .withColumn('key2', F.explode('id_list'))
         .groupBy('key1')
         .agg(F.sort_array(F.collect_set(F.col('key2'))).alias('id_list'))
         .withColumn('key1', F.explode('id_list'))
         .withColumn('max_length', F.max(F.size('id_list')).over(Window().partitionBy('key1')))
         .where(F.col('max_length')==F.size('id_list'))
         .select('id_list')
         .distinct()
    )

【讨论】:

  • 我测试了您的解决方案,但出现错误:ExecutorLostFailure(执行程序 11 因其中一项正在运行的任务而退出)原因:容器标记为失败:主机上的 container_xxxxxxxxxxxxxxx:xxx-node-xxx。退出状态:143。诊断:[2019-10-01 15:28:55.322]容器应要求被杀死。退出代码为 143 [2019-10-01 15:28:55.322]容器以非零退出代码 143 退出。[2019-10-01 15:28:55.323]被外部信号杀死
  • 这是内存错误。我没有时间去想一个更优化的解决方案。尝试首先调整您的内存分配/如果可以的话。这会有所帮助
  • 感谢您的回答,我使用了集群的最大内存但总是出现同样的错误。你认为你必须进一步划分数据帧吗?
  • 如果运行时间不是主要问题,我会说是的。但是您的问题基本上是构建一个图表,因此您可以尝试以不同的方式处理它。我对这些主题一点也不熟悉,尤其是在研究 spark 时。
  • 您也可以将这些东西细分为更小的 DF 并逐步写入结果。这个例子很糟糕,因为我看不出是否存在位置依赖性。
猜你喜欢
  • 1970-01-01
  • 2018-06-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-05-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多