【Posted】: 2018-04-06 18:10:27
【Problem description】:
Hi everyone. I have a DataFrame that needs to be updated from a second DataFrame: for some fields the values should be summed, while other fields should simply take the new value supplied by the second DataFrame. This is what I did:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

// First snapshot: read the CSV with headers and cast the key columns to proper types
val hist1 = spark.read
  .format("csv")
  .option("header", "true") // reading the headers
  .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")
  .withColumn("article_id", 'article_id.cast(LongType))
  .withColumn("pos_id", 'pos_id.cast(LongType))
  .withColumn("date", 'date.cast(DateType))
  .withColumn("qte", 'qte.cast(LongType))
  .withColumn("ca", 'ca.cast(DoubleType))
hist1.show
// Second snapshot: same layout, carrying the new values to merge in
val hist2 = spark.read
  .format("csv")
  .option("header", "true") // reading the headers
  .load("C:/Users/MHT/Desktop/his2.csv")
  .withColumn("article_id", 'article_id.cast(LongType))
  .withColumn("pos_id", 'pos_id.cast(LongType))
  .withColumn("date", 'date.cast(DateType))
  .withColumn("qte", 'qte.cast(LongType))
  .withColumn("ca", 'ca.cast(DoubleType))
hist2.show
// Stack the two snapshots (union is the non-deprecated equivalent of unionAll in Spark 2.x)
val df3 = hist1.unionAll(hist2)

// For each (pos_id, article_id): keep the latest date and sum qte and ca
val df4 = df3.groupBy("pos_id", "article_id").agg($"pos_id", $"article_id", max("date"), sum("qte"), sum("ca"))
df4.show
hist1.show gives:
+------+----------+----------+---+----+----------+
|pos_id|article_id| date|qte| ca|sale_price|
+------+----------+----------+---+----+----------+
| 1| 1|2000-01-07| 3| 3.5| 14.3|
| 2| 2|2000-01-07| 15|12.0| 13.2|
| 3| 2|2000-01-07| 4| 1.2| 14.3|
| 4| 2|2000-01-07| 4| 1.2| 12.3|
+------+----------+----------+---+----+----------+
hist2.show gives:
+------+----------+----------+---+----+----------+
|pos_id|article_id| date|qte| ca|sale_price|
+------+----------+----------+---+----+----------+
| 1| 1|2000-01-08| 3| 3.5| 14.5|
| 2| 2|2000-01-08| 15|12.0| 20.2|
| 3| 2|2000-01-08| 4| 1.2| 17.5|
| 4| 2|2000-01-08| 4| 1.2| 18.2|
| 5| 3|2000-01-08| 15| 1.2| 11.2|
| 6| 1|2000-01-08| 2|1.25| 13.5|
| 6| 2|2000-01-08| 2|1.25| 14.3|
+------+----------+----------+---+----+----------+
and df4.show gives:
+------+----------+----------+--------+-------+
|pos_id|article_id| max(date)|sum(qte)|sum(ca)|
+------+----------+----------+--------+-------+
| 2| 2|2000-01-08| 30| 24.0|
| 3| 2|2000-01-08| 8| 2.4|
| 1| 1|2000-01-08| 6| 7.0|
| 5| 3|2000-01-08| 15| 1.2|
| 6| 1|2000-01-08| 2| 1.25|
| 6| 2|2000-01-08| 2| 1.25|
| 4| 2|2000-01-08| 8| 2.4|
+------+----------+----------+--------+-------+
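As far as I understand, pos_id and article_id do not need to be repeated inside agg, since grouping columns are carried into the result automatically; a tidier form of the same aggregation would presumably be:

val df4b = df3.groupBy("pos_id", "article_id")
  .agg(max("date").as("date"), sum("qte").as("qte"), sum("ca").as("ca"))
df4b.show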
If I also want to include the sale_price field, taking the new sale_price value supplied by the second DataFrame, how would this query need to change?
val df4 = df3.groupBy("pos_id", "article_id").agg($"pos_id", $"article_id", max("date"), sum("qte"), sum("ca"))
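For reference, here is a minimal sketch of the direction I am imagining, assuming the row with the most recent date is the one that carries the new sale_price; I have not verified that this is the right way to do it:

val df5 = df3
  .groupBy("pos_id", "article_id")
  .agg(
    // max over a struct compares by its first field (date), so the sale_price
    // that travels with the latest date is the one that is kept
    max(struct($"date", $"sale_price")).as("latest"),
    sum("qte").as("qte"),
    sum("ca").as("ca")
  )
  .select(
    $"pos_id",
    $"article_id",
    $"latest.date".as("date"),
    $"latest.sale_price".as("sale_price"),
    $"qte",
    $"ca"
  )
df5.show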
Thanks in advance.
【Question discussion】:
Tags: scala apache-spark