【Question Title】: Spark dataframe duplicate row based on splitting column value in scala
【Posted】: 2020-05-01 14:15:59
【Question】:

I have the following code in Scala:

val fullCertificateSourceDf = certificateSourceDf
  .withColumn("Stage", when(col("Data.WorkBreakdownUp1Summary").isNotNull && col("Data.WorkBreakdownUp1Summary") =!= "",
    rtrim(regexp_extract($"Data.WorkBreakdownUp1Summary", "^.*?(?= - *[a-zA-Z])", 0))).otherwise(""))
  .withColumn("SubSystem", when(col("Data.ProcessBreakdownSummaryList").isNotNull && col("Data.ProcessBreakdownSummaryList") =!= "",
    regexp_extract($"Data.ProcessBreakdownSummaryList", "^.*?(?= - *[a-zA-Z])", 0)).otherwise(""))
  .withColumn("System", when(col("Data.ProcessBreakdownUp1SummaryList").isNotNull && col("Data.ProcessBreakdownUp1SummaryList") =!= "",
    regexp_extract($"Data.ProcessBreakdownUp1SummaryList", "^.*?(?= - *[a-zA-Z])", 0)).otherwise(""))
  .withColumn("Facility", when(col("Data.ProcessBreakdownUp2Summary").isNotNull && col("Data.ProcessBreakdownUp2Summary") =!= "",
    regexp_extract($"Data.ProcessBreakdownUp2Summary", "^.*?(?= - *[a-zA-Z])", 0)).otherwise(""))
  .withColumn("Area", when(col("Data.ProcessBreakdownUp3Summary").isNotNull && col("Data.ProcessBreakdownUp3Summary") =!= "",
    regexp_extract($"Data.ProcessBreakdownUp3Summary", "^.*?(?= - *[a-zA-Z])", 0)).otherwise(""))
  .select("Data.ID",
          "Data.CertificateID",
          "Data.CertificateTag",
          "Data.CertificateDescription",
          "Data.WorkBreakdownUp1Summary",
          "Data.ProcessBreakdownSummaryList",
          "Data.ProcessBreakdownUp1SummaryList",
          "Data.ProcessBreakdownUp2Summary",
          "Data.ProcessBreakdownUp3Summary",
          "Data.ActualStartDate",
          "Data.ActualEndDate",
          "Data.ApprovedDate",
          "Data.CurrentState",
          "DataType",
          "PullDate",
          "PullTime",
          "Stage",
          "System",
          "SubSystem",
          "Facility",
          "Area")
  .filter(col("Stage").isNotNull && length(col("Stage")) > 0)
  .filter((col("SubSystem").isNotNull && length(col("SubSystem")) > 0) ||
          (col("System").isNotNull && length(col("System")) > 0) ||
          (col("Facility").isNotNull && length(col("Facility")) > 0) ||
          (col("Area").isNotNull && length(col("Area")) > 0))

This dataframe, fullCertificateSourceDf, contains the following data:

I have hidden some columns for brevity.

I would like the data to look like this:

We split on two columns: ProcessBreakdownSummaryList and ProcessBreakdownUp1SummaryList. Both are comma-separated lists.

Note that when the entries in ProcessBreakdownSummaryList (CS10-100-22-10 - Mine Intake Air Fan Heater System, CS10-100-81-10 - Mine Services Switchgear) and ProcessBreakdownUp1SummaryList (CS10-100-22 - Working Shaft Ventilation, CS10-100-81 - Working Shaft Electrical) correspond, the row should be split only once.

However, when they differ, as with ProcessBreakdownSummaryList (CS10-100-22-10 - Mine Intake Air Fan Heater System, CS10-100-81-10 - Mine Services Switchgear) and ProcessBreakdownUp1SummaryList (CS10-100-22 - Working Shaft Ventilation, CS10-100-34 - Working Shaft Electrical), it should split again into a third row.
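The matching rule described above can be sketched in plain Scala (no Spark needed). This is my own reading of the requirement, and `code` and `rowsNeeded` are hypothetical helpers, not part of the question: I assume an Up1 entry "corresponds" to a summary entry when the summary's code starts with the Up1 code (e.g. CS10-100-22-10 falls under CS10-100-22), and that each unmatched Up1 entry needs one extra output row.

```scala
// The part of an entry before " - " is its breakdown code.
def code(entry: String): String = entry.split(" - ").head.trim

// How many output rows a single input row should produce, under the
// prefix-matching assumption described above.
def rowsNeeded(summaryList: String, up1List: String): Int = {
  val summaries = summaryList.split(",").map(_.trim)
  val up1s      = up1List.split(",").map(_.trim)
  // Up1 entries with no matching child each become an extra row.
  val orphans = up1s.filterNot(u => summaries.exists(s => code(s).startsWith(code(u))))
  summaries.length + orphans.length
}
```

With matching lists this yields two rows; when CS10-100-34 has no child entry, it yields three.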

Thanks in advance for your help with this.

【Comments】:

    Tags: scala apache-spark apache-spark-sql databricks


    【Solution 1】:

    You can solve this in several ways; for complex processing I think the simplest is plain Scala. Read all the columns, including "ProcessBreakdownSummaryList" and "ProcessBreakdownUp1SummaryList", compare their values for equality, and emit multiple rows for a single input row. Then flatMap over the output to get a dataframe with all the rows you need.

    val fullCertificateSourceDf = // your code

    fullCertificateSourceDf.map { row =>
      val id = row.getAs[String]("Data.ID")
      // ... read all the other columns the same way

      val processBreakdownSummaryList = row.getAs[String]("Data.ProcessBreakdownSummaryList")
      val processBreakdownUp1SummaryList = row.getAs[String]("Data.ProcessBreakdownUp1SummaryList")

      // split processBreakdownSummaryList on ","
      // split processBreakdownUp1SummaryList on ","
      // compare them for equality
      // say you end up with 4 rows:

      // return a List of string tuples, one tuple per output row, e.g.
      //   List((id, certificateId, certificateTag, ..distinct values of processBreakdownUp1SummaryList...), (...), ...)
      // the columns id, certificateId, certificateTag etc. are repeated for each distinct
      // value of processBreakdownUp1SummaryList and processBreakdownSummaryList

    }.flatMap(identity(_)).toDF("column1", "column2", ...)
    
    

    Here is an example of splitting one row into multiple rows:

        import spark.implicits._  // needed for the Dataset encoders and toDF

        val employees = spark.createDataFrame(Seq(
          ("E1", 100.0, "a,b"),
          ("E2", 200.0, "e,f"),
          ("E3", 300.0, "c,d")
        )).toDF("employee", "salary", "clubs")

        employees.map { r =>
          val clubs = r.getAs[String]("clubs").split(",")
          for {
            c <- clubs
          } yield (r.getAs[String]("employee"), r.getAs[Double]("salary"), c)
        }.flatMap(identity(_)).toDF("employee", "salary", "clubs").show(false)
    

    The result looks like:

    +--------+------+-----+
    |employee|salary|clubs|
    +--------+------+-----+
    |E1      |100.0 |a    |
    |E1      |100.0 |b    |
    |E2      |200.0 |e    |
    |E2      |200.0 |f    |
    |E3      |300.0 |c    |
    |E3      |300.0 |d    |
    +--------+------+-----+
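    As an aside, the same one-row-to-many split can also be done without a typed map, using the built-in `split` and `explode` functions of the DataFrame API. This is a sketch under the assumption of a local SparkSession named `spark`, not part of the original answer:

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, explode, split}

    // Assumed setup: a local SparkSession (skip if one already exists).
    val spark = SparkSession.builder().master("local[1]").appName("explode-demo").getOrCreate()

    val employees = spark.createDataFrame(Seq(
      ("E1", 100.0, "a,b"), ("E2", 200.0, "e,f"), ("E3", 300.0, "c,d")
    )).toDF("employee", "salary", "clubs")

    // split turns the string into an array; explode emits one row per element.
    val exploded = employees.withColumn("clubs", explode(split(col("clubs"), ",")))
    exploded.show(false)
    ```

    This produces the same six rows as the typed map/flatMap version above.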
    

    【Comments】:

    • Thanks Salim. An example would be much appreciated.
    • Please see the example provided.
    • That's great @Salim. How would you handle a null here? `val employees = spark.createDataFrame(Seq(("E1",100.0,"a,b"), ("E2",200.0,"e,f"), ("E3",300.0,"c,d"), ("E4",300.0,null))).toDF("employee","salary","clubs")`
    • Please avoid null for numeric columns; Scala cannot represent null for primitives such as Long. Use Option or a default value for numbers instead. For strings you can use null or a default value. If you post it as a separate question, or upvote this one, I can put together an example for you.
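    Following up on the null discussion in the comments above, one way to guard a nullable string column is to wrap it in `Option` before splitting. A minimal sketch (the `splitClubs` helper is hypothetical, not from the answer):

    ```scala
    // A null clubs value yields no rows instead of a NullPointerException;
    // swap Seq.empty for a default such as Seq("none") if a row should survive.
    def splitClubs(clubs: String): Seq[String] =
      Option(clubs).map(_.split(",").toSeq).getOrElse(Seq.empty)
    ```

    Inside the map above one would then write `for (c <- splitClubs(r.getAs[String]("clubs"))) yield ...`, so the E4 row with a null clubs value is simply dropped.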