【问题标题】:How to create multiple DataFrames from a multiple lists in Scala Spark如何从 Scala Spark 中的多个列表创建多个 DataFrame
【发布时间】:2020-09-22 14:56:25
【问题描述】:

我正在尝试从下面的两个列表中创建多个 DataFrame,

val paths = ListBuffer("s3://abc_xyz_tableA.json",
                       "s3://def_xyz_tableA.json",
                       "s3://abc_xyz_tableB.json",
                       "s3://def_xyz_tableB.json",
                       "s3://abc_xyz_tableC.json",....)

val tableNames = ListBuffer("tableA","tableB","tableC","tableD",....)

我想使用表名创建不同的数据框,方法是将所有以 s3 路径结尾的公共表名放在一起,因为它们具有唯一的架构。

so for example if the tables and paths related to it are brought together then -

 "tableADF" will have all the data from these paths "s3://abc_xyz_tableA.json", "s3://def_xyz_tableA.json" as they have "tableA" in the path

 "tableBDF" will have all the data from these paths "s3://abc_xyz_tableB.json", "s3://def_xyz_tableB.json" as they have "tableB" in the path

and so on there can be many tableNames and Paths

我正在尝试不同的方法,但还没有成功。 实现所需解决方案的任何线索都将大有帮助。谢谢!

【问题讨论】:

  • 我已添加解决方案并检查一次。

标签: json scala apache-spark amazon-s3


【解决方案1】:

使用input_file_name() udf,您可以根据文件名进行过滤以获取每个文件/文件模式的数据框

import org.apache.spark.sql.functions._
import spark.implicits._
var df = spark.read.format("json").load("s3://data/*.json")
df = df.withColumn(
  "input_file", input_file_name()
)

val tableADF= df.filter($"input_file".endsWith("tableA.json"))
val tableBDF= df.filter($"input_file".endsWith("tableB.json"))

【讨论】:

  • 嘿,感谢您的回复,所以问题是如何通过加载以相同名称结尾的路径来创建数据框。我需要实现这个 val tableADF = spark.read.json("All_the_paths_with_ending_tableA") 。我无法将所有文件路径一起加载,因为它们都有不同的架构。只有以相同名称结尾的路径才会具有相同的架构。
  • 你可以使用这个 spark.read.format("json").load("s3://data/*tableA.json") 加载,使用通配符表达式如 *tableA.json
  • 如果它们只是几个文件,我可以手动完成,但可能有数千个文件,我试图让它动态加载,而不是手动为每个表名创建数据框跨度>
【解决方案2】:

如果文件后修复名称列表很长,那么您可以使用以下内容, 还可以找到内联的代码说明

import org.apache.spark.sql.functions._


object DFByFileName {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    import spark.implicits._

    //Load your JSON data
    var df = spark.read.format("json").load("s3://data/*.json")

    //Add a column with file name
    df = df.withColumn(
      "input_file", (input_file_name())
    )

    //Extract unique file postfix from the file names in a List
    val fileGroupList = df.select("input_file").map(row => {
      val fileName = row.getString(0)
      val index1 = fileName.lastIndexOf("_")
      val index2 = fileName.lastIndexOf(".")
      fileName.substring(index1 + 1, index2)
    }).collect()

    //Iterate file group name to map of (fileGroup -> Dataframe of file group) 
    fileGroupList.map(fileGroupName => {
      df.filter($"input_file".endsWith(s"${fileGroupName}.json"))
      //perform dataframe operations
    })
  }

}


【讨论】:

  • 嘿,感谢您的回复,所以问题是如何通过加载以相同名称结尾的路径来创建数据框。我需要实现这个 val tableADF = spark.read.json("All_the_paths_with_ending_tableA") 。我无法将所有文件路径一起加载,因为它们都有不同的架构。只有以相同名称结尾的路径才会具有相同的架构。
【解决方案3】:

检查下面的代码 & 最终结果类型是

scala.collection.immutable.Map[String,org.apache.spark.sql.DataFrame] = Map(tableBDF -> [...], tableADF -> [...], tableCDF -> [...]) 其中... 是您的列列表。

paths
.map(path => (s"${path.split("_").last.split("\\.json").head}DF",path)) // parsing file names and extracting table name and path into tuple
.groupBy(_._1) // grouping paths based same table name
.map(p => (p._1 -> p._2.map(_._2))).par // combining paths for same table into list and also .par function to execute subsequent steps  in Parallel
.map(mp => { 
      (
         mp._1, // table name
         mp._2.par // For same DF multiple Files load parallel.
                   .map(spark.read.json(_)) // loading files s3
                   .reduce(_ union _) // union if same table has multiple files.
      )
   }
) 

【讨论】:

  • 感谢您的回答。您能否在代码中添加一些解释。代码中的多个映射有点难以理解
  • 我已经添加了cmets inline,请检查
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-08-04
  • 2016-11-21
  • 2021-11-20
  • 2020-03-18
  • 2017-02-17
  • 1970-01-01
  • 2016-07-08
相关资源
最近更新 更多