【问题标题】:Calculate values from two dataframes in PySpark在 PySpark 中计算两个数据帧的值
【发布时间】:2019-05-29 22:15:57
【问题描述】:

我正在尝试对 PySpark (2.4) Dataframe 进行分组和求和,但不能只一个一个地获取值。

我有以下数据框:

data.groupBy("card_scheme", "failed").count().show()

+----------------+------+------+
|     card_Scheme|failed| count|
+----------------+------+------+
|             jcb| false|     4|
|american express| false| 22084|
|            AMEX| false|     4|
|      mastercard|  true|  1122|
|            visa|  true|  1975|
|            visa| false|126372|
|              CB| false|     6|
|        discover| false|  2219|
|         maestro| false|     2|
|            VISA| false|    13|
|      mastercard| false| 40856|
|      MASTERCARD| false|     9|
+----------------+------+------+

我正在尝试为每个 card_scheme 计算公式 X = false / (false + true),最后仍然得到一个数据帧。

我期待这样的事情:

| card_scheme | X |
|-------------|---|
| jcb         | 1 |
| ....        | . |
| visa        | 0.9846| (which is 126372 / (126372 + 1975)        
| ...         | . |

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    创建数据集

    myValues = [('jcb',False,4),('american express', False, 22084),('AMEX',False,4),('mastercard',True,1122),('visa',True,1975),('visa',False,126372),('CB',False,6),('discover',False,2219),('maestro',False,2),('VISA',False,13),('mastercard',False,40856),('MASTERCARD',False,9)]
    df = sqlContext.createDataFrame(myValues,['card_Scheme','failed','count'])
    df.show()
    +----------------+------+------+
    |     card_Scheme|failed| count|
    +----------------+------+------+
    |             jcb| false|     4|
    |american express| false| 22084|
    |            AMEX| false|     4|
    |      mastercard|  true|  1122|
    |            visa|  true|  1975|
    |            visa| false|126372|
    |              CB| false|     6|
    |        discover| false|  2219|
    |         maestro| false|     2|
    |            VISA| false|    13|
    |      mastercard| false| 40856|
    |      MASTERCARD| false|     9|
    +----------------+------+------+
    

    方法一:这个方法会比较慢,因为它涉及到pivot的转置。

    df=df.groupBy("card_Scheme").pivot("failed").sum("count")
    df=df.withColumn('X',when((col('True').isNotNull()),(col('false')/(col('false')+col('true')))).otherwise(1))
    df=df.select('card_Scheme','X')
    df.show()
    +----------------+------------------+
    |     card_Scheme|                 X|
    +----------------+------------------+
    |            VISA|               1.0|
    |             jcb|               1.0|
    |      MASTERCARD|               1.0|
    |         maestro|               1.0|
    |            AMEX|               1.0|
    |      mastercard|0.9732717137548239|
    |american express|               1.0|
    |              CB|               1.0|
    |        discover|               1.0|
    |            visa|0.9846120283294506|
    +----------------+------------------+
    

    方法 2: 使用 SQL - 您可以通过 windows 函数执行此操作。这样会快很多。

    from pyspark.sql.window import Window
    df = df.groupBy("card_scheme", "failed").agg(sum("count"))\
      .withColumn("X", col("sum(count)")/sum("sum(count)").over(Window.partitionBy(col('card_scheme'))))\
      .where(col('failed')== False).drop('failed','sum(count)')
    df.show()
    
    +----------------+------------------+
    |     card_scheme|                 X|
    +----------------+------------------+
    |            VISA|               1.0|
    |             jcb|               1.0|
    |      MASTERCARD|               1.0|
    |         maestro|               1.0|
    |            AMEX|               1.0|
    |      mastercard|0.9732717137548239|
    |american express|               1.0|
    |              CB|               1.0|
    |        discover|               1.0|
    |            visa|0.9846120283294506|
    +----------------+------------------+
    

    【讨论】:

    • 真的很有帮助!非常感谢:)
    【解决方案2】:

    首先将根数据帧拆分为两个数据帧:

    df_true = data.filter(data.failed == True).alias("df1")
    df_false =data.filter(data.failed == False).alias("df2")
    

    然后进行全外连接,我们可以得到最终结果:

    from pyspark.sql.functions import col,when
    df_result = df_true.join(df_false,df_true.card_scheme == df_false.card_scheme, "outer") \
        .select(when(col("df1.card_scheme").isNotNull(), col("df1.card_scheme")).otherwise(col("df2.card_scheme")).alias("card_scheme") \
                , when(col("df1.failed").isNotNull(), (col("df2.count")/(col("df1.count") + col("df2.count")))).otherwise(1).alias("X"))
    

    无需groupby,只需额外添加两个数据框并加入即可。

    【讨论】:

    • 您的代码将完美运行并解决手头的问题。我只想说一点——如果数据存储在多个分区上,joins 是相当昂贵的操作,因为它会涉及洗牌。 groupBy 可以先对本地分区进行分组,然后再进行随机播放。因此,洗牌的行数会少很多,从而提高工作效率。问候,
    • 是的。我同意:)
    【解决方案3】:

    data.groupBy("card_scheme").pivot("failed").agg(count("card_scheme")) 应该可以工作。我不确定agg(count(any_column)),但线索是pivot 函数。结果,您将获得两个新列:falsetrue。然后你可以很容易地计算出x = false / (false + true)

    【讨论】:

    • 嗯,这与我想要的非常接近,但这有时会给我“空”值
    【解决方案4】:

    一个简单的解决方案是做第二个 groupby:

    val grouped_df = data.groupBy("card_scheme", "failed").count() // your dataframe
    val with_countFalse = grouped_df.withColumn("countfalse", when($"failed" === "false", $"count").otherwise(lit(0)))
    with_countFalse.groupBy("card_scheme").agg(when($"failed" === "false", $"count").otherwise(lit(0)))) / sum($"count")).show()
    

    这个想法是,您可以创建第二列,在 failed=false 中包含失败,否则为 0。这意味着 count 列的总和给出了 false + true,而 countfalse 的总和给出了 false。然后简单地做第二个 groupby

    注意:其他一些答案使用枢轴。我相信枢轴解决方案会更慢(它做得更多),但是,如果您确实选择使用它,请将特定值添加到枢轴调用中,即 pivot("failed", ["true", "false"])为了提高性能,否则 spark 将不得不执行两条路径(首先找到值)

    【讨论】:

      【解决方案5】:
      from pyspark.sql import functions as func
      from pyspark.sql.functions import col    
      data = data.groupby("card_scheme", "failed").count()
      

      创建 2 个新数据框:

      a = data.filter(col("failed") == "false").groupby("card_scheme").agg(func.sum("count").alias("num"))
      b = data.groupby("card_scheme").agg(func.sum("count").alias("den"))
      

      加入两个数据框:

      c = a.join(b, a.card_scheme == b.card_scheme).drop(b.card_scheme)
      

      将一列与另一列分开:

      c.withColumn('X', c.num/c.den)
      

      【讨论】:

      • 完美运行!不需要改变任何东西:)
      猜你喜欢
      • 1970-01-01
      • 2017-04-11
      • 1970-01-01
      • 1970-01-01
      • 2022-01-06
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-27
      相关资源
      最近更新 更多