【问题标题】:Pyspark: 'For' loops to add rows to a dataframePyspark:'For'循环将行添加到数据框
【发布时间】:2018-04-03 15:03:30
【问题描述】:

我正在尝试使用 for 循环向数据框添加新行。所以输入是:

ColA  ColNum  ColB  ColB_lag1  ColB_lag2
Xyz     25    123      234        345
Abc     40    456      567        678

我想要的输出是这样的:

ColA  ColNum  ColB  ColB_lag1  ColB_lag2
 Xyz    25    123      234       345
 Xyz    26    789      123       234
 Abc    40    456      567       678
 Abc    41    890      456       567

所以,我的代码是这样的:

df = df.withColumn("ColNum", (df.ColNum + 1).cast(IntegerType())) \
       .withColumn("ColB_lag2", df.ColB_lag1)
       .withColumn("ColB_lag1", df.ColB)
       .withColumn("ColB", someFunc())

当我必须添加一行时,代码可以正常工作,但当我必须在循环中添加多行时会中断。所以我使用了一个 For 循环来完成它。我在循环开始时过滤最新的行,然后运行上面的逻辑来计算列的值。然后将新行附加到数据集,该数据集再次在循环顶部使用。输出最终看起来像这样:

ColA  ColNum  ColB  ColB_lag1  ColB_lag2
 Xyz    25    123      234       345
 Xyz    25    789      123
 Xyz    26    789      123
 Abc    40    456      567       678
 Abc    40    890      456
 Abc    41    890      456

问题是:PySpark 中的“For”循环是否由于并行化而崩溃,还是我在 for 循环中链接了太多函数(或循环中的函数顺序)导致了这种不稳定的行为?

如果我错过了这里的任何关键点,很乐意分享更多细节。

编辑1:For循环如下:

num_months = 5
df_final = sc.read.csv(input_path, header='true').createOrReplaceTempView("df_final")

for i in range(num_months):
    df = sc.sql("""
        SELECT *
        FROM df_final mrd
        INNER JOIN
            (SELECT ColA AS ColA_tmp, MAX(fh_effdt) AS max_fh_effdt
            FROM df_final
            GROUP BY ColA) grouped_mrd
        ON mrd.ColA = grouped_mrd.ColA_tmp
        AND mrd.fh_effdt = grouped_mrd.max_fh_effdt
        """)
    df = df.drop(df.ColA_tmp).drop(df.max_fh_effdt).drop(df.ColB_lag2)
    df_tmp = df.withColumn("ColNum", (df.wala + 1).cast(IntegerType())) \
               .withColumn("ColB_lag2", df.ColB_lag1) \
               .withColumn("ColB_lag1", df.ColB) \
               .withColumn("ColB", someFunc())
    df_final = df_final.union(df_tmp)

df_final.persist()
df_final.coalesce(1).write.csv(output_path + scenario_name+"_df_final", mode='overwrite', header='true')

解决方案:问题出在工会上。由于我删除了列并重新计算它们,spark 将这些列添加到末尾,并且“联合”按列位置而不是名称进行联合。这就是在后续循环中造成问题的原因,因为新行的数据移动了几列。解决方案是在进行联合之前从字面上选择所有列并重新排序它们。上面的 sn-p 被简化了,我可以在不删除 ColB_lag2 的情况下做到这一点。实际代码之间还有另一个步骤,我从另一个数据帧连接刷新一些值,并且在从新数据帧引入之前需要删除这些列。

【问题讨论】:

  • 您能添加您的for loop 代码吗?另外,这篇文章中没有添加行的代码。
  • 添加了 for 循环代码。
  • @ApoorvAgarwal 能否请您添加最终代码,以便对社区更有用?

标签: python for-loop pyspark


【解决方案1】:

您的问题是您正在对数据框版本(来自 csv 数据源的原始数据)创建临时视图,并期望它反映对 df_final 数据框变量所做的更改。

临时视图df_final 不包含循环运行时对数据框df_final 生成的数据。数据帧是不可变的。解决此问题的一种方法是也替换循环中的临时视图:

# the top part of your loop...
df_final = df_final.union(df_tmp)
df_final.createOrReplaceTempView("df_final")

【讨论】:

  • 对。这是我在循环中添加的内容。不过,问题出在工会上。上面编辑中的解释
猜你喜欢
  • 1970-01-01
  • 2019-03-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-10-22
  • 2019-03-15
  • 2011-10-02
相关资源
最近更新 更多