【发布时间】:2020-04-18 05:52:17
【问题描述】:
我有一个数据框,每个日期都是最新的。 每天我都需要将新的 qte 和新的 ca 添加到旧的并更新日期。 所以我需要更新已经存在的并添加新的。
这是我最后想要的一个例子:
val histocaisse = spark.read
.format("csv")
.option("header", "true") //reading the headers
.load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")
val hist = histocaisse
.withColumn("pos_id", 'pos_id.cast(LongType))
.withColumn("article_id", 'pos_id.cast(LongType))
.withColumn("date", 'date.cast(DateType))
.withColumn("qte", 'qte.cast(DoubleType))
.withColumn("ca", 'ca.cast(DoubleType))
val histocaisse2 = spark.read
.format("csv")
.option("header", "true") //reading the headers
.load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")
val hist2 = histocaisse2.withColumn("pos_id", 'pos_id.cast(LongType))
.withColumn("article_id", 'pos_id.cast(LongType))
.withColumn("date", 'date.cast(DateType))
.withColumn("qte", 'qte.cast(DoubleType))
.withColumn("ca", 'ca.cast(DoubleType))
hist2.show(false)
+------+----------+----------+----+----+
|pos_id|article_id|date |qte |ca |
+------+----------+----------+----+----+
|1 |1 |2000-01-07|2.5 |3.5 |
|2 |2 |2000-01-07|14.7|12.0|
|3 |3 |2000-01-07|3.5 |1.2 |
+------+----------+----------+----+----+
+------+----------+----------+----+----+
|pos_id|article_id|date |qte |ca |
+------+----------+----------+----+----+
|1 |1 |2000-01-08|2.5 |3.5 |
|2 |2 |2000-01-08|14.7|12.0|
|3 |3 |2000-01-08|3.5 |1.2 |
|4 |4 |2000-01-08|3.5 |1.2 |
|5 |5 |2000-01-08|14.5|1.2 |
|6 |6 |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+
+------+----------+----------+----+----+
|pos_id|article_id|date |qte |ca |
+------+----------+----------+----+----+
|1 |1 |2000-01-08|5.0 |7.0 |
|2 |2 |2000-01-08|39.4|24.0|
|3 |3 |2000-01-08|7.0 |2.4 |
|4 |4 |2000-01-08|3.5 |1.2 |
|5 |5 |2000-01-08|14.5|1.2 |
|6 |6 |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+
为此,我这样做了
val df = hist2.join(hist1, Seq("article_id", "pos_id"), "left")
.select($"pos_id", $"article_id",
coalesce(hist2("date"), hist1("date")).alias("date"),
(coalesce(hist2("qte"), lit(0)) + coalesce(hist1("qte"), lit(0))).alias("qte"),
(coalesce(hist2("ca"), lit(0)) + coalesce(hist1("ca"), lit(0))).alias("ca"))
.orderBy("pos_id", "article_id")
// df.show()
|pos_id|article_id| date| qte| ca|
+------+----------+----------+----+----+
| 1| 1|2000-01-08| 5.0| 7.0|
| 2| 2|2000-01-08|29.4|24.0|
| 3| 3|2000-01-08| 7.0| 2.4|
| 4| 4|2000-01-08| 3.5| 1.2|
| 5| 5|2000-01-08|14.5| 1.2|
| 6| 6|2000-01-08| 2.0|1.25|
+------+----------+----------+----+----+
目标是更新现有信息并添加新信息。但是当我尝试案例历史为空时,我遇到了以下问题
Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1321)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
即使第一个表是空的,我应该怎么做才能考虑到
【问题讨论】:
-
histocaisse_dte1.csv有标题行吗?
标签: scala apache-spark apache-spark-sql