【Question Title】: Building ETL logic in Spark Scala
【Posted】: 2021-02-13 15:31:02
【Question】:

I am new to the Spark Scala world and am trying to replicate some ETL logic. Essentially, I want to build dynamic code that pulls data from every table that contains a particular column, filters on that column, and then stores the result in Azure Blob storage.

import org.apache.spark.sql.SaveMode

val url = "<Host Address>"
val user = "<Username>"
val pw = "<Password>"
val driver = "org.postgresql.Driver"
val sslfactory = "org.postgresql.ssl.NonValidatingFactory"

var sql_lookup = " select * from information_schema.tables as inf_schema left join (SELECT table_schema as country_table_schema ,table_name as country_table_name, column_name as country_table_column_name FROM information_schema.columns WHERE table_schema = 'Schema_name' AND columns.column_name = 'Column_A') as country on inf_schema.table_schema = country.country_table_schema and inf_schema.table_name = country.country_table_name WHERE inf_schema.table_schema='<Schemaname>'"
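// the lookup above returns every table in the schema; country_table_column_name is non-null
// only for the tables that actually contain Column_A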

var dfTbl = (spark.read
      .format("jdbc")
      .option("url", url)
      .option("ssl","true")
      .option("sslfactory",sslfactory)
      .option("user", user)
      .option("password", pw)
      .option("driver", driver)
      .option("query",sql_lookup)
      .load())

var dfTbl_withCountry = (dfTbl.select(dfTbl.col("*")).filter(dfTbl.col( "country_table_column_name" ).isNotNull)).select("table_name")


val dfTbl_wc = dfTbl_withCountry.collect().foreach(row => row.toSeq.foreach(col => (col)))
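// NOTE: foreach returns Unit, so dfTbl_wc ends up being Unit here rather than a collection of
// table names -- this is what makes the for loop below fail with "value ... is not a member of Unit"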

for (table <- dfTbl_wc ){
  
  var sql = " select * from <Schemaname>."+s"${table}" + " where <Colume_name> = '<Value>'"
  
   var df = (spark.read
                  .format("jdbc")
                  .option("url", url)
                  .option("ssl","true")
                  .option("sslfactory",sslfactory)
                  .option("user", user)
                  .option("password", pw)
                  .option("driver", driver)
                  .option("query",sql)
                  .load())
  
   var File_withCountry = df
        .coalesce(1)
        .write
        .format("com.databricks.spark.csv")
        .option("header","true")
        .option("delimiter", "~")
        .mode(SaveMode.Overwrite)
        .option("encoding", "UTF-8")
        .csv("wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/"+s"${table}")
  
  
  // the coalesce(1) write above leaves a single part-xxxx file inside the target directory;
  // find its path so it can be copied out to <table>.csv and the part file removed afterwards
  val partition_path = dbutils.fs.ls("wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/"+s"${table}")
     .filter(file=>file.name.startsWith("part"))(0).path

dbutils.fs.cp(partition_path,"wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/"+s"${table}"+".csv")

dbutils.fs.rm (partition_path, recurse = true)
  
}

Below is the subquery that sits inside the lookup query above:

SELECT table_schema as country_table_schema ,table_name as country_table_name, column_name as country_table_column_name FROM information_schema.columns WHERE table_schema = '<Schema_name>' AND columns.column_name = 'Column_A' 

For every table name returned by the sql_lookup query in the "country_table_name" column, I want to extract that table's data. I store the lookup output in the dataframe dfTbl. In dfTbl_wc I am then iterating over each row of dfTbl, and inside the for loop I select the full data for each of those tables.

But for some reason this code does not work properly in the for-loop part. Please help!

【Question Comments】:

  • What error or problem are you facing? "The for loop doesn't work" does not give any context.
  • A null pointer exception, and the error: value dataframe is not a member of Unit
  • @NikunjKakadiya - any clues??
  • Provide some test data in the question so that I can help you
  • @NikunjKakadiya - I have edited the post to add more information. Does it make sense?

Tags: postgresql scala apache-spark databricks


【Solution 1】:

You can create a new column in the dataframe that holds the query you want to run for each row. Then select that query column, convert it to an array, and loop over it to build the final dataframes; after that you can do whatever you want with them, such as saving them as a table, a parquet file, a csv file, and so on. If you want to save each table's data separately, you have to write the code for that inside the for loop below.

//source data
val df = Seq(("Schemaname","Table1","Column_A"),("Schemaname","Table2","Column_A"),("Schemaname","Table3","Column_A"),("Schemaname","Table4","Column_A"),("Schemaname","Table5","Column_A"),("Schemaname","Table6","Column_A"))
.toDF("country_table_schema","country_table_name","country_table_column_name")
//add a column for the query that gets generated for each row
import org.apache.spark.sql.functions._

 val df1 = df.withColumn("fulltableName",concat_ws(".",$"country_table_schema",$"country_table_name"))
.withColumn("Query",concat_ws("",concat(lit("("),lit(" Select * from ")) , $"fulltableName" , lit("where column_name = "), concat($"country_table_column_name", lit(") a"))))
.drop("fulltableName")

import org.apache.spark.sql.DataFrame
//convert it to array. I am using collect here but if you have large volume don't use collect otherwise it would crash your driver.
val queryArray = df1.select("Query").rdd.collect()
val rowscount = queryArray.length
//create an array of dataframe to hold the details of each query output.
var QueryArrayDF: Array[DataFrame] = new Array[DataFrame](rowscount)
//loop through the query and create dataframe and add that to the array of dataframe
for(i <- 0 to rowscount - 1){
       val df = (spark.read
                  .format("jdbc")
                  .option("url", url)
                  .option("ssl","true")
                  .option("sslfactory",sslfactory)
                  .option("user", user)
                  .option("password", pw)
                  .option("driver", driver)
                  .option("query",queryArray(i).toString().replace("[","").replace("]",""))
                  .load())
      QueryArrayDF(i) = df
    }
// now let's combine the dataframes, if we have more than one
        var CombinedDF = QueryArrayDF(0)
        for (i <- 1 to QueryArrayDF.length - 1) {
          CombinedDF = CombinedDF.union(QueryArrayDF(i))
        }

Now you can save the combined dataframe however you need.
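For example, a minimal sketch that writes the combined result out as a single delimited CSV. The blob path and the "combined" directory name are placeholders of my own, in the same style as the question, and this assumes all the unioned tables share one schema:

import org.apache.spark.sql.SaveMode

CombinedDF
  .coalesce(1)              // single output file, as in the question
  .write
  .option("header","true")
  .option("delimiter", "~")
  .mode(SaveMode.Overwrite)
  .csv("wasbs://<BlobContainer>@<StorageAccount>.blob.core.windows.net/<Targetdirectory>/combined")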

【Comments】:

    【Solution 2】:

    I tweaked the code slightly (basically combining the code I was already using with a few lines from the code shared by @NikunjKakadiya), and it works for me. Sharing the code for reference -

    val sql="select inf_schema.table_name, inf_schema.table_schema, country_table_column_name from information_schema.tables as inf_schema left join (SELECT table_schema as country_table_schema ,table_name as country_table_name, column_name as country_table_column_name FROM information_schema.columns WHERE table_schema = '<Schema>' AND columns.column_name = 'Column_A') as country on inf_schema.table_schema = country.country_table_schema and inf_schema.table_name = country.country_table_name WHERE inf_schema.table_schema='<Schema>'"
    
    var dfTbl = (spark.read
          .format("jdbc")
          .option("url", url)
          .option("ssl","true")
          .option("sslfactory",sslfactory)
          .option("user", user)
          .option("password", pw)
          .option("driver", driver)
          .option("query",sql)
          .load())
    
    val df = dfTbl.select(dfTbl.col("table_name")).where(dfTbl.col("country_table_column_name").isNotNull)
    
    println(df)
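    // note: println on a DataFrame only prints its schema string (e.g. [table_name: string]); df.show() would print the rows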
    
    import org.apache.spark.sql.DataFrame
    
    val df2 = (df.select("table_name").collect())
    
    val rows = df2.length
    
    for (i <- 0 to rows - 1){
          
          println(df2(i).mkString(","))     
          val sql2 = "select * from <Schema>."+df2(i).mkString(",") + " where Column_A = '<Column_Value>'"
    
      
      println(sql2)
      
      var df_f = (spark.read
                    .format("jdbc")
                    .option("url", url)
                    .option("ssl","true")
                    .option("sslfactory",sslfactory)
                    .option("user", user)
                    .option("password", pw)
                    .option("driver", driver)
                    .option("query",sql2)
                    .load())
      
       var File_withCountry = df_f
            .coalesce(1)
            .write
            .format("com.databricks.spark.csv")
            .option("header","true")
            .option("delimiter", "~")
            .mode(SaveMode.Overwrite)
            .option("encoding", "UTF-8")
            .csv("wasbs://<Container>@<StorageAccount>.blob.core.windows.net/<TargetDirectory>/"+ df2(i).mkString(",") )
      
       val partition_path = dbutils.fs.ls("wasbs://<Container>@<StorageAccount>.blob.core.windows.net/<TargetDirectory>/"+ df2(i).mkString(",")).filter(file=>file.name.startsWith("part"))(0).path
    
    dbutils.fs.cp(partition_path,"wasbs://<Container>@<StorageAccount>.blob.core.windows.net/<TargetDirectory>/"+ df2(i).mkString(",")+".csv")
    
    dbutils.fs.rm (partition_path, recurse = true)
      
      
    }
    

    Let me know if there are any questions.

    Thanks everyone for the support. Really appreciate it. Cheers!

    【Comments】:

    • What problem did you run into with the code I posted? Try to mention that so it is helpful to others as well.
    • @NikunjKakadiya - Not much, but one thing I noticed is that you used the toString method. With that, it picks up the square brackets as well as the memory address in every row, so replacing all of those strings in each row was a bit tricky for me. So I went with the mkString(",") method instead; a rough sketch of the difference is below. Note that since I am new to this, I could be way off here if I have missed anything else.
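    A rough sketch of that difference, using a hypothetical single-column Row that holds a table name:

    val row = org.apache.spark.sql.Row("Table1")   // hypothetical one-column Row
    row.toString        // "[Table1]"  -- toString wraps the row values in square brackets
    row.mkString(",")   // "Table1"    -- mkString joins just the values with the given separator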