【问题标题】:pyspark do calculation for each row against other rows and get maxpyspark 对每一行与其他行进行计算并获得最大值
【发布时间】:2020-07-18 03:43:00
【问题描述】:

我想针对表格中的每一行对每一行进行一些计算,然后将最大值保留在新列中。

例如,我想为以下数据框中的每一行计算(x / x * y)

id       score        
A        1.2        
B        4.5       
C        0.8      
D        1.1
E        6.7

A 将对所有其他行 B、C、D、E 进行计算

这就像做一个嵌套的for循环:

for(int i = 0; i < table.length; i++)
  for(int j = 0; j < table.length; j++)
    if(i == j) continue
    result = score-from-A / (score-from-A * score-from-B)
    if(result > max_score_A_current_has)
      max_score_A_current_has = result

这是预期的输出。公式为x / (x * y)

id       score      max  
A        1.2        1.25 because max in 0.2 (A-B), 1.25 (A-C), 0.9 (A-D)
B        4.5        1.25                0.83 (B-A), 1.25 (B-C), 0.91 (B-D)
C        0.8        0.9                 0.83 (C-A), 0.2 (C-B), 0.9 (C-D)
D        1.1        1.25                0.83 (D-A), 0.2 (D-B), 1.25 (D-C)

我不知道如何在 pyspark 中做到这一点。

【问题讨论】:

  • 添加预期输出并更详细地解释您的问题
  • 更新了问题

标签: python apache-spark pyspark


【解决方案1】:

由于您想将每一行与其他每一行进行比较(据我了解),唯一的选择是笛卡尔积。您可以使用以下代码:

# creating the input data
df = spark.createDataFrame(
          [('A', 1.2), ('B', 4.5), ('C', .8), ('D', 1.1), ('E', 6.7)],
          ['id', 'score']
)

# renaming columns for clarity
df2 = df.select(df['id'].alias('id2'), df['score'].alias('score2'))

# And using the crossJoin function (Cartesian product)
from pyspark.sql import functions as F
df\
    .crossJoin(df2)\
    .where(F.col('id') != F.col('id2'))\
    .withColumn("result", F.col('score') / (F.col('score') * F.col('score2')))\
    .groupBy("id", "score")\
    .agg(F.max(F.col('result')).alias("max"))\
    .orderBy("id").show()

产量:

+---+-----+------------------+                                                  
| id|score|               max|
+---+-----+------------------+
|  A|  1.2|              1.25|
|  B|  4.5|              1.25|
|  C|  0.8|0.9090909090909091|
|  D|  1.1|              1.25|
|  E|  6.7|              1.25|
+---+-----+------------------+

编辑 如果你想知道哪个 id 产生了最大的结果,你可以像这样使用struct 函数:

df\
    .crossJoin(df2)\
    .where(F.col('id') != F.col('id2'))\
    .withColumn("result", F.col('score') / (F.col('score') * F.col('score2')))\
    .withColumn("s", F.struct(F.col('result'), F.col('id2')))\
    .groupBy("id", "score")\
    .agg(F.max(F.col('s')).alias('s'))\
    .select('id', 'score', F.col('s.result').alias('max'),
                 F.col('s.id2').alias('id2'))\
    .orderBy('id').show()

【讨论】:

  • 有没有办法显示每行产生最高分数的 id 是什么?
  • 是的,您可以使用包含分数和 id 的窗口或结构来执行此操作,以便将它们聚合在一起。
  • 类似 T.StructType([T.StructField('id', T.StringType(), True), T.StructField('score', T.DoubleType(), True)] 的东西?你也可以把它放在你的答案中吗?谢谢!
猜你喜欢
  • 2021-10-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-12-20
  • 1970-01-01
  • 2018-07-20
  • 1970-01-01
  • 2021-11-25
相关资源
最近更新 更多