【Question Title】: How do I create a new DataFrame based on an old DataFrame?
【Posted】: 2022-01-09 10:17:33
【Question】:

I have a csv file: dbname1.table1.csv

target             | source        | source_table                                | relation_type
-------------------|---------------|---------------------------------------------|--------------
avg_ensure_sum_12m | inn_num       | custom_cib_ml_stg.p_overall_part_tend_cust  | direct
avg_ensure_sum_12m | protocol_dttm | custom_cib_ml_stg.p_overall_part_tend_cust  | direct
avg_ensure_sum_12m | inn_num       | custom_cib_ml_stg.p_overall_part_tend_cust  | indirect

The csv form of this table:

target,source,source_table,relation_type
avg_ensure_sum_12m,inn_num,custom_cib_ml_stg.p_overall_part_tend_cust,direct
avg_ensure_sum_12m,protocol_dttm,custom_cib_ml_stg.p_overall_part_tend_cust,direct
avg_ensure_sum_12m,inn_num,custom_cib_ml_stg.p_overall_part_tend_cust,indirect

Then I create a DataFrame by reading it:

 val dfDL = spark.read.option("delimiter", ",")
                     .option("header", true)
                     .csv(file.getPath.toUri.getPath)

Now I need to create a new DataFrame based on dfDL.

The structure of the new DataFrame looks like this:

case class DataLink(schema_from: String,
                    table_from: String,
                    column_from: String,
                    link_type: String,
                    schema_to: String,
                    table_to: String,
                    column_to: String)

The fields of the new DataFrame are derived from the csv file:

pseudocode:
schema_from = source_table.split(".")(0) // Example: custom_cib_ml_stg
table_from  = source_table.split(".")(1) // Example: p_overall_part_tend_cust
column_from = source                     // Example: inn_num
link_type   = relation_type              // Example: direct
schema_to   = "dbname1.table1.csv".split(".")(0) // Example: dbname1
table_to    = "dbname1.table1.csv".split(".")(1) // Example: table1
column_to   = target                             // Example: avg_ensure_sum_12m
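
A note when turning this pseudocode into Scala: String.split takes a regular expression, so the literal dot has to be escaped, otherwise "." matches every character. A quick illustration (only of the splitting, not of the final solution):

val fileName = "dbname1.table1.csv"
fileName.split(".")    // wrong: every character is a delimiter, the result is an empty array
fileName.split("\\.")  // Array("dbname1", "table1", "csv")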

I need to create this new DataFrame, but I can't manage it on my own.

P.S. I need this DataFrame in order to create a json file from it later. Example JSON:

[{"schema_from":"custom_cib_ml36_stg",
"table_from":"p_overall_part_tend_cust",
"column_from":"inn_num",
"link_type":"direct",
"schema_to":"dbname1",
"table_to":"table1",
"column_to":"avg_ensure_sum_12m"
},
{"schema_from":"custom_cib_ml36_stg",
"table_from":"p_overall_part_tend_cust",
"column_from":"protocol_dttm",
"link_type":"direct","schema_to":"dbname1",
"table_to":"table1",
"column_to":"avg_ensure_sum_12m"}

I don't like my current implementation:

def readDLFromHDFS(file: LocatedFileStatus): Array[DataLink] = {

    // schema_to / table_to come from the file name, e.g. "dbname1.table1.csv"
    val arrTableName        = file.getPath.getName.split("\\.")
    val (schemaTo, tableTo) = (arrTableName(0), arrTableName(1))

    val dfDL = spark.read.option("delimiter", ",")
                         .option("header", true)
                         .csv(file.getPath.toUri.getPath)

    //val sourceTable = dfDL.select("source_table").collect().map(value => value.toString().split("."))

    // collect() pulls every row onto the driver, then each Row is mapped to a DataLink
    dfDL.collect.map(row => DataLink(row.getString(2).split("\\.")(0),
                                     row.getString(2).split("\\.")(1),
                                     row.getString(1),
                                     row.getString(3),
                                     schemaTo,
                                     tableTo,
                                     row.getString(0)))
  }

  def toJSON(dataLinks: Array[DataLink]): Option[JValue] =
    dataLinks.map(Extraction.decompose).reduceOption(_ ++ _)

}
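
For reference, Extraction.decompose needs an implicit org.json4s.Formats in scope; a minimal usage sketch (assuming the json4s-jackson backend and the readDLFromHDFS above):

import org.json4s.{DefaultFormats, Formats, JValue}
import org.json4s.jackson.JsonMethods.{compact, render}

implicit val formats: Formats = DefaultFormats   // required by Extraction.decompose

// hypothetical driver code: serialize the Array[DataLink] and print the resulting JSON
toJSON(readDLFromHDFS(file)).foreach(jv => println(compact(render(jv))))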

【Question Comments】:

    Tags: dataframe scala apache-spark apache-spark-sql


    【Solution 1】:

    You definitely don't want to collect; that defeats the purpose of using Spark here. As always with Spark, you have plenty of options. You could use RDDs, but I don't see a need to switch paradigms here. You just want to apply custom logic to some columns and end up with a DataFrame containing only the resulting columns.

    First, define a UDF you want to apply:

    def convert(target: String, source: String, source_table: String, relation_type: String): DataLink =
      DataLink(source_table.split("\\.")(0),
               source_table.split("\\.")(1),
               source,
               relation_type,
               "dbname1.table1.csv".split("\\.")(0),
               "dbname1.table1.csv".split("\\.")(1),
               target)
    

    Then apply this function to all the relevant columns (making sure to wrap it in udf so it becomes a Spark function rather than a plain Scala function) and select the result:

    df.select(udf(convert _)($"target", $"source", $"source_table", $"relation_type"))
    

    If you instead want a DataFrame with 7 columns as the result:

    df.select(
      split(col("source_table"), "\\.").getItem(0),
      split(col("source_table"), "\\.").getItem(1),
      col("source"),
      col("relation_type"),
      lit("dbname1"),
      lit("table1"),
      col("target")
    )
    

    You can also add .as("column_name") to each of these 7 columns, as in the sketch below.
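
    For illustration, the same select with aliases matching the DataLink field names might look like this (a sketch; the "dbname1"/"table1" literals are assumed to come from the file name, as in the question):

    import org.apache.spark.sql.functions.{col, lit, split}

    val dfRenamed = df.select(
      split(col("source_table"), "\\.").getItem(0).as("schema_from"),
      split(col("source_table"), "\\.").getItem(1).as("table_from"),
      col("source").as("column_from"),
      col("relation_type").as("link_type"),
      lit("dbname1").as("schema_to"),
      lit("table1").as("table_to"),
      col("target").as("column_to")
    )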

    【Comments】:

    • Hi! This is a neat solution and very easy to read. But I have a small problem. What am I doing wrong? pic4a.ru/112/JYd.png
    • What exactly is the problem?
    • sorry =) "Value '$' is not a member of StringContext" is the problem pic4a.ru/112/kL2.png
    • Not sure why you get that, but you can use col("target") etc. instead
    • Try storing udf(convert) in a separate variable? Then apply f(col("...
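
    Side note on the "$" error from the comments: the $"colName" syntax usually becomes available after importing the implicits of the active SparkSession; a minimal sketch, assuming the session is named spark:

    import org.apache.spark.sql.functions.{col, udf}
    import spark.implicits._   // brings the $"colName" (StringContext) syntax into scope

    val convertUdf = udf(convert _)
    df.select(convertUdf($"target", $"source", $"source_table", $"relation_type"))
    // or, without the interpolator:
    df.select(convertUdf(col("target"), col("source"), col("source_table"), col("relation_type")))
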
    【Solution 2】:

    You can work with a typed Dataset directly.

    import spark.implicits._
    
    case class DataLink(schema_from: String,
                        table_from: String,
                        column_from: String,
                        link_type: String,
                        schema_to: String,
                        table_to: String,
                        column_to: String)
    
    val filename = "dbname1.table1.csv"
    val df = spark.read.option("header","true").csv("test.csv")
    df.show(false)
    +------------------+-------------+------------------------------------------+-------------+
    |target            |source       |source_table                              |relation_type|
    +------------------+-------------+------------------------------------------+-------------+
    |avg_ensure_sum_12m|inn_num      |custom_cib_ml_stg.p_overall_part_tend_cust|direct       |
    |avg_ensure_sum_12m|protocol_dttm|custom_cib_ml_stg.p_overall_part_tend_cust|direct       |
    |avg_ensure_sum_12m|inn_num      |custom_cib_ml_stg.p_overall_part_tend_cust|indirect     |
    +------------------+-------------+------------------------------------------+-------------+
    
    df.createOrReplaceTempView("table")
    
    val df2 = spark.sql(s"""
    select split(source_table, '[.]')[0] as schema_from
         , split(source_table, '[.]')[1] as table_from
         , source                        as column_from
         , relation_type                 as link_type
         , split('${filename}', '[.]')[0] as schema_to
         , split('${filename}', '[.]')[1] as table_to
         , target                        as column_to
      from table
    """).as[DataLink]
    
    df2.show()
    
    +-----------------+--------------------+-------------+---------+---------+--------+------------------+
    |      schema_from|          table_from|  column_from|link_type|schema_to|table_to|         column_to|
    +-----------------+--------------------+-------------+---------+---------+--------+------------------+
    |custom_cib_ml_stg|p_overall_part_te...|      inn_num|   direct|  dbname1|  table1|avg_ensure_sum_12m|
    |custom_cib_ml_stg|p_overall_part_te...|protocol_dttm|   direct|  dbname1|  table1|avg_ensure_sum_12m|
    |custom_cib_ml_stg|p_overall_part_te...|      inn_num| indirect|  dbname1|  table1|avg_ensure_sum_12m|
    +-----------------+--------------------+-------------+---------+---------+--------+------------------+
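
    Since the end goal is a json file, the resulting Dataset can be written out (or collected as JSON strings) directly; a minimal sketch, with the output path left up to you:

    // one JSON object per line (JSON Lines), written by the Spark executors
    df2.write.mode("overwrite").json("/tmp/datalinks")

    // or collect the rows as JSON strings on the driver
    df2.toJSON.collect().foreach(println)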
    

    【Comments】:

      【Solution 3】:

      My progress so far... I can now create the new DataFrame, but it only contains 1 column.

      val dfDL = spark.read.option("delimiter", ",")
                           .option("header", true)
                           .csv(file.getPath.toUri.getPath)
      
      // schemaTo and tableTo are taken from the csv file name, as in readDLFromHDFS above
      val convertCase = (target: String, source: String, source_table: String, relation_type: String) =>
                          DataLink(
                            source_table.split("\\.")(0),
                            source_table.split("\\.")(1),
                            source,
                            relation_type,
                            schemaTo,
                            tableTo,
                            target
                          )
      
      
      val udfConvert = udf(convertCase)
      
      val dfForJson  = dfDL.select(udfConvert(col("target"),
                                              col("source"),
                                              col("source_table"),
                                              col("relation_type")))
      

      【Comments】:
