【Question Title】: Spark: update a Dataframe based on a join operation
【Posted】: 2020-04-18 05:52:17
【Question】:

I have a dataframe that holds the latest state as of each date. Every day I need to add the new qte and the new ca to the old values and update the date. So I need to update the rows that already exist and add the new ones.

Here is an example of what I want to end up with:

val histocaisse = spark.read
      .format("csv")
      .option("header", "true") //reading the headers
      .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")

    val hist = histocaisse
      .withColumn("pos_id", 'pos_id.cast(LongType))
      .withColumn("article_id", 'article_id.cast(LongType))
      .withColumn("date", 'date.cast(DateType))
      .withColumn("qte", 'qte.cast(DoubleType))
      .withColumn("ca", 'ca.cast(DoubleType))



    val histocaisse2 = spark.read
      .format("csv")
      .option("header", "true") //reading the headers
      .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")

    val hist2 = histocaisse2.withColumn("pos_id", 'pos_id.cast(LongType))
      .withColumn("article_id", 'article_id.cast(LongType))
      .withColumn("date", 'date.cast(DateType))
      .withColumn("qte", 'qte.cast(DoubleType))
      .withColumn("ca", 'ca.cast(DoubleType))
    hist.show(false)
    hist2.show(false)

hist (day 1):
+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-07|2.5 |3.5 |
|2     |2         |2000-01-07|14.7|12.0|
|3     |3         |2000-01-07|3.5 |1.2 |
+------+----------+----------+----+----+

hist2 (day 2):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|2.5 |3.5 |
|2     |2         |2000-01-08|14.7|12.0|
|3     |3         |2000-01-08|3.5 |1.2 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+

Desired result (qte and ca summed, date moved to the new day):

+------+----------+----------+----+----+
|pos_id|article_id|date      |qte |ca  |
+------+----------+----------+----+----+
|1     |1         |2000-01-08|5.0 |7.0 |
|2     |2         |2000-01-08|29.4|24.0|
|3     |3         |2000-01-08|7.0 |2.4 |
|4     |4         |2000-01-08|3.5 |1.2 |
|5     |5         |2000-01-08|14.5|1.2 |
|6     |6         |2000-01-08|2.0 |1.25|
+------+----------+----------+----+----+

To do this, I wrote the following:

val df = hist2.join(hist, Seq("article_id", "pos_id"), "left")
  .select($"pos_id", $"article_id",
    coalesce(hist2("date"), hist("date")).alias("date"),
    (coalesce(hist2("qte"), lit(0)) + coalesce(hist("qte"), lit(0))).alias("qte"),
    (coalesce(hist2("ca"), lit(0)) + coalesce(hist("ca"), lit(0))).alias("ca"))
  .orderBy("pos_id", "article_id")

df.show()

+------+----------+----------+----+----+
|pos_id|article_id|      date| qte|  ca|
+------+----------+----------+----+----+
|     1|         1|2000-01-08| 5.0| 7.0|
|     2|         2|2000-01-08|29.4|24.0|
|     3|         3|2000-01-08| 7.0| 2.4|
|     4|         4|2000-01-08| 3.5| 1.2|
|     5|         5|2000-01-08|14.5| 1.2|
|     6|         6|2000-01-08| 2.0|1.25|
+------+----------+----------+----+----+

The goal is to update the existing rows and add the new ones. But when I tried the case where the history table is empty, I ran into the following problem:

Exception in thread "main" java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1321)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)

What should I do so that this still works even when the first table is empty?

【Comments】:

  • Does histocaisse_dte1.csv have a header row?

Tags: scala apache-spark apache-spark-sql


【Solution 1】:

For this you should define a schema and apply it while reading the csv files. That way you don't even need the casting code. :) (An explicit schema also avoids the empty collection error: without one, Spark has to inspect the file to work out the columns, which fails when the file is empty.)

In your case both dataframes look the same, so you can create the schema as

import org.apache.spark.sql.types._
val schema = StructType(Seq(
  StructField("pos_id", LongType, true),
  StructField("article_id", LongType, true),
  StructField("date", DateType, true),
  StructField("qte", DoubleType, true),
  StructField("ca", DoubleType, true)
))

Then you can use the schema as

val hist1 = spark.read
  .format("csv")
  .option("header", "true") //reading the headers
  .schema(schema)
  .load("C:/Users/MHT/Desktop/histocaisse_dte1.csv")

val hist2 = spark.read
  .format("csv")
  .option("header", "true") //reading the headers
  .schema(schema)
  .load("C:/Users/MHT/Desktop/histocaisse_dte2.csv")

Then, finally, you can apply your final logic without the error.
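With explicit schemas the reads no longer fail on an empty file, and the coalesce-and-sum join handles missing rows on either side. As a plain-Scala sketch of the merge semantics (the Hist case class and merge helper are illustrative, not Spark API):

```scala
// Plain-Scala model of the merge, keyed by (pos_id, article_id).
// date takes coalesce(new, old); qte and ca take coalesce(new, 0) + coalesce(old, 0).
case class Hist(date: String, qte: Double, ca: Double)

def merge(old: Map[(Long, Long), Hist],
          fresh: Map[(Long, Long), Hist]): Map[(Long, Long), Hist] =
  (old.keySet ++ fresh.keySet).map { key =>
    val o = old.get(key)
    val n = fresh.get(key)
    key -> Hist(
      date = n.orElse(o).get.date,  // coalesce(fresh.date, old.date)
      qte  = n.map(_.qte).getOrElse(0.0) + o.map(_.qte).getOrElse(0.0),
      ca   = n.map(_.ca).getOrElse(0.0) + o.map(_.ca).getOrElse(0.0))
  }.toMap

val day1 = Map((1L, 1L) -> Hist("2000-01-07", 2.5, 3.5))
val day2 = Map((1L, 1L) -> Hist("2000-01-08", 2.5, 3.5),
               (4L, 4L) -> Hist("2000-01-08", 3.5, 1.2))

val merged = merge(day1, day2)
println(merged((1L, 1L)))               // Hist(2000-01-08,5.0,7.0)
println(merge(Map.empty, day2) == day2) // true: an empty history is handled
```

The same arithmetic is what the DataFrame version computes; the key point is that an empty old side contributes 0 to the sums rather than breaking the read.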

【Discussion】:

【Solution 2】:

The Databricks Spark runtime supports the MERGE operator.

It lets you update a target table based on a join condition:

https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html

MERGE INTO [db_name.]target_table [AS target_alias]
USING [db_name.]source_table [<time_travel_version>] [AS source_alias]
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]

Basically it would be a merge with an update clause for the existing rows and an insert clause for the new ones.
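Applied to this question (table and alias names are illustrative, and both tables would need to be Delta tables for MERGE to work), the statement might look like:

```sql
MERGE INTO histocaisse AS t
USING histocaisse_daily AS s
ON t.pos_id = s.pos_id AND t.article_id = s.article_id
WHEN MATCHED THEN
  UPDATE SET t.date = s.date,
             t.qte  = t.qte + s.qte,
             t.ca   = t.ca + s.ca
WHEN NOT MATCHED THEN
  INSERT (pos_id, article_id, date, qte, ca)
  VALUES (s.pos_id, s.article_id, s.date, s.qte, s.ca)
```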

【Discussion】:
