【问题标题】:update dataframe based on union function spark基于联合函数 spark 更新数据帧
【发布时间】:2018-04-06 18:10:27
【问题描述】:

大家好,我有一个需要根据另一个数据帧更新的数据帧,我们要对某些字段求和,其他字段仅取第二个数据帧提供的新值,这就是我所做的

  val hist1 = spark.read
      .format("csv")
      .option("header", "true") //reading the headers
      .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")
      .withColumn("article_id", 'article_id.cast(LongType))
      .withColumn("pos_id", 'pos_id.cast(LongType))
      .withColumn("qte", 'qte.cast(LongType))
      .withColumn("ca", 'ca.cast(DoubleType))

    hist1.show

    val hist2 = spark.read
      .format("csv")
      .option("header", "true") //reading the headers
      .load("C:/Users/MHT/Desktop/his2.csv")
      .withColumn("article_id", 'article_id.cast(LongType))
      .withColumn("date", 'date.cast(DateType))
      .withColumn("qte", 'qte.cast(LongType))
      .withColumn("ca", 'ca.cast(DoubleType))

    hist2.show

    val df3 = hist1.unionAll(hist2)
    //      
    val df4 = df3.groupBy("pos_id", "article_id").agg($"pos_id", $"article_id", max("date"), sum("qte"), sum("ca"))
    df4.show

+------+----------+----------+---+----+----------+
|pos_id|article_id|      date|qte|  ca|sale_price|
+------+----------+----------+---+----+----------+
|     1|         1|2000-01-07|  3| 3.5|      14.3|
|     2|         2|2000-01-07| 15|12.0|      13.2|
|     3|         2|2000-01-07|  4| 1.2|      14.3|
|     4|         2|2000-01-07|  4| 1.2|      12.3|
+------+----------+----------+---+----+----------+

+------+----------+----------+---+----+----------+
|pos_id|article_id|      date|qte|  ca|sale_price|
+------+----------+----------+---+----+----------+
|     1|         1|2000-01-08|  3| 3.5|      14.5|
|     2|         2|2000-01-08| 15|12.0|      20.2|
|     3|         2|2000-01-08|  4| 1.2|      17.5|
|     4|         2|2000-01-08|  4| 1.2|      18.2|
|     5|         3|2000-01-08| 15| 1.2|      11.2|
|     6|         1|2000-01-08|  2|1.25|      13.5|
|     6|         2|2000-01-08|  2|1.25|      14.3|
+------+----------+----------+---+----+----------+



    +------+----------+----------+--------+-------+
|pos_id|article_id| max(date)|sum(qte)|sum(ca)|
+------+----------+----------+--------+-------+
|     2|         2|2000-01-08|      30|   24.0|
|     3|         2|2000-01-08|       8|    2.4|
|     1|         1|2000-01-08|       6|    7.0|
|     5|         3|2000-01-08|      15|    1.2|
|     6|         1|2000-01-08|       2|   1.25|
|     6|         2|2000-01-08|       2|   1.25|
|     4|         2|2000-01-08|       8|    2.4|
+------+----------+----------+--------+-------+

如果我想附加字段 sale_price 并考虑第二个数据框提供的新 sale_price,请求将如何 这个请求会怎样

 val df4 = df3.groupBy("pos_id", "article_id").agg($"pos_id", $"article_id", max("date"), sum("qte"), sum("ca"))

在此先感谢

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    您可以在最后一行使用join,如下所示

    val df4 = df3.groupBy("pos_id", "article_id").agg(max("date"), sum("qte"), sum("ca")).join(hist2.select("pos_id", "article_id", "sale_price"), Seq("pos_id", "article_id"))
    

    你应该有你想要的输出

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-05-12
      • 2020-10-08
      • 1970-01-01
      • 2016-02-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多